← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:kafka-producer into launchpad:master

 

Colin Watson has proposed merging ~cjwatson/launchpad:kafka-producer into launchpad:master.

Commit message:
Add a simple Kafka producer abstraction

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/439414

This isn't used anywhere as yet, but in an earlier experiment I was able to use this to send events to Kafka on `git push`.

I want the abstraction layer in place for ease of testing, and in order that we can switch to different bindings without too much trouble, since we aren't very familiar with the operational properties of these bindings as yet.

Dependencies MP: https://code.launchpad.net/~cjwatson/lp-source-dependencies/+git/lp-source-dependencies/+merge/439413
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:kafka-producer into launchpad:master.
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index db9893c..d8ac9b8 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -895,6 +895,28 @@ public_https: True
 port: 11371
 
 
+[kafka]
+# Space-separated list of host:port strings for the servers to contact when
+# bootstrapping the initial Kafka cluster metadata.
+bootstrap_servers: none
+
+# A name for Launchpad's Kafka client, sent in requests to servers.
+client_id: launchpad
+
+# Username to use when connecting to the Kafka cluster.
+username: none
+
+# Password to use when connecting to the Kafka cluster.
+password: none
+
+# Path to CA certificate file to use when connecting to the Kafka cluster.
+ca_certificate_path: none
+
+# Service name to use as part of Kafka topic names.  May only contain
+# [a-z0-9.-].
+service_name: launchpad
+
+
 [karmacacheupdater]
 # The database user which will be used by this process.
 # datatype: string
diff --git a/lib/lp/services/configure.zcml b/lib/lp/services/configure.zcml
index 77dac94..356ea99 100644
--- a/lib/lp/services/configure.zcml
+++ b/lib/lp/services/configure.zcml
@@ -16,6 +16,7 @@
   <include package=".identity" />
   <include package=".inlinehelp" file="meta.zcml" />
   <include package=".job" />
+  <include package=".kafka" />
   <include package=".librarian" />
   <include package=".macaroons" />
   <include package=".mail" />
diff --git a/lib/lp/services/kafka/__init__.py b/lib/lp/services/kafka/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/kafka/__init__.py
diff --git a/lib/lp/services/kafka/client.py b/lib/lp/services/kafka/client.py
new file mode 100644
index 0000000..8cc9523
--- /dev/null
+++ b/lib/lp/services/kafka/client.py
@@ -0,0 +1,88 @@
+# Copyright 2023 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Kafka clients."""
+
+__all__ = [
+    "KafkaProducer",
+]
+
+import json
+from typing import Optional
+
+from kafka import KafkaProducer as BaseKafkaProducer
+from zope.interface import implementer
+
+from lp.services.config import config
+from lp.services.kafka.interfaces import IKafkaProducer, KafkaUnconfigured
+from lp.services.propertycache import cachedproperty, get_property_cache
+
+
+def _serialize_json(data: Optional[object]) -> Optional[bytes]:
+    """Serialize a message key or value as JSON-encoded bytes.
+
+    `None` is passed through unchanged rather than being encoded as the
+    JSON string ``null``, so that omitting a key or value sends a record
+    whose key or value is null rather than the bytes ``b"null"``.
+    """
+    if data is None:
+        return None
+    return json.dumps(data).encode()
+
+
+@implementer(IKafkaProducer)
+class KafkaProducer:
+    """See `IKafkaProducer`.
+
+    The underlying kafka-python producer is created lazily from the
+    `[kafka]` configuration section on first use, and discarded by
+    `close`.
+    """
+
+    @cachedproperty
+    def _producer(self) -> BaseKafkaProducer:
+        """Create the underlying producer from the current configuration.
+
+        :raises KafkaUnconfigured: if `config.kafka.bootstrap_servers` is
+            not set.
+        """
+        if config.kafka.bootstrap_servers is None:
+            raise KafkaUnconfigured()
+        return BaseKafkaProducer(
+            bootstrap_servers=config.kafka.bootstrap_servers.split(),
+            client_id=config.kafka.client_id,
+            key_serializer=_serialize_json,
+            value_serializer=_serialize_json,
+            security_protocol="SASL_SSL",
+            # NOTE(review): TLS hostname checking is disabled, so broker
+            # certificates are not matched against the broker hostname.
+            # Confirm that this is intended.
+            ssl_check_hostname=False,
+            ssl_cafile=config.kafka.ca_certificate_path,
+            sasl_plain_username=config.kafka.username,
+            sasl_plain_password=config.kafka.password,
+            sasl_mechanism="SCRAM-SHA-512",
+        )
+
+    def send(
+        self,
+        topic: str,
+        value: Optional[object] = None,
+        key: Optional[object] = None,
+    ) -> None:
+        """See `IKafkaProducer`."""
+        self._producer.send(topic, value=value, key=key)
+
+    def flush(self) -> None:
+        """See `IKafkaProducer`."""
+        self._producer.flush()
+
+    def close(self) -> None:
+        """See `IKafkaProducer`.
+
+        If no underlying producer exists yet, one is created first.  The
+        producer is discarded afterwards, so later calls create a new one.
+        """
+        self._producer.flush()
+        self._producer.close()
+        del get_property_cache(self)._producer
diff --git a/lib/lp/services/kafka/configure.zcml b/lib/lp/services/kafka/configure.zcml
new file mode 100644
index 0000000..ebdf741
--- /dev/null
+++ b/lib/lp/services/kafka/configure.zcml
@@ -0,0 +1,16 @@
+<!-- Copyright 2023 Canonical Ltd.  This software is licensed under the
+     GNU Affero General Public License version 3 (see the file LICENSE).
+-->
+
+<configure
+    xmlns="http://namespaces.zope.org/zope"
+    xmlns:i18n="http://namespaces.zope.org/i18n"
+    xmlns:lp="http://namespaces.canonical.com/lp"
+    i18n_domain="launchpad">
+
+    <lp:securedutility
+        class="lp.services.kafka.client.KafkaProducer"
+        provides="lp.services.kafka.interfaces.IKafkaProducer">
+        <allow interface="lp.services.kafka.interfaces.IKafkaProducer" />
+    </lp:securedutility>
+</configure>
diff --git a/lib/lp/services/kafka/interfaces.py b/lib/lp/services/kafka/interfaces.py
new file mode 100644
index 0000000..bf8cfa5
--- /dev/null
+++ b/lib/lp/services/kafka/interfaces.py
@@ -0,0 +1,50 @@
+# Copyright 2023 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Kafka client interfaces."""
+
+__all__ = [
+    "IKafkaProducer",
+    "KafkaUnconfigured",
+]
+
+from typing import Optional
+
+from zope.interface import Interface
+
+
+class KafkaUnconfigured(Exception):
+    """Kafka has not been configured."""
+
+
+class IKafkaProducer(Interface):
+    """A client that publishes records to the Kafka cluster.
+
+    Methods may raise `KafkaUnconfigured` if Kafka has not been configured.
+    """
+
+    def send(
+        topic: str,
+        value: Optional[object] = None,
+        key: Optional[object] = None,
+    ) -> None:
+        """Publish a message to a topic.
+
+        The value and key are serialized using JSON.  This may return
+        before the message has been delivered; call `flush` to wait for
+        outstanding messages.
+        """
+
+    def flush() -> None:
+        """Flush all previously-sent messages to the server.
+
+        Blocks until all requests have completed, whether successfully or
+        with errors.
+        """
+
+    def close() -> None:
+        """Close this producer.
+
+        Previously-sent messages are flushed first.  The producer may be
+        used again afterwards.
+        """
diff --git a/lib/lp/services/kafka/tests/__init__.py b/lib/lp/services/kafka/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/lp/services/kafka/tests/__init__.py
diff --git a/lib/lp/services/kafka/tests/test_client.py b/lib/lp/services/kafka/tests/test_client.py
new file mode 100644
index 0000000..34faf8f
--- /dev/null
+++ b/lib/lp/services/kafka/tests/test_client.py
@@ -0,0 +1,105 @@
+# Copyright 2023 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for Kafka clients."""
+
+from unittest import mock
+
+from fixtures import MockPatch
+from zope.component import getUtility
+
+from lp.services.kafka.interfaces import IKafkaProducer, KafkaUnconfigured
+from lp.services.propertycache import clear_property_cache
+from lp.testing import TestCase
+from lp.testing.layers import ZopelessLayer
+
+
+class TestKafkaProducer(TestCase):  # exercises the IKafkaProducer utility
+
+    layer = ZopelessLayer
+
+    def setUp(self):
+        super().setUp()
+        self.pushConfig(
+            "kafka",
+            bootstrap_servers="kafka.example.com:9092",
+            username="launchpad-service",
+            password="secret",
+        )
+        self.mock_producer_class = self.useFixture(  # replace kafka-python
+            MockPatch("lp.services.kafka.client.BaseKafkaProducer")
+        ).mock
+        self.bootstrap_call = mock.call(  # expected constructor call
+            bootstrap_servers=["kafka.example.com:9092"],
+            client_id="launchpad",
+            key_serializer=mock.ANY,  # callables aren't compared
+            value_serializer=mock.ANY,
+            security_protocol="SASL_SSL",
+            ssl_check_hostname=False,
+            ssl_cafile=None,
+            sasl_plain_username="launchpad-service",
+            sasl_plain_password="secret",
+            sasl_mechanism="SCRAM-SHA-512",
+        )
+        self.addCleanup(clear_property_cache, getUtility(IKafkaProducer))
+
+    def test_unconfigured(self):
+        self.pushConfig("kafka", bootstrap_servers=None)
+        self.assertRaises(
+            KafkaUnconfigured, getUtility(IKafkaProducer).send, "test-topic"
+        )
+
+    def test_send(self):
+        value = {"event": "create"}
+        key = {"id": 1}
+        getUtility(IKafkaProducer).send("test-topic", value=value, key=key)
+        self.mock_producer_class.assert_has_calls(
+            [
+                self.bootstrap_call,
+                self.bootstrap_call.send(  # a call on the returned instance
+                    "test-topic", value={"event": "create"}, key={"id": 1}
+                ),
+            ],
+            any_order=False,
+        )
+
+    def test_send_and_flush(self):
+        value = {"event": "create"}
+        key = {"id": 1}
+        getUtility(IKafkaProducer).send("test-topic", value=value, key=key)
+        getUtility(IKafkaProducer).flush()
+        self.mock_producer_class.assert_has_calls(
+            [
+                self.bootstrap_call,
+                self.bootstrap_call.send(
+                    "test-topic", value={"event": "create"}, key={"id": 1}
+                ),
+                self.bootstrap_call.flush(),
+            ],
+            any_order=False,
+        )
+
+    def test_close(self):
+        getUtility(IKafkaProducer).close()  # creates a producer first
+        self.mock_producer_class.assert_has_calls(
+            [
+                self.bootstrap_call,
+                self.bootstrap_call.flush(),
+                self.bootstrap_call.close(),
+            ],
+            any_order=False,
+        )
+
+    def test_close_and_reopen(self):
+        getUtility(IKafkaProducer).close()
+        getUtility(IKafkaProducer).flush()  # creates a new producer
+        self.mock_producer_class.assert_has_calls(
+            [
+                self.bootstrap_call,
+                self.bootstrap_call.flush(),
+                self.bootstrap_call.close(),
+                self.bootstrap_call,
+                self.bootstrap_call.flush(),
+            ],
+            any_order=False,
+        )
diff --git a/requirements/launchpad.txt b/requirements/launchpad.txt
index 0b7e852..caca0d9 100644
--- a/requirements/launchpad.txt
+++ b/requirements/launchpad.txt
@@ -72,6 +72,7 @@ iso8601==0.1.12
 jedi==0.17.2
 jmespath==0.10.0
 jsautobuild==0.2
+kafka-python==2.0.2
 keyring==0.6.2
 keystoneauth1==4.1.0
 kombu==4.6.11
diff --git a/setup.cfg b/setup.cfg
index be574c6..6e0ea0e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -44,6 +44,7 @@ install_requires =
     importlib-resources; python_version < "3.7"
     ipython
     jsautobuild
+    kafka-python
     kombu
     launchpad-buildd
     launchpadlib