← Back to team overview

bigdata-dev team mailing list archive

[Merge] lp:~ewollesen/charms/trusty/apache-spark/spark-config into lp:~bigdata-dev/charms/trusty/apache-spark/trunk

 

Eric Wollesen has proposed merging lp:~ewollesen/charms/trusty/apache-spark/spark-config into lp:~bigdata-dev/charms/trusty/apache-spark/trunk.

Requested reviews:
  Juju Big Data Development (bigdata-dev)

For more details, see:
https://code.launchpad.net/~ewollesen/charms/trusty/apache-spark/spark-config/+merge/260782

Adds config options for +spark_local_dir+ and +spark_driver_cores+.
-- 
Your team Juju Big Data Development is requested to review the proposed merge of lp:~ewollesen/charms/trusty/apache-spark/spark-config into lp:~bigdata-dev/charms/trusty/apache-spark/trunk.
=== modified file 'config.yaml'
--- config.yaml	2015-05-30 04:34:18 +0000
+++ config.yaml	2015-06-02 02:08:08 +0000
@@ -4,4 +4,20 @@
         default: ''
         description: |
             URL from which to fetch resources (e.g., Hadoop binaries) instead of Launchpad.
-    
+    spark_driver_cores:
+        type: int
+        default: 1
+        description: |
+            Number of cores to use for the driver process, only in cluster
+            mode.
+    spark_local_dir:
+        type: string
+        default: /tmp
+        description: |
+            Directory to use for "scratch" space in Spark, including map
+            output files and RDDs that get stored on disk. This should be on a
+            fast, local disk in your system. It can also be a comma-separated
+            list of multiple directories on different disks. NOTE: In Spark
+            1.0 and later this will be overridden by SPARK_LOCAL_DIRS
+            (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set
+            by the cluster manager.

=== modified file 'hooks/callbacks.py'
--- hooks/callbacks.py	2015-05-30 04:34:18 +0000
+++ hooks/callbacks.py	2015-06-02 02:08:08 +0000
@@ -1,4 +1,3 @@
-
 from subprocess import check_output, Popen
 
 import jujuresources
@@ -6,7 +5,7 @@
 from charmhelpers.core import unitdata
 from charmhelpers.contrib.bigdata import utils
 from path import Path
-
+import eawutils
 
 class Spark(object):
 
@@ -69,6 +68,9 @@
             env['SPARK_CONF_DIR'] = self.dist_config.path('spark_conf')
         self.configure_spark_hdfs()
         self.spark_optimize()
+        spark_default = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
+        spark_config = eawutils.getSparkConfig(hookenv.config())
+        eawutils.updateSparkConfig(spark_default, spark_config)
         cmd = "chown -R ubuntu:hadoop {}".format (spark_home)
         call(cmd.split())
         cmd = "chown -R ubuntu:hadoop {}".format (self.dist_config.path('spark_conf'))

=== added file 'hooks/eawutils.py'
--- hooks/eawutils.py	1970-01-01 00:00:00 +0000
+++ hooks/eawutils.py	2015-06-02 02:08:08 +0000
@@ -0,0 +1,45 @@
+# These functions should live in charmhelpers.contrib.bigdata.utils or
+# somewhere similar.
+import re
+from charmhelpers.contrib.bigdata import utils
+
+def updateSparkConfig(path, config):
+    """Updates spark config settings in +path+.
+
+    Assumes +path+ is in spark config file syntax."""
+    inserts, updates = calcSparkConfigUpserts(path, config)
+
+    utils.re_edit_in_place(path, updates)
+    with open(path, 'a') as configFile:
+        for item in inserts.items():
+            configFile.write("%s\t%s\n" % item)
+
+def calcSparkConfigUpserts(path, config):
+    """Calculate upserts to transform +path+ to +config+, idempotently.
+
+    Returns (inserts, updates)."""
+    inserts = config.copy()
+    updates = {}
+
+    with open(path, 'r') as configFile:
+        for line in configFile.readlines():
+            if line.startswith("#") or re.match('\A\s*\Z', line):
+                continue
+            key = line.split(None, 1)[0]
+            if key in config:
+                updates["^%s\s.*" % key] = "%s\t%s" % (key, config[key])
+                inserts.pop(key)
+
+    return inserts, updates
+
+def getKeysStartingWith(d, prefix):
+    "Return a dict of the keys prefixed with +prefix+."
+    return dict([(k,v) for k,v in d.items() if k.startswith(prefix)])
+
+def underscoreToDot(d):
+    "Return the dictionary with underscores in keys replaced with dots."
+    return dict([(k.replace("_", "."),v) for k,v in d.items()])
+
+def getSparkConfig(config):
+    "Return the entries of config whose keys start with 'spark_', with underscores in keys replaced by dots."
+    return underscoreToDot(getKeysStartingWith(config, "spark_"))

=== added file 'tests/100-spark-config'
--- tests/100-spark-config	1970-01-01 00:00:00 +0000
+++ tests/100-spark-config	2015-06-02 02:08:08 +0000
@@ -0,0 +1,42 @@
+#!/usr/bin/python3
+
+import amulet
+
+
+class TestSparkConfig(object):
+
+    def __init__(self):
+        d = amulet.Deployment(series='trusty')
+
+        d.add('hadoop-plugin', "cs:~bigdata-dev/trusty/apache-hadoop-plugin")
+        d.configure('spark', {'spark_executor_cores': 2})
+
+        d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
+
+        try:
+            d.setup(900)
+            d.sentry.wait(900)
+        except amulet.helpers.TimeoutError:
+            amulet.raise_status(amulet.FAIL,
+                                msg="Environment wasn't stood up in time")
+
+        self.spark_unit = d.sentry.unit['spark/0']
+
+    def run(self):
+        for test in dir(self):
+            if test.startswith('test_'):
+                getattr(self, test)()
+
+    # def test_default_passthrough(self):
+    #     output, code = self.spark_unit.run("grep -q '^spark.local.dir\t/tmp'")
+    #     if 0 == code:
+    #         amulet.raise_status(amulet.FAIL, "Failed to configure spark service defaults")
+
+    def test_config_setting(self):
+        output, code = self.spark_unit.run("grep -q 'spark.driver.cores\t2'")
+        if 0 != code:
+            amulet.raise_status(amulet.FAIL, "Failed to configure spark service")
+
+if __name__ == '__main__':
+    runner = TestSparkConfig()
+    runner.run()