← Back to team overview

bigdata-dev team mailing list archive

Re: [Merge] lp:~admcleod/charms/trusty/apache-spark/tuning-testing into lp:~bigdata-dev/charms/trusty/apache-spark/trunk

 

Thanks for the feedback, will take it all into consideration.

Diff comments:

> === modified file 'config.yaml'
> --- config.yaml	2015-06-05 04:09:25 +0000
> +++ config.yaml	2015-06-05 21:14:22 +0000
> @@ -4,9 +4,24 @@
>          default: ''
>          description: |
>              URL from which to fetch resources (e.g., Hadoop binaries) instead of Launchpad.
> +<<<<<<< TREE
>      spark_execution_mode:
>          type: string
>          default: 'yarn-client'
>          description: |
>              Options are empty string "" for local (no YARN), "yarn-client, and "yarn-master". There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
>      
> +=======
> +    spark_executor_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +    spark_driver_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +>>>>>>> MERGE-SOURCE
> 
> === modified file 'hooks/callbacks.py'
> --- hooks/callbacks.py	2015-06-05 04:09:25 +0000
> +++ hooks/callbacks.py	2015-06-05 21:14:22 +0000
> @@ -1,3 +1,6 @@
> +from subprocess import check_output, Popen
> +
> +import sparkmemalloc 
>  import jujuresources
>  from charmhelpers.core import hookenv
>  from charmhelpers.core import host
> @@ -62,7 +65,9 @@
>          utils.re_edit_in_place(spark_log4j, {
>                  r'log4j.rootCategory=INFO, console': 'log4j.rootCategory=ERROR, console',
>                  })
> -
> +        with open(spark_default, 'a') as file:
> +            file.write("spark.executor.memory\t256m\n")
> +        
>      def configure_spark(self):
>          '''
>          Configure spark environment for all users
> @@ -86,6 +91,23 @@
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/ubuntu/directory', env=e)
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', '/user/ubuntu/directory', env=e)
>  
> +    def spark_reconfigure(self):
> +
> +        sdm_automemory = 50 # percent of free memory to allocate to spark driver memory
> +        sem_automemory = 15 # percent of free memory to allocate to spark executor memory
> +        sdm_usermemory = hookenv.config()['spark_driver_memory']
> +        sem_usermemory = hookenv.config()['spark_executor_memory']
> +        jujuresources.juju_log('Allocating memory for spark driver...', 'INFO')
> +        sdm_memory = sparkmemalloc.charmmemory().reqappmem(sdm_automemory, sdm_usermemory)
> +        jujuresources.juju_log('Allocating memory for spark executor...', 'INFO')
> +        sem_memory = sparkmemalloc.charmmemory().reqappmem(sem_automemory, sem_usermemory)
> +        sdm = 'spark.driver.memory\t' + str(sdm_memory) + 'm'
> +        sem = 'spark.executor.memory\t' + str(sem_memory) + 'm'
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +        	r'.*spark.driver.memory *.*':sdm,
> +        	r'.*spark.executor.memory *.*':sem,
> +		})
> +
>      def spark_optimize(self):
>          import subprocess
>          from glob import glob
> @@ -101,12 +123,12 @@
>              env['SPARK_JAR']="hdfs:///user/ubuntu/share/lib/spark-assembly.jar"
>              env['SPARK_HOME']=spark_home
>              env['IPYTHON']="1"
> -        spark_conf = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
> -        utils.re_edit_in_place(spark_conf, {
> -            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled    true',
> -            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
> -            })
> -
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled    true',
> +            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
> +	    })
> +        self.spark_reconfigure()
> +            
>      def start(self):
>          with utils.environment_edit_in_place('/etc/environment') as env:
>              env['MASTER']= hookenv.config()['spark_execution_mode']
> 
> === modified file 'hooks/config-changed'
> --- hooks/config-changed	2015-04-22 15:27:27 +0000
> +++ hooks/config-changed	2015-06-05 21:14:22 +0000
> @@ -13,3 +13,17 @@
>  
>  import common
>  common.manage()
> +import callbacks
> +import jujuresources 
> +
> +from charmhelpers.contrib import bigdata
> +
> +try: 
> +	spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
> +	dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
> +	spark = callbacks.Spark(dist_config)
> +	callbacks.Spark.spark_reconfigure(spark)
> +except Exception,e:
> +	logstring = "Problem with setting memory, check spark-defaults.conf exists: " + str(e)
> +	jujuresources.juju_log(logstring, 'ERROR')
> +	pass
> 
> === added file 'hooks/sparkmemalloc.py'
> --- hooks/sparkmemalloc.py	1970-01-01 00:00:00 +0000
> +++ hooks/sparkmemalloc.py	2015-06-05 21:14:22 +0000
> @@ -0,0 +1,93 @@
> +# Class to calculate free memory given percentages and other values:
> +#
> +# Accepts two variables, the later of which are both split into SIZE and TYPE, e.g. sparkmemalloc("95", "512m")
> +# The first variable is the autoallocation value (always a percentage of user free memory so f is excluded)
> +# the second variable is the user value
> +#
> +# Valid size is between 1 and 100
> +# Valid types:
> +#       % - % of TOTAL system memory to allocate
> +#       f - % of FREE (free + cache + buffers) to allocate (aka auto allocation)
> +#       m - memory in mb to allocate
> +#       g - memory in gb to allocate
> +#
> +# !!! If the amount of memory specified by %, m or g exceeds current user memory,
> +# autoallocation will be forced !!!
> +#
> +
> +import sys
> +import jujuresources
> +
> +class getsysmem():
> +
> +        def __init__(self, unit):
> +           with open('/proc/meminfo', 'r') as mem:
> +                        lines = mem.readlines()
> +
> +           self._total = int(lines[0].split()[1])
> +           self._free = int(lines[1].split()[1])
> +           self._buff = int(lines[3].split()[1])
> +           self._cached = int(lines[4].split()[1])
> +
> +           self.unit = unit
> +
> +        @property
> +        def total(self):

Strangely enough, it complained about every other space vs tab indent...

> +           return self._total / 1024
> +
> +        @property
> +        def user_free(self):
> +           return (self._free + self._buff + self._cached) / 1024
> +
> +class charmmemory():
> +
> +        def memcalc(self, autosize, usersize, alloctype):
> +                self.alloctype = alloctype.lower()
> +                self.usersize = int(usersize)
> +                self.autosize = int(autosize)
> +
> +                if self.alloctype == "f":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory Auto-configuration: " + str(self.usersize) + "% of free memory (" + str(self.cursysmem.user_free) + ") = " + str(int((self.cursysmem.user_free / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.user_free / 100.00) * self.usersize)
> +                elif self.alloctype == "%":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "% of total memory (" + str(self.cursysmem.total) + ") = " + str(int((self.cursysmem.total / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.total / 100.00) * self.usersize)
> +                elif self.alloctype.lower() == "m":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "mb of total memory: " + str(self.cursysmem.total)
> +                        self.memamount = self.usersize
> +                elif self.alloctype.lower() == "g":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "gb of total memory: " + str(int(self.cursysmem.total) / 1024)
> +                        # thats right, no si units here...
> +                        self.memamount = self.usersize * 1000

I did have a reason... but I've forgotten now :)

> +
> +                if self.memamount > self.cursysmem.user_free:
> +                        jujuresources.juju_log("Configuration overcommits free memory, switching to auto-configuration - check configuration values", 'ERROR')
> +                        self.memamount = int((self.cursysmem.user_free / 100.00) * self.autosize)
> +                        logstring = "Memory configuration: " + str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system memory (" + str(self.cursysmem.user_free) + ")"
> +
> +                jujuresources.juju_log(logstring, 'INFO')
> +
> +        def reqappmem(self, autosize, usersize):
> +                self.usersize = usersize
> +                self.autosize = autosize
> +                memtype = self.usersize[-1:]
> +                size = self.usersize[:-1]
> +
> +                if memtype != "f" and memtype != "%" and memtype.lower() != "m" and memtype.lower() != "g":
> +                        jujuresources.juju_log('Invalid memory configuration type defined, enabling memory auto-configuration...', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                if not size or int(size) <1 :
> +                        jujuresources.juju_log('Invalid memory configuration size (too small or nul), enabling memory auto-configuration', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                self.memcalc(int(self.autosize), int(size), memtype)
> +
> +                return self.memamount
> +
> 
> === modified file 'tests/100-deploy-spark-hdfs-yarn'
> --- tests/100-deploy-spark-hdfs-yarn	2015-06-04 22:00:39 +0000
> +++ tests/100-deploy-spark-hdfs-yarn	2015-06-05 21:14:22 +0000
> @@ -2,7 +2,7 @@
>  
>  import unittest
>  import amulet
> -
> +import random
>  
>  class TestDeploy(unittest.TestCase):
>      """
> @@ -12,11 +12,17 @@
>  
>      @classmethod
>      def setUpClass(cls):
> +        random.seed()
> +        global sdmrnd, semrnd, sdmrndstr, sdmrndstr
> +        sdmrnd = random.randint(10,100)

Well the idea was that if the value was static it wouldn't be a true test of the configuration values changing - but I see your point.

> +        semrnd = random.randint(10,100)
> +        sdmrndstr = str(sdmrnd) + 'f'
> +        semrndstr = str(semrnd) + 'f'
>          cls.d = amulet.Deployment(series='trusty')
>          # Deploy a hadoop cluster
>          cls.d.add('yarn-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
>          cls.d.add('hdfs-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
> -        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
> +        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
>          cls.d.add('hadoop-plugin', charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
>          cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
>          cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
> @@ -26,12 +32,31 @@
>  
>          # Add Spark Service
>          cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
> -        cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
> -
> -        cls.d.setup(timeout=3600)
> +        #cls.d.add('spark', charm='/root/canonical/trusty/apache-spark-merge')
> +        sparkmemconf = {'spark_driver_memory': sdmrndstr, 'spark_executor_memory': semrndstr}
> +        cls.d.configure('spark', sparkmemconf)
> +        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
> +        
> +        cls.d.setup(timeout=9000)
>          cls.d.sentry.wait()
>          cls.unit = cls.d.sentry.unit['spark/0']
>  
> +    def test_reconfigsdm(self):
> +        sdmteststr = 'grep -A2 "Allocating memory for spark driver" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(sdmrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\'' 
> +        output, retcode = self.unit.run(sdmteststr)
> +
> +        if str(output) != str(sdmrnd):
> +                message = "Spark driver memory config test: Expected %s, got %s" % (str(sdmrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)

I did discover that during testing but again wasn't sure about best practice

> +
> +    def test_configsem(self):
> +        semteststr = 'grep -A2 "Allocating memory for spark executor" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(semrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\''
> +        output, retcode = self.unit.run(semteststr)
> +
> +        if str(output) != str(semrnd):
> +                message = "Spark executor memory config test: Expected %s, got %s" % (str(semrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)
> +
>      def test_deploy(self):
>          output, retcode = self.unit.run("pgrep -a java")
>          assert 'HistoryServer' in output, "Spark HistoryServer is not started"
> 


-- 
https://code.launchpad.net/~admcleod/charms/trusty/apache-spark/tuning-testing/+merge/261297
Your team Juju Big Data Development is subscribed to branch lp:~bigdata-dev/charms/trusty/apache-spark/trunk.


References