

Re: [Merge] lp:~admcleod/charms/trusty/apache-spark/tuning-testing into lp:~bigdata-dev/charms/trusty/apache-spark/trunk

 

Andrew,

Thanks for providing us with this submission.  Overall, it's quite good.  There are a few issues mentioned below as diff comments, but nothing that's too big a deal; mostly they're just code style and readability comments.

Diff comments:

> === modified file 'config.yaml'
> --- config.yaml	2015-06-05 04:09:25 +0000
> +++ config.yaml	2015-06-05 21:14:22 +0000
> @@ -4,9 +4,24 @@
>          default: ''
>          description: |
>              URL from which to fetch resources (e.g., Hadoop binaries) instead of Launchpad.
> +<<<<<<< TREE

Merge artifacts should be cleaned up.

>      spark_execution_mode:
>          type: string
>          default: 'yarn-client'
>          description: |
>              Options are empty string "" for local (no YARN), "yarn-client, and "yarn-master". There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
>      
> +=======
> +    spark_executor_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +    spark_driver_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +>>>>>>> MERGE-SOURCE
> 
> === modified file 'hooks/callbacks.py'
> --- hooks/callbacks.py	2015-06-05 04:09:25 +0000
> +++ hooks/callbacks.py	2015-06-05 21:14:22 +0000
> @@ -1,3 +1,6 @@
> +from subprocess import check_output, Popen
> +
> +import sparkmemalloc 
>  import jujuresources
>  from charmhelpers.core import hookenv
>  from charmhelpers.core import host
> @@ -62,7 +65,9 @@
>          utils.re_edit_in_place(spark_log4j, {
>                  r'log4j.rootCategory=INFO, console': 'log4j.rootCategory=ERROR, console',
>                  })
> -
> +        with open(spark_default, 'a') as file:
> +            file.write("spark.executor.memory\t256m\n")
> +        
>      def configure_spark(self):
>          '''
>          Configure spark environment for all users
> @@ -86,6 +91,23 @@
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/ubuntu/directory', env=e)
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', '/user/ubuntu/directory', env=e)
>  
> +    def spark_reconfigure(self):
> +
> +        sdm_automemory = 50 # percent of free memory to allocate to spark driver memory
> +        sem_automemory = 15 # percent of free memory to allocate to spark executor memory
> +        sdm_usermemory = hookenv.config()['spark_driver_memory']
> +        sem_usermemory = hookenv.config()['spark_executor_memory']
> +        jujuresources.juju_log('Allocating memory for spark driver...', 'INFO')
> +        sdm_memory = sparkmemalloc.charmmemory().reqappmem(sdm_automemory, sdm_usermemory)
> +        jujuresources.juju_log('Allocating memory for spark executor...', 'INFO')
> +        sem_memory = sparkmemalloc.charmmemory().reqappmem(sem_automemory, sem_usermemory)
> +        sdm = 'spark.driver.memory\t' + str(sdm_memory) + 'm'
> +        sem = 'spark.executor.memory\t' + str(sem_memory) + 'm'
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +        	r'.*spark.driver.memory *.*':sdm,
> +        	r'.*spark.executor.memory *.*':sem,
> +		})
> +
>      def spark_optimize(self):
>          import subprocess
>          from glob import glob
> @@ -101,12 +123,12 @@
>              env['SPARK_JAR']="hdfs:///user/ubuntu/share/lib/spark-assembly.jar"
>              env['SPARK_HOME']=spark_home
>              env['IPYTHON']="1"
> -        spark_conf = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
> -        utils.re_edit_in_place(spark_conf, {
> -            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled    true',
> -            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
> -            })
> -
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled    true',
> +            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
> +	    })
> +        self.spark_reconfigure()
> +            
>      def start(self):
>          with utils.environment_edit_in_place('/etc/environment') as env:
>              env['MASTER']= hookenv.config()['spark_execution_mode']
> 
> === modified file 'hooks/config-changed'
> --- hooks/config-changed	2015-04-22 15:27:27 +0000
> +++ hooks/config-changed	2015-06-05 21:14:22 +0000
> @@ -13,3 +13,17 @@
>  
>  import common
>  common.manage()
> +import callbacks
> +import jujuresources 
> +
> +from charmhelpers.contrib import bigdata
> +
> +try: 
> +	spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
> +	dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
> +	spark = callbacks.Spark(dist_config)
> +	callbacks.Spark.spark_reconfigure(spark)
> +except Exception,e:

Blanket 'except Exception / pass' blocks should almost always be avoided.

In this case, it is guaranteed to fail (fall into the Exception handler block) on first deployment, because it is not integrated into the framework that enforces the preconditions before the dependencies are installed.  However, that is an issue with our framework and its non-obvious, undocumented initial learning curve.  We need to add comments to the charm advising how to make additions like these; in this case, it is just a matter of adding 'callbacks.Spark.spark_reconfigure' to the 'callbacks' list in common.py:manage.

Also, the convention for invoking instance methods is 'spark.spark_reconfigure()' rather than 'Spark.spark_reconfigure(spark)', which matches how the other methods in the 'callbacks' list are invoked (although the latter will work in the general case).
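
For illustration, a rough, untested sketch of what I mean, reusing the names already in this hook:

    from charmhelpers.contrib import bigdata
    import callbacks

    # same construction as the config-changed hook above
    spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
    dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
    spark = callbacks.Spark(dist_config)

    # invoke through the instance...
    spark.spark_reconfigure()
    # ...and in common.py:manage, the bound method (spark.spark_reconfigure, with
    # no parentheses) is what would be appended to the existing 'callbacks' list.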

> +	logstring = "Problem with setting memory, check spark-defaults.conf exists: " + str(e)
> +	jujuresources.juju_log(logstring, 'ERROR')
> +	pass
> 
> === added file 'hooks/sparkmemalloc.py'
> --- hooks/sparkmemalloc.py	1970-01-01 00:00:00 +0000
> +++ hooks/sparkmemalloc.py	2015-06-05 21:14:22 +0000
> @@ -0,0 +1,93 @@
> +# Class to calculate free memory given percentages and other values:
> +#
> +# Accepts two variables, the later of which are both split into SIZE and TYPE, e.g. sparkmemalloc("95", "512m")
> +# The first variable is the autoallocation value (always a percentage of user free memory so f is excluded)
> +# the second variable is the user value
> +#
> +# Valid size is between 1 and 100
> +# Valid types:
> +#       % - % of TOTAL system memory to allocate
> +#       f - % of FREE (free + cache + buffers) to allocate (aka auto allocation)
> +#       m - memory in mb to allocate
> +#       g - memory in gb to allocate
> +#
> +# !!! If the amount of memory specified by %, m or g exceeds current user memory,

This caveat should be documented for the user in the config.yaml description.
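Something along the lines of "if the requested value exceeds the memory currently free, auto-allocation will be used instead" in the spark_driver_memory / spark_executor_memory descriptions would cover it.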

> +# autoallocation will be forced !!!
> +#
> +
> +import sys
> +import jujuresources
> +
> +class getsysmem():
> +
> +        def __init__(self, unit):
> +           with open('/proc/meminfo', 'r') as mem:
> +                        lines = mem.readlines()
> +
> +           self._total = int(lines[0].split()[1])
> +           self._free = int(lines[1].split()[1])
> +           self._buff = int(lines[3].split()[1])
> +           self._cached = int(lines[4].split()[1])
> +
> +           self.unit = unit
> +
> +        @property
> +        def total(self):

Inconsistent indentation here.  I also saw tabs in use in a couple of places.

Our charms are missing a `make lint` target to catch things like this, but we generally try to follow PEP8 (with a general exception for lines up to around 120, though we try to avoid even that).

> +           return self._total / 1024
> +
> +        @property
> +        def user_free(self):
> +           return (self._free + self._buff + self._cached) / 1024
> +
> +class charmmemory():
> +
> +        def memcalc(self, autosize, usersize, alloctype):
> +                self.alloctype = alloctype.lower()
> +                self.usersize = int(usersize)
> +                self.autosize = int(autosize)
> +
> +                if self.alloctype == "f":
> +                        self.cursysmem = getsysmem('MB')

Fair bit of repetition here.  This block could use some refactoring cleanup to make it easier to follow.
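
For example, something along these lines (an untested sketch, just to illustrate the shape):

    def memcalc(self, autosize, usersize, alloctype):
        mem = getsysmem('MB')
        alloctype = alloctype.lower()
        usersize = int(usersize)
        autosize = int(autosize)
        # one lookup table instead of four near-identical branches:
        # alloctype -> (amount in mb, human-readable basis for the log message)
        calc = {
            'f': (mem.user_free * usersize // 100,
                  '%s%% of free memory (%s)' % (usersize, mem.user_free)),
            '%': (mem.total * usersize // 100,
                  '%s%% of total memory (%s)' % (usersize, mem.total)),
            'm': (usersize, '%smb requested' % usersize),
            'g': (usersize * 1000, '%sgb requested' % usersize),  # 1000 vs 1024: see my question below
        }
        self.memamount, basis = calc[alloctype]
        if self.memamount > mem.user_free:
            jujuresources.juju_log('Configuration overcommits free memory, '
                                   'switching to auto-configuration - check configuration values', 'ERROR')
            self.memamount = mem.user_free * autosize // 100
            basis = '%s%% of free system memory (%s)' % (autosize, mem.user_free)
        jujuresources.juju_log('Memory configuration: %smb (%s)' % (self.memamount, basis), 'INFO')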

> +                        logstring = "Memory Auto-configuration: " + str(self.usersize) + "% of free memory (" + str(self.cursysmem.user_free) + ") = " + str(int((self.cursysmem.user_free / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.user_free / 100.00) * self.usersize)
> +                elif self.alloctype == "%":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "% of total memory (" + str(self.cursysmem.total) + ") = " + str(int((self.cursysmem.total / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.total / 100.00) * self.usersize)
> +                elif self.alloctype.lower() == "m":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "mb of total memory: " + str(self.cursysmem.total)
> +                        self.memamount = self.usersize
> +                elif self.alloctype.lower() == "g":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "gb of total memory: " + str(int(self.cursysmem.total) / 1024)
> +                        # thats right, no si units here...
> +                        self.memamount = self.usersize * 1000

I'm unclear why this is converting GB with 1000, as noted in the comment, while the reporting in the logstring and all the other conversions use 1024.  What's special about the GB case?
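If consistency with the rest of the math is the intent, presumably this would just become self.memamount = self.usersize * 1024.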

> +
> +                if self.memamount > self.cursysmem.user_free:
> +                        jujuresources.juju_log("Configuration overcommits free memory, switching to auto-configuration - check configuration values", 'ERROR')
> +                        self.memamount = int((self.cursysmem.user_free / 100.00) * self.autosize)
> +                        logstring = "Memory configuration: " + str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system memory (" + str(self.cursysmem.user_free) + ")"
> +
> +                jujuresources.juju_log(logstring, 'INFO')
> +
> +        def reqappmem(self, autosize, usersize):
> +                self.usersize = usersize
> +                self.autosize = autosize
> +                memtype = self.usersize[-1:]
> +                size = self.usersize[:-1]
> +
> +                if memtype != "f" and memtype != "%" and memtype.lower() != "m" and memtype.lower() != "g":
> +                        jujuresources.juju_log('Invalid memory configuration type defined, enabling memory auto-configuration...', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                if not size or int(size) <1 :
> +                        jujuresources.juju_log('Invalid memory configuration size (too small or nul), enabling memory auto-configuration', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                self.memcalc(int(self.autosize), int(size), memtype)
> +
> +                return self.memamount
> +
> 
> === modified file 'tests/100-deploy-spark-hdfs-yarn'
> --- tests/100-deploy-spark-hdfs-yarn	2015-06-04 22:00:39 +0000
> +++ tests/100-deploy-spark-hdfs-yarn	2015-06-05 21:14:22 +0000
> @@ -2,7 +2,7 @@
>  
>  import unittest
>  import amulet
> -
> +import random
>  
>  class TestDeploy(unittest.TestCase):
>      """
> @@ -12,11 +12,17 @@
>  
>      @classmethod
>      def setUpClass(cls):
> +        random.seed()
> +        global sdmrnd, semrnd, sdmrndstr, sdmrndstr

Globals are unnecessary here, as you can just set them as cls attributes (`cls.sdmrnd = ...`)

> +        sdmrnd = random.randint(10,100)

Random values in tests are generally a bad idea.
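
Putting that and the previous comment together, something like this would do (a sketch, with arbitrary fixed values picked for illustration):

    @classmethod
    def setUpClass(cls):
        # fixed, deterministic values (chosen arbitrarily here) stored on the
        # class instead of module-level globals
        cls.sdmrnd = 42
        cls.semrnd = 17
        cls.sdmrndstr = '%df' % cls.sdmrnd
        cls.semrndstr = '%df' % cls.semrnd
        # ... rest of the existing setUpClass unchanged ...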

> +        semrnd = random.randint(10,100)
> +        sdmrndstr = str(sdmrnd) + 'f'
> +        semrndstr = str(semrnd) + 'f'
>          cls.d = amulet.Deployment(series='trusty')
>          # Deploy a hadoop cluster
>          cls.d.add('yarn-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
>          cls.d.add('hdfs-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
> -        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
> +        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
>          cls.d.add('hadoop-plugin', charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
>          cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
>          cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
> @@ -26,12 +32,31 @@
>  
>          # Add Spark Service
>          cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
> -        cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
> -
> -        cls.d.setup(timeout=3600)
> +        #cls.d.add('spark', charm='/root/canonical/trusty/apache-spark-merge')
> +        sparkmemconf = {'spark_driver_memory': sdmrndstr, 'spark_executor_memory': semrndstr}
> +        cls.d.configure('spark', sparkmemconf)
> +        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
> +        
> +        cls.d.setup(timeout=9000)
>          cls.d.sentry.wait()
>          cls.unit = cls.d.sentry.unit['spark/0']
>  
> +    def test_reconfigsdm(self):
> +        sdmteststr = 'grep -A2 "Allocating memory for spark driver" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(sdmrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\'' 
> +        output, retcode = self.unit.run(sdmteststr)
> +
> +        if str(output) != str(sdmrnd):
> +                message = "Spark driver memory config test: Expected %s, got %s" % (str(sdmrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)

amulet.raise_status() does a hard sys.exit(1), so it will prevent the other tests from running.  This block should be changed to self.assertEqual(output, sdmrnd).
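
i.e. roughly (assuming the cls attributes suggested in the setUpClass comment above):

        output, retcode = self.unit.run(sdmteststr)
        # compare as strings, since unit.run returns text
        self.assertEqual(str(output), str(self.sdmrnd),
                         'Spark driver memory config test: expected %s, got %s'
                         % (self.sdmrnd, output))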

> +
> +    def test_configsem(self):
> +        semteststr = 'grep -A2 "Allocating memory for spark executor" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(semrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\''
> +        output, retcode = self.unit.run(semteststr)
> +
> +        if str(output) != str(semrnd):

Same here.  self.assertEqual(output, semrnd)

> +                message = "Spark executor memory config test: Expected %s, got %s" % (str(semrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)
> +
>      def test_deploy(self):
>          output, retcode = self.unit.run("pgrep -a java")
>          assert 'HistoryServer' in output, "Spark HistoryServer is not started"
> 


-- 
https://code.launchpad.net/~admcleod/charms/trusty/apache-spark/tuning-testing/+merge/261297
Your team Juju Big Data Development is subscribed to branch lp:~bigdata-dev/charms/trusty/apache-spark/trunk.

