← Back to team overview

bigdata-dev team mailing list archive

[Merge] lp:~admcleod/charms/trusty/apache-spark/tuning-testing into lp:~bigdata-dev/charms/trusty/apache-spark/trunk


Andrew McLeod has proposed merging lp:~admcleod/charms/trusty/apache-spark/tuning-testing into lp:~bigdata-dev/charms/trusty/apache-spark/trunk.

Requested reviews:
  amir sanjar (asanjar)

For more details, see:

Exposed spark driver memory and spark executor memory as configuration options, and included a module that allocates memory in the config-changed hook as a percentage of total or of free system memory, falling back to auto-allocation on overcommit errors. Tests are included for the configuration changes.
Your team Juju Big Data Development is subscribed to branch lp:~bigdata-dev/charms/trusty/apache-spark/trunk.
=== modified file 'config.yaml'
--- config.yaml	2015-06-05 04:09:25 +0000
+++ config.yaml	2015-06-05 21:12:22 +0000
@@ -4,9 +4,24 @@
         default: ''
         description: |
             URL from which to fetch resources (e.g., Hadoop binaries) instead of Launchpad.
+<<<<<<< TREE
         type: string
         default: 'yarn-client'
         description: |
             Options are empty string "" for local (no YARN), "yarn-client", and "yarn-cluster". There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
+    spark_executor_memory:
+        type: string 
+        default: 'auto'
+        description: |
+            Specify 'auto', a percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), megabytes (e.g. 1024m),
+            or a percentage of free system memory (e.g. 50f, meaning 50% of free system memory).
+    spark_driver_memory:
+        type: string 
+        default: 'auto'
+        description: |
+            Specify 'auto', a percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), megabytes (e.g. 1024m),
+            or a percentage of free system memory (e.g. 50f, meaning 50% of free system memory).

=== modified file 'hooks/callbacks.py'
--- hooks/callbacks.py	2015-06-05 04:09:25 +0000
+++ hooks/callbacks.py	2015-06-05 21:12:22 +0000
@@ -1,3 +1,6 @@
+from subprocess import check_output, Popen
+import sparkmemalloc 
 import jujuresources
 from charmhelpers.core import hookenv
 from charmhelpers.core import host
@@ -62,7 +65,9 @@
         utils.re_edit_in_place(spark_log4j, {
                 r'log4j.rootCategory=INFO, console': 'log4j.rootCategory=ERROR, console',
+        with open(spark_default, 'a') as file:
+            file.write("spark.executor.memory\t256m\n")
     def configure_spark(self):
         Configure spark environment for all users
@@ -86,6 +91,23 @@
         utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/ubuntu/directory', env=e)
         utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', '/user/ubuntu/directory', env=e)
+    def spark_reconfigure(self):
+        """Recompute Spark driver and executor memory and update spark-defaults.conf.
+
+        Reads the ``spark_driver_memory`` and ``spark_executor_memory`` charm
+        options, asks ``sparkmemalloc`` for the resulting amount in megabytes,
+        and rewrites the matching lines of spark-defaults.conf.
+        """
+        # Percent of free memory handed to sparkmemalloc as the auto-allocation value.
+        driver_auto_pct = 50
+        executor_auto_pct = 15
+
+        driver_setting = hookenv.config()['spark_driver_memory']
+        executor_setting = hookenv.config()['spark_executor_memory']
+
+        # Each "Allocating memory..." message is logged right before its
+        # allocation; the amulet test greps the log lines that follow it.
+        jujuresources.juju_log('Allocating memory for spark driver...', 'INFO')
+        driver_mb = sparkmemalloc.charmmemory().reqappmem(driver_auto_pct, driver_setting)
+        jujuresources.juju_log('Allocating memory for spark executor...', 'INFO')
+        executor_mb = sparkmemalloc.charmmemory().reqappmem(executor_auto_pct, executor_setting)
+
+        defaults_file = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
+        replacements = {
+            r'.*spark.driver.memory *.*': 'spark.driver.memory\t' + str(driver_mb) + 'm',
+            r'.*spark.executor.memory *.*': 'spark.executor.memory\t' + str(executor_mb) + 'm',
+        }
+        utils.re_edit_in_place(defaults_file, replacements)
     def spark_optimize(self):
         import subprocess
         from glob import glob
@@ -101,12 +123,12 @@
-        spark_conf = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
-        utils.re_edit_in_place(spark_conf, {
-            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled    true',
-            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
-            })
+        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
+            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled    true',
+            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir    hdfs:///user/ubuntu/directory',
+	    })
+        self.spark_reconfigure()
     def start(self):
         with utils.environment_edit_in_place('/etc/environment') as env:
             env['MASTER']= hookenv.config()['spark_execution_mode']

=== modified file 'hooks/config-changed'
--- hooks/config-changed	2015-04-22 15:27:27 +0000
+++ hooks/config-changed	2015-06-05 21:12:22 +0000
@@ -13,3 +13,17 @@
 import common
+import callbacks
+import jujuresources 
+from charmhelpers.contrib import bigdata
+try:
+    # Re-apply the Spark memory settings whenever the charm configuration changes.
+    spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
+    dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
+    spark = callbacks.Spark(dist_config)
+    spark.spark_reconfigure()
+except Exception as e:
+    # Best effort: a failure here is logged and must not abort the hook.
+    logstring = "Problem with setting memory, check spark-defaults.conf exists: " + str(e)
+    jujuresources.juju_log(logstring, 'ERROR')

=== added file 'hooks/sparkmemalloc.py'
--- hooks/sparkmemalloc.py	1970-01-01 00:00:00 +0000
+++ hooks/sparkmemalloc.py	2015-06-05 21:12:22 +0000
@@ -0,0 +1,93 @@
+# Classes to calculate a memory allocation from percentages and other values.
+# charmmemory().reqappmem() accepts two values; the latter (the user value) is split
+# into SIZE and TYPE, e.g. charmmemory().reqappmem(95, "512m").
+# The first value is the autoallocation value (always a percentage of user free memory, so the "f" is omitted);
+# the second value is the user value.
+# For the percentage types (% and f), the valid size is between 1 and 100.
+# Valid types:
+#       % - % of TOTAL system memory to allocate
+#       f - % of FREE (free + cache + buffers) memory to allocate (aka auto allocation)
+#       m - memory in MB to allocate
+#       g - memory in GB to allocate
+# !!! If the amount of memory specified by %, m or g exceeds the current user free memory,
+# autoallocation will be forced !!!
+import sys
+import jujuresources
+class getsysmem():
+    """Snapshot of system memory figures read from /proc/meminfo.
+
+    Fields are looked up by name (MemTotal, MemFree, Buffers, Cached) rather
+    than by line position, because the line order of /proc/meminfo differs
+    between kernels (e.g. whether a MemAvailable line is present).
+
+    :param unit: label stored on the instance as ``unit``; it is not used in
+        any conversion -- the properties always return megabytes.
+    :raises KeyError: if MemTotal or MemFree is missing from /proc/meminfo.
+    """
+
+    def __init__(self, unit):
+        fields = {}
+        with open('/proc/meminfo', 'r') as mem:
+            for line in mem:
+                name, sep, rest = line.partition(':')
+                parts = rest.split()
+                # Skip anything that is not a "Name:   value [kB]" line.
+                if sep and parts:
+                    fields[name.strip()] = int(parts[0])
+        self._total = fields['MemTotal']
+        self._free = fields['MemFree']
+        self._buff = fields.get('Buffers', 0)
+        self._cached = fields.get('Cached', 0)
+        self.unit = unit
+
+    @property
+    def total(self):
+        """Total system memory in whole megabytes."""
+        return self._total // 1024
+
+    @property
+    def user_free(self):
+        """Free + buffers + cached memory in whole megabytes."""
+        return (self._free + self._buff + self._cached) // 1024
+class charmmemory():
+    """Translate a charm memory setting into a number of megabytes.
+
+    ``reqappmem`` is the entry point; it parses the user setting and delegates
+    to ``memcalc``, which stores the result in ``self.memamount``.
+    """
+
+    def memcalc(self, autosize, usersize, alloctype):
+        """Compute ``self.memamount`` (in MB) for a parsed setting.
+
+        :param autosize: percent of free memory used when falling back to
+            auto-allocation.
+        :param usersize: numeric part of the user setting.
+        :param alloctype: one of "f", "%", "m" or "g" (case-insensitive); any
+            other value falls back to auto-allocation.
+        """
+        self.alloctype = alloctype.lower()
+        self.usersize = int(usersize)
+        self.autosize = int(autosize)
+        # Read the memory figures once; every branch needs them.
+        self.cursysmem = getsysmem('MB')
+        free_mb = self.cursysmem.user_free
+        total_mb = self.cursysmem.total
+
+        if self.alloctype not in ("f", "%", "m", "g"):
+            # Previously this left memamount undefined; treat it as auto-allocation.
+            jujuresources.juju_log('Invalid memory configuration type defined, enabling memory auto-configuration...', 'ERROR')
+            self.alloctype = "f"
+            self.usersize = self.autosize
+
+        if self.alloctype == "f":
+            self.memamount = int((free_mb / 100.00) * self.usersize)
+            logstring = "Memory Auto-configuration: " + str(self.usersize) + "% of free memory (" + str(free_mb) + ") = " + str(self.memamount) + "mb"
+        elif self.alloctype == "%":
+            self.memamount = int((total_mb / 100.00) * self.usersize)
+            logstring = "Memory configuration: " + str(self.usersize) + "% of total memory (" + str(total_mb) + ") = " + str(self.memamount) + "mb"
+        elif self.alloctype == "m":
+            self.memamount = self.usersize
+            logstring = "Memory configuration: " + str(self.usersize) + "mb of total memory: " + str(total_mb)
+        else:
+            # "g": binary units (1g = 1024m), matching the kB -> MB conversion
+            # used for the system figures and the JVM meaning of "g".
+            self.memamount = self.usersize * 1024
+            logstring = "Memory configuration: " + str(self.usersize) + "gb of total memory: " + str(int(total_mb) // 1024)
+
+        if self.memamount > free_mb:
+            jujuresources.juju_log("Configuration overcommits free memory, switching to auto-configuration - check configuration values", 'ERROR')
+            self.memamount = int((free_mb / 100.00) * self.autosize)
+            logstring = "Memory configuration: " + str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system memory (" + str(free_mb) + ")"
+        jujuresources.juju_log(logstring, 'INFO')
+
+    def reqappmem(self, autosize, usersize):
+        """Return the memory to allocate, in MB, for a user setting.
+
+        :param autosize: percent of free memory to use for auto-allocation.
+        :param usersize: user setting such as 'auto', '50%', '50f', '1024m'
+            or '1g'.
+        :returns: ``self.memamount`` as computed by ``memcalc``.
+
+        Invalid types and invalid or non-numeric sizes are logged and replaced
+        by auto-allocation instead of raising.
+        """
+        self.usersize = usersize
+        self.autosize = autosize
+        setting = str(usersize or '').strip()
+        memtype = setting[-1:]
+        size = setting[:-1]
+
+        if setting.lower() == 'auto':
+            # 'auto' is the documented default, so it is not reported as an error.
+            size = self.autosize
+            memtype = "f"
+        elif memtype.lower() not in ("f", "%", "m", "g"):
+            jujuresources.juju_log('Invalid memory configuration type defined, enabling memory auto-configuration...', 'ERROR')
+            size = self.autosize
+            memtype = "f"
+        else:
+            try:
+                size = int(size)
+            except ValueError:
+                # Empty or non-integer size (e.g. '1.5g'): handled as invalid below.
+                size = 0
+            if size < 1:
+                jujuresources.juju_log('Invalid memory configuration size (too small or nul), enabling memory auto-configuration', 'ERROR')
+                size = self.autosize
+                memtype = "f"
+
+        self.memcalc(int(self.autosize), int(size), memtype)
+        return self.memamount

=== modified file 'tests/100-deploy-spark-hdfs-yarn'
--- tests/100-deploy-spark-hdfs-yarn	2015-06-04 22:00:39 +0000
+++ tests/100-deploy-spark-hdfs-yarn	2015-06-05 21:12:22 +0000
@@ -2,7 +2,7 @@
 import unittest
 import amulet
+import random
 class TestDeploy(unittest.TestCase):
@@ -12,11 +12,17 @@
     def setUpClass(cls):
+        random.seed()
+        global sdmrnd, semrnd, sdmrndstr, sdmrndstr
+        sdmrnd = random.randint(10,100)
+        semrnd = random.randint(10,100)
+        sdmrndstr = str(sdmrnd) + 'f'
+        semrndstr = str(semrnd) + 'f'
         cls.d = amulet.Deployment(series='trusty')
         # Deploy a hadoop cluster
         cls.d.add('yarn-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
         cls.d.add('hdfs-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
-        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
+        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
         cls.d.add('hadoop-plugin', charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
         cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
         cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
@@ -26,12 +32,31 @@
         # Add Spark Service
         cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
-        cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
-        cls.d.setup(timeout=3600)
+        #cls.d.add('spark', charm='/root/canonical/trusty/apache-spark-merge')
+        sparkmemconf = {'spark_driver_memory': sdmrndstr, 'spark_executor_memory': semrndstr}
+        cls.d.configure('spark', sparkmemconf)
+        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
+        cls.d.setup(timeout=9000)
         cls.unit = cls.d.sentry.unit['spark/0']
+    def test_reconfigsdm(self):
+        """Check the driver auto-configuration percentage logged on the unit.
+
+        Runs a grep/awk pipeline over the juju log on the unit and compares the
+        extracted percentage with the random value chosen in setUpClass.
+        """
+        expected = str(sdmrnd)
+        command = (
+            'grep -A2 "Allocating memory for spark driver" /var/log/juju/*spark*log'
+            ' | grep Auto-configuration | tail -n 1 | grep %s'
+            " | awk -F'Auto-configuration: ' '{print $2}' | awk -F'%%' '{print $1}'"
+        ) % expected
+        output, retcode = self.unit.run(command)
+        if str(output) == expected:
+            return
+        message = "Spark driver memory config test: Expected %s, got %s" % (expected, str(output))
+        amulet.raise_status(amulet.FAIL, msg=message)
+    def test_configsem(self):
+        """Check the executor auto-configuration percentage logged on the unit.
+
+        Runs a grep/awk pipeline over the juju log on the unit and compares the
+        extracted percentage with the random value chosen in setUpClass.
+        """
+        expected = str(semrnd)
+        command = (
+            'grep -A2 "Allocating memory for spark executor" /var/log/juju/*spark*log'
+            ' | grep Auto-configuration | tail -n 1 | grep %s'
+            " | awk -F'Auto-configuration: ' '{print $2}' | awk -F'%%' '{print $1}'"
+        ) % expected
+        output, retcode = self.unit.run(command)
+        if str(output) == expected:
+            return
+        message = "Spark executor memory config test: Expected %s, got %s" % (expected, str(output))
+        amulet.raise_status(amulet.FAIL, msg=message)
     def test_deploy(self):
         output, retcode = self.unit.run("pgrep -a java")
         assert 'HistoryServer' in output, "Spark HistoryServer is not started"

Follow ups