← Back to team overview

bigdata-dev team mailing list archive

Re: [Merge] lp:~admcleod/charms/trusty/apache-spark/tuning-testing into lp:~bigdata-dev/charms/trusty/apache-spark/trunk

 


Diff comments:

> === modified file 'config.yaml'
> --- config.yaml	2015-06-05 04:09:25 +0000
> +++ config.yaml	2015-06-05 21:14:22 +0000
> @@ -4,9 +4,24 @@
>          default: ''
>          description: |
>              URL from which to fetch resources (e.g., Hadoop binaries) instead of Launchpad.
> +<<<<<<< TREE
>      spark_execution_mode:
>          type: string
>          default: 'yarn-client'
>          description: |
>              Options are empty string "" for local (no YARN), "yarn-client, and "yarn-master". There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
>      
> +=======
> +    spark_executor_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +    spark_driver_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +>>>>>>> MERGE-SOURCE
> 
> === modified file 'hooks/callbacks.py'
> --- hooks/callbacks.py	2015-06-05 04:09:25 +0000
> +++ hooks/callbacks.py	2015-06-05 21:14:22 +0000
> @@ -1,3 +1,6 @@
> +from subprocess import check_output, Popen
> +
> +import sparkmemalloc 
>  import jujuresources
>  from charmhelpers.core import hookenv
>  from charmhelpers.core import host
> @@ -62,7 +65,9 @@
>          utils.re_edit_in_place(spark_log4j, {
>                  r'log4j.rootCategory=INFO, console': 'log4j.rootCategory=ERROR, console',
>                  })
> -
> +        with open(spark_default, 'a') as file:
> +            file.write("spark.executor.memory\t256m\n")
> +        
>      def configure_spark(self):
>          '''
>          Configure spark environment for all users
> @@ -86,6 +91,23 @@
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/ubuntu/directory', env=e)
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', '/user/ubuntu/directory', env=e)
>  
> +    def spark_reconfigure(self):
> +
> +        sdm_automemory = 50 # percent of free memory to allocate to spark driver memory
> +        sem_automemory = 15 # percent of free memory to allocate to spark executor memory
> +        sdm_usermemory = hookenv.config()['spark_driver_memory']
> +        sem_usermemory = hookenv.config()['spark_executor_memory']
> +        jujuresources.juju_log('Allocating memory for spark driver...', 'INFO')
> +        sdm_memory = sparkmemalloc.charmmemory().reqappmem(sdm_automemory, sdm_usermemory)
> +        jujuresources.juju_log('Allocating memory for spark executor...', 'INFO')
> +        sem_memory = sparkmemalloc.charmmemory().reqappmem(sem_automemory, sem_usermemory)
> +        sdm = 'spark.driver.memory\t' + str(sdm_memory) + 'm'
> +        sem = 'spark.executor.memory\t' + str(sem_memory) + 'm'
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +        	r'.*spark.driver.memory *.*':sdm,
> +        	r'.*spark.executor.memory *.*':sem,
> +		})
> +
>      def spark_optimize(self):
>          import subprocess
>          from glob import glob
> @@ -101,12 +123,12 @@
>              env['SPARK_JAR']="hdfs:///user/ubuntu/share/lib/spark-assembly.jar"
>              env['SPARK_HOME']=spark_home
>              env['IPYTHON']="1"
> -        spark_conf = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
> -        utils.re_edit_in_place(spark_conf, {
> -            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled    true',
> -            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
> -            })
> -
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled    true',
> +            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
> +	    })
> +        self.spark_reconfigure()
> +            
>      def start(self):
>          with utils.environment_edit_in_place('/etc/environment') as env:
>              env['MASTER']= hookenv.config()['spark_execution_mode']
> 
> === modified file 'hooks/config-changed'
> --- hooks/config-changed	2015-04-22 15:27:27 +0000
> +++ hooks/config-changed	2015-06-05 21:14:22 +0000
> @@ -13,3 +13,17 @@
>  
>  import common
>  common.manage()
> +import callbacks
> +import jujuresources 
> +
> +from charmhelpers.contrib import bigdata
> +
> +try: 
> +	spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
> +	dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
> +	spark = callbacks.Spark(dist_config)
> +	callbacks.Spark.spark_reconfigure(spark)
> +except Exception,e:
> +	logstring = "Problem with setting memory, check spark-defaults.conf exists: " + str(e)
> +	jujuresources.juju_log(logstring, 'ERROR')
> +	pass
> 
> === added file 'hooks/sparkmemalloc.py'
> --- hooks/sparkmemalloc.py	1970-01-01 00:00:00 +0000
> +++ hooks/sparkmemalloc.py	2015-06-05 21:14:22 +0000
> @@ -0,0 +1,93 @@
> +# Class to calculate free memory given percentages and other values:
> +#
> +# Accepts two variables, the later of which are both split into SIZE and TYPE, e.g. sparkmemalloc("95", "512m")
> +# The first variable is the autoallocation value (always a percentage of user free memory so f is excluded)
> +# the second variable is the user value
> +#
> +# Valid size is between 1 and 100
> +# Valid types:
> +#       % - % of TOTAL system memory to allocate
> +#       f - % of FREE (free + cache + buffers) to allocate (aka auto allocation)
> +#       m - memory in mb to allocate
> +#       g - memory in gb to allocate
> +#
> +# !!! If the amount of memory specified by %, m or g exceeds current user memory,
> +# autoallocation will be forced !!!
> +#
> +
> +import sys
> +import jujuresources
> +
> +class getsysmem():
> +
> +        def __init__(self, unit):
> +           with open('/proc/meminfo', 'r') as mem:
> +                        lines = mem.readlines()
> +
> +           self._total = int(lines[0].split()[1])
> +           self._free = int(lines[1].split()[1])
> +           self._buff = int(lines[3].split()[1])
> +           self._cached = int(lines[4].split()[1])
> +
> +           self.unit = unit
> +
> +        @property
> +        def total(self):

I think as long as the indentation is consistent at a given level, Python itself won't complain.  But I believe the lint checks encourage using only spaces.

> +           return self._total / 1024
> +
> +        @property
> +        def user_free(self):
> +           return (self._free + self._buff + self._cached) / 1024
> +
> +class charmmemory():
> +
> +        def memcalc(self, autosize, usersize, alloctype):
> +                self.alloctype = alloctype.lower()
> +                self.usersize = int(usersize)
> +                self.autosize = int(autosize)
> +
> +                if self.alloctype == "f":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory Auto-configuration: " + str(self.usersize) + "% of free memory (" + str(self.cursysmem.user_free) + ") = " + str(int((self.cursysmem.user_free / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.user_free / 100.00) * self.usersize)
> +                elif self.alloctype == "%":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "% of total memory (" + str(self.cursysmem.total) + ") = " + str(int((self.cursysmem.total / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.total / 100.00) * self.usersize)
> +                elif self.alloctype.lower() == "m":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "mb of total memory: " + str(self.cursysmem.total)
> +                        self.memamount = self.usersize
> +                elif self.alloctype.lower() == "g":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + str(self.usersize) + "gb of total memory: " + str(int(self.cursysmem.total) / 1024)
> +                        # thats right, no si units here...
> +                        self.memamount = self.usersize * 1000
> +
> +                if self.memamount > self.cursysmem.user_free:
> +                        jujuresources.juju_log("Configuration overcommits free memory, switching to auto-configuration - check configuration values", 'ERROR')
> +                        self.memamount = int((self.cursysmem.user_free / 100.00) * self.autosize)
> +                        logstring = "Memory configuration: " + str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system memory (" + str(self.cursysmem.user_free) + ")"
> +
> +                jujuresources.juju_log(logstring, 'INFO')
> +
> +        def reqappmem(self, autosize, usersize):
> +                self.usersize = usersize
> +                self.autosize = autosize
> +                memtype = self.usersize[-1:]
> +                size = self.usersize[:-1]
> +
> +                if memtype != "f" and memtype != "%" and memtype.lower() != "m" and memtype.lower() != "g":
> +                        jujuresources.juju_log('Invalid memory configuration type defined, enabling memory auto-configuration...', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                if not size or int(size) <1 :
> +                        jujuresources.juju_log('Invalid memory configuration size (too small or nul), enabling memory auto-configuration', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                self.memcalc(int(self.autosize), int(size), memtype)
> +
> +                return self.memamount
> +
> 
> === modified file 'tests/100-deploy-spark-hdfs-yarn'
> --- tests/100-deploy-spark-hdfs-yarn	2015-06-04 22:00:39 +0000
> +++ tests/100-deploy-spark-hdfs-yarn	2015-06-05 21:14:22 +0000
> @@ -2,7 +2,7 @@
>  
>  import unittest
>  import amulet
> -
> +import random
>  
>  class TestDeploy(unittest.TestCase):
>      """
> @@ -12,11 +12,17 @@
>  
>      @classmethod
>      def setUpClass(cls):
> +        random.seed()
> +        global sdmrnd, semrnd, sdmrndstr, sdmrndstr
> +        sdmrnd = random.randint(10,100)

The only concern with random values is in having tests that fail intermittently depending on what values the RNG spits out.  To test the config value changing, you can just pick two different pre-chosen values and switch between them.  You just have to make sure you cover any edge or corner cases, but you'd want to be thinking about that with your random range, anyway.

Again, this is just a style thing, but I have run into hard-to-debug test failures before due to RNGs, so I really try to avoid them and recommend others do as well.  :)

> +        semrnd = random.randint(10,100)
> +        sdmrndstr = str(sdmrnd) + 'f'
> +        semrndstr = str(semrnd) + 'f'
>          cls.d = amulet.Deployment(series='trusty')
>          # Deploy a hadoop cluster
>          cls.d.add('yarn-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
>          cls.d.add('hdfs-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
> -        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
> +        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
>          cls.d.add('hadoop-plugin', charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
>          cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
>          cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
> @@ -26,12 +32,31 @@
>  
>          # Add Spark Service
>          cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
> -        cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
> -
> -        cls.d.setup(timeout=3600)
> +        #cls.d.add('spark', charm='/root/canonical/trusty/apache-spark-merge')
> +        sparkmemconf = {'spark_driver_memory': sdmrndstr, 'spark_executor_memory': semrndstr}
> +        cls.d.configure('spark', sparkmemconf)
> +        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
> +        
> +        cls.d.setup(timeout=9000)
>          cls.d.sentry.wait()
>          cls.unit = cls.d.sentry.unit['spark/0']
>  
> +    def test_reconfigsdm(self):
> +        sdmteststr = 'grep -A2 "Allocating memory for spark driver" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(sdmrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\'' 
> +        output, retcode = self.unit.run(sdmteststr)
> +
> +        if str(output) != str(sdmrnd):
> +                message = "Spark driver memory config test: Expected %s, got %s" % (str(sdmrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)

Yeah, I think raise_status() was from before we were encouraging the use of unittest.TestCase, and was somewhat useful then (though a regular assert would have worked then as well).

> +
> +    def test_configsem(self):
> +        semteststr = 'grep -A2 "Allocating memory for spark executor" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(semrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\''
> +        output, retcode = self.unit.run(semteststr)
> +
> +        if str(output) != str(semrnd):
> +                message = "Spark executor memory config test: Expected %s, got %s" % (str(semrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)
> +
>      def test_deploy(self):
>          output, retcode = self.unit.run("pgrep -a java")
>          assert 'HistoryServer' in output, "Spark HistoryServer is not started"
> 


-- 
https://code.launchpad.net/~admcleod/charms/trusty/apache-spark/tuning-testing/+merge/261297
Your team Juju Big Data Development is subscribed to branch lp:~bigdata-dev/charms/trusty/apache-spark/trunk.


References