← Back to team overview

sts-sponsors team mailing list archive

[Merge] ~syntroniks/maas:cps-pdu-power-driver into maas:master

 

none none has proposed merging ~syntroniks/maas:cps-pdu-power-driver into maas:master.

Commit message:
Add CyberPower PDU support

Requested reviews:
  MAAS Maintainers (maas-maintainers)

For more details, see:
https://code.launchpad.net/~syntroniks/maas/+git/maas/+merge/434995

This implements simple control of CyberPower switched PDU outlets as a MAAS power driver option.
-- 
Your team MAAS Maintainers is requested to review the proposed merge of ~syntroniks/maas:cps-pdu-power-driver into maas:master.
diff --git a/src/provisioningserver/drivers/power/cps.py b/src/provisioningserver/drivers/power/cps.py
new file mode 100644
index 0000000..afbca72
--- /dev/null
+++ b/src/provisioningserver/drivers/power/cps.py
@@ -0,0 +1,130 @@
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""CyberPower Systems (CPS) Power Driver.
+
+Support for managing CyberPower Systems (CPS) PDU outlets via SNMP.
+"""
+
+
+import re
+
+from provisioningserver.drivers import (
+    make_ip_extractor,
+    make_setting_field,
+    SETTING_SCOPE,
+)
+from provisioningserver.drivers.power import PowerActionError, PowerDriver
+from provisioningserver.utils import shell
+
+
+class CPSState:
+    """Outlet state strings matched against the SNMP status reply.
+
+    `CPSPowerDriver.power_query` maps `ON` to "on" and `OFF` to "off".
+    """
+
+    ON, OFF = "1", "2"
+
+
+# Base OID under which all outlet OIDs used by this driver live:
+# .iso(1).org(3).dod(6).internet(1).private(4).enterprises(1)
+# .cps(3808).products(1).hardware(1).ePDU(3)
+CPS_HARDWARE_OID = "1.3.6.1.4.1.3808.1.1.3"
+# Suffix appended to CPS_HARDWARE_OID (followed by the outlet number) to
+# send a command to an outlet:
+# .ePDUOutlet(3).ePDUOutletControl(3).ePDUOutletControlTable(1)
+# .ePDUOutletControlEntry(1).ePDUOutletControlOutletCommand(4)
+CPS_OUTLET_CONTROL_COMMAND = "3.3.1.1.4"
+# Suffix appended to CPS_HARDWARE_OID (followed by the outlet number) to
+# read the state of an outlet:
+# .ePDUOutlet(3).ePDUOutletStatus(5).ePDUOutletStatusTable(1)
+# .ePDUOutletStatusEntry(1).ePDUOutletStatusOutletState(4)
+CPS_OUTLET_STATUS_SUFFIX = "3.5.1.1.4"
+# Command name -> integer value (kept as a string because it is passed
+# on the snmpset command line).
+CPS_OUTLET_STATE_COMMANDS = dict(
+    immediateOn="1",
+    immediateOff="2",
+    immediateReboot="3",
+    delayedOn="4",
+    delayedOff="5",
+    delayedReboot="6",
+    cancelPendingCommand="7",
+    outletIdentify="8",
+)
+
+
+class CPSPowerDriver(PowerDriver):
+    """Power driver for CyberPower Systems (CPS) switched PDU outlets.
+
+    Outlets are switched and queried by running the `snmpset` and
+    `snmpget` commands against the PDU. The power context must provide
+    "power_address" (the PDU) and "node_outlet" (the outlet number).
+    """
+
+    name = "cps"
+    chassis = True
+    can_probe = False
+    can_set_boot_order = False
+    description = "CyberPower Systems (CPS) PDU"
+    settings = [
+        make_setting_field("power_address", "IP for CPS PDU", required=True),
+        make_setting_field(
+            "node_outlet",
+            "CPS PDU node outlet number (1-16)",
+            scope=SETTING_SCOPE.NODE,
+            required=True,
+        ),
+    ]
+    ip_extractor = make_ip_extractor("power_address")
+
+    def detect_missing_packages(self):
+        """Report the packages this driver needs that are not installed.
+
+        :return: `["snmp"]` when the `snmpset` command is not available,
+            otherwise an empty list.
+        """
+        binary, package = ["snmpset", "snmp"]
+        if not shell.has_command_available(binary):
+            return [package]
+        return []
+
+    def run_process(self, *command):
+        """Run an SNMP command in a subprocess and extract the outlet value.
+
+        :param command: The executable followed by its arguments, one
+            string per argument.
+        :return: The digit found after `INTEGER:` in the command output,
+            as the string "1" or "2".
+        :raises PowerActionError: If the command exits with a non-zero
+            code, or its output contains no `INTEGER: 1`/`INTEGER: 2`.
+        """
+        result = shell.run_command(*command)
+        if result.returncode != 0:
+            # Join the arguments with spaces so the failing command line
+            # stays legible in the error message.
+            raise PowerActionError(
+                "CPS Power Driver external process error for command %s: %s"
+                % (" ".join(command), result.stderr)
+            )
+        match = re.search(r"INTEGER:\s*([1-2])", result.stdout)
+        if match is None:
+            raise PowerActionError(
+                "CPS Power Driver unable to extract outlet power state"
+                " from: %s" % result.stdout
+            )
+        return match.group(1)
+
+    def power_on(self, system_id, context):
+        """Power on CPS outlet.
+
+        An outlet that already reports "on" is switched off before the
+        "immediateOn" command is sent.
+
+        :param system_id: Passed through to `power_query`/`power_off`.
+        :param context: Power parameters; "power_address" and
+            "node_outlet" are read.
+        :raises PowerActionError: If any SNMP command fails.
+        """
+        outlet = context["node_outlet"]
+        if self.power_query(system_id, context) == "on":
+            self.power_off(system_id, context)
+        self.run_process(
+            "snmpset",
+            *_get_common_args(context),
+            f"{CPS_HARDWARE_OID}.{CPS_OUTLET_CONTROL_COMMAND}.{outlet}",
+            "i",
+            CPS_OUTLET_STATE_COMMANDS["immediateOn"],
+        )
+
+    def power_off(self, system_id, context):
+        """Power off CPS outlet.
+
+        :param system_id: Unused by this method.
+        :param context: Power parameters; "power_address" and
+            "node_outlet" are read.
+        :raises PowerActionError: If the SNMP command fails.
+        """
+        outlet = context["node_outlet"]
+        self.run_process(
+            "snmpset",
+            *_get_common_args(context),
+            f"{CPS_HARDWARE_OID}.{CPS_OUTLET_CONTROL_COMMAND}.{outlet}",
+            "i",
+            CPS_OUTLET_STATE_COMMANDS["immediateOff"],
+        )
+
+    def power_query(self, system_id, context):
+        """Power query CPS outlet.
+
+        :param system_id: Unused by this method.
+        :param context: Power parameters; "power_address" and
+            "node_outlet" are read.
+        :return: "on" or "off".
+        :raises PowerActionError: If the SNMP command fails or the
+            reported state is neither `CPSState.ON` nor `CPSState.OFF`.
+        """
+        outlet = context["node_outlet"]
+        power_state = self.run_process(
+            "snmpget",
+            *_get_common_args(context),
+            f"{CPS_HARDWARE_OID}.{CPS_OUTLET_STATUS_SUFFIX}.{outlet}",
+        )
+        if power_state == CPSState.OFF:
+            return "off"
+        elif power_state == CPSState.ON:
+            return "on"
+        else:
+            raise PowerActionError(
+                "CPS Power Driver retrieved unknown power state: %r"
+                % power_state
+            )
+
+
+def _get_common_args(context):
+    """Build the SNMP arguments shared by every command of this driver.
+
+    :param context: Power parameters; only "power_address" is read.
+    :return: The list `["-c", "private", "-v1", <power_address>]`.
+    """
+    return ["-c", "private", "-v1", context["power_address"]]
diff --git a/src/provisioningserver/drivers/power/tests/test_cps.py b/src/provisioningserver/drivers/power/tests/test_cps.py
new file mode 100644
index 0000000..0663c80
--- /dev/null
+++ b/src/provisioningserver/drivers/power/tests/test_cps.py
@@ -0,0 +1,154 @@
+# Copyright 2022 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for `provisioningserver.drivers.power.cps`."""
+
+import random
+
+from testtools.matchers import Equals
+
+from maastesting.factory import factory
+from maastesting.matchers import MockCalledOnceWith
+from maastesting.testcase import MAASTestCase
+from provisioningserver.drivers.power import cps as cps_module
+from provisioningserver.drivers.power import PowerActionError
+from provisioningserver.utils.shell import has_command_available, ProcessResult
+
+# Expected snmpget arguments; placeholders: power_address, node_outlet.
+COMMON_ARGS_STATUS = (
+    "-c private -v1 {} "
+    "1.3.6.1.4.1.3808.1.1.3.3.5.1.1.4.{}"
+)
+# Expected snmpset arguments; placeholders: power_address, node_outlet.
+COMMON_ARGS_CONTROL = (
+    "-c private -v1 {} "
+    "1.3.6.1.4.1.3808.1.1.3.3.3.1.1.4.{}"
+)
+# Fake command output reporting value 1; placeholder: node_outlet.
+COMMON_OUTPUT = (
+    "iso.3.6.1.4.1.3808.1.1.3.3.5.1.1.4.%s"
+    " = INTEGER: 1\n"
+)
+
+
+class TestCPSPowerDriver(MAASTestCase):
+    """Unit tests for `CPSPowerDriver`.
+
+    Every test patches either the shell helpers or the driver's own
+    methods, so no SNMP command is actually executed.
+    """
+
+    def make_context(self, outlet=None):
+        """Return a power context with a generated PDU address.
+
+        :param outlet: Outlet number to use; when None a random value
+            from 1 to 7 is picked.
+        """
+        address = factory.make_name("power_address")
+        if outlet is None:
+            outlet = random.randrange(1, 8)
+        return {"power_address": address, "node_outlet": outlet}
+
+    def make_snmpget_command(self, context):
+        """Return the `snmpget` argument list expected for `context`."""
+        arguments = COMMON_ARGS_STATUS.format(
+            context["power_address"], context["node_outlet"]
+        )
+        return ["snmpget", *arguments.split()]
+
+    def make_snmpset_command(self, context, action):
+        """Return the `snmpset` argument list expected for `context`.
+
+        :param action: Key of `CPS_OUTLET_STATE_COMMANDS` being sent.
+        """
+        arguments = COMMON_ARGS_CONTROL.format(
+            context["power_address"], context["node_outlet"]
+        )
+        value = cps_module.CPS_OUTLET_STATE_COMMANDS[action]
+        return ["snmpset", *arguments.split(), "i", value]
+
+    def test_missing_packages(self):
+        self.patch(has_command_available).return_value = False
+        driver = cps_module.CPSPowerDriver()
+        self.assertEqual(["snmp"], driver.detect_missing_packages())
+
+    def test_no_missing_packages(self):
+        self.patch(has_command_available).return_value = True
+        driver = cps_module.CPSPowerDriver()
+        self.assertEqual([], driver.detect_missing_packages())
+
+    def patch_run_command(self, stdout="", stderr="", returncode=0):
+        """Patch `shell.run_command` to return a canned `ProcessResult`.
+
+        :return: The mock that replaced `run_command`.
+        """
+        canned_result = ProcessResult(
+            stdout=stdout, stderr=stderr, returncode=returncode
+        )
+        mock_run_command = self.patch(cps_module.shell, "run_command")
+        mock_run_command.return_value = canned_result
+        return mock_run_command
+
+    def test_run_process_calls_command_and_returns_output(self):
+        driver = cps_module.CPSPowerDriver()
+        context = self.make_context()
+        command = self.make_snmpget_command(context)
+        mock_run_command = self.patch_run_command(
+            stdout=COMMON_OUTPUT % context["node_outlet"],
+            stderr="error_output",
+        )
+        output = driver.run_process(*command)
+        mock_run_command.assert_called_once_with(*command)
+        expected = cps_module.CPS_OUTLET_STATE_COMMANDS["immediateOn"]
+        self.expectThat(output, Equals(expected))
+
+    def test_run_process_crashes_on_external_process_error(self):
+        driver = cps_module.CPSPowerDriver()
+        self.patch_run_command(returncode=1)
+        command = factory.make_name("command")
+        self.assertRaises(PowerActionError, driver.run_process, command)
+
+    def test_run_process_crashes_on_no_power_state_match_found(self):
+        driver = cps_module.CPSPowerDriver()
+        self.patch_run_command(stdout="Error")
+        command = factory.make_name("command")
+        self.assertRaises(PowerActionError, driver.run_process, command)
+
+    def test_power_on_calls_run_process(self):
+        driver = cps_module.CPSPowerDriver()
+        system_id = factory.make_name("system_id")
+        context = self.make_context()
+        # Report the outlet as already on; power_off is stubbed out so
+        # only the final snmpset reaches run_process.
+        mock_power_query = self.patch(driver, "power_query")
+        mock_power_query.return_value = "on"
+        self.patch(driver, "power_off")
+        mock_run_process = self.patch(driver, "run_process")
+
+        driver.power_on(system_id, context)
+
+        self.expectThat(
+            mock_power_query, MockCalledOnceWith(system_id, context)
+        )
+        mock_run_process.assert_called_once_with(
+            *self.make_snmpset_command(context, "immediateOn")
+        )
+
+    def test_power_off_calls_run_process(self):
+        driver = cps_module.CPSPowerDriver()
+        system_id = factory.make_name("system_id")
+        context = self.make_context()
+        mock_run_process = self.patch(driver, "run_process")
+
+        driver.power_off(system_id, context)
+
+        mock_run_process.assert_called_once_with(
+            *self.make_snmpset_command(context, "immediateOff")
+        )
+
+    def test_power_query_returns_power_state_on(self):
+        driver = cps_module.CPSPowerDriver()
+        system_id = factory.make_name("system_id")
+        context = self.make_context()
+        mock_run_process = self.patch(driver, "run_process")
+        mock_run_process.return_value = "1"
+
+        result = driver.power_query(system_id, context)
+
+        mock_run_process.assert_called_once_with(
+            *self.make_snmpget_command(context)
+        )
+        self.expectThat(result, Equals("on"))
+
+    def test_power_query_returns_power_state_off(self):
+        driver = cps_module.CPSPowerDriver()
+        system_id = factory.make_name("system_id")
+        context = self.make_context()
+        mock_run_process = self.patch(driver, "run_process")
+        mock_run_process.return_value = "2"
+
+        result = driver.power_query(system_id, context)
+
+        mock_run_process.assert_called_once_with(
+            *self.make_snmpget_command(context)
+        )
+        self.expectThat(result, Equals("off"))
+
+    def test_power_query_crashes_for_uknown_power_state(self):
+        driver = cps_module.CPSPowerDriver()
+        system_id = factory.make_name("system_id")
+        context = self.make_context()
+        self.patch(driver, "run_process").return_value = "Error"
+        self.assertRaises(
+            PowerActionError, driver.power_query, system_id, context
+        )

Follow ups