From 11b5a17d7cd6f8b946093df84bcf228650ccc690 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 18 Sep 2024 16:45:09 +0200 Subject: [PATCH] Add support for streams-bootstrap v3 (#307) --- backend/streams_explorer/core/k8s_app.py | 12 +- .../core/k8s_config_parser.py | 18 +-- .../core/services/dataflow_graph.py | 16 +-- backend/streams_explorer/models/k8s.py | 6 +- backend/tests/test_dataflow_graph.py | 8 +- backend/tests/test_k8s_app.py | 38 +++++- backend/tests/utils.py | 108 +++++++++++------- 7 files changed, 131 insertions(+), 75 deletions(-) diff --git a/backend/streams_explorer/core/k8s_app.py b/backend/streams_explorer/core/k8s_app.py index 1c9955bb..0d6f364b 100644 --- a/backend/streams_explorer/core/k8s_app.py +++ b/backend/streams_explorer/core/k8s_app.py @@ -60,16 +60,16 @@ def input_pattern(self) -> str | None: return self.config.input_pattern @property - def extra_input_topics(self) -> list[str]: - return self.config.extra_input_topics + def labeled_input_topics(self) -> list[str]: + return self.config.labeled_input_topics @property - def extra_output_topics(self) -> list[str]: - return self.config.extra_output_topics + def labeled_output_topics(self) -> list[str]: + return self.config.labeled_output_topics @property - def extra_input_patterns(self) -> list[str]: - return self.config.extra_input_patterns + def labeled_input_patterns(self) -> list[str]: + return self.config.labeled_input_patterns @property def pipeline(self) -> str | None: diff --git a/backend/streams_explorer/core/k8s_config_parser.py b/backend/streams_explorer/core/k8s_config_parser.py index 5f38e45b..06d7c870 100644 --- a/backend/streams_explorer/core/k8s_config_parser.py +++ b/backend/streams_explorer/core/k8s_config_parser.py @@ -54,14 +54,14 @@ def parse_config(self, name: str, value: str) -> None: self.config.output_topic = value case "ERROR_TOPIC": self.config.error_topic = value - case "EXTRA_INPUT_TOPICS": - self.config.extra_input_topics = self.parse_extra_topics(value) - case "EXTRA_OUTPUT_TOPICS": - self.config.extra_output_topics = self.parse_extra_topics(value) + case "LABELED_INPUT_TOPICS" | "EXTRA_INPUT_TOPICS": + self.config.labeled_input_topics = self.parse_labeled_topics(value) + case "LABELED_OUTPUT_TOPICS" | "EXTRA_OUTPUT_TOPICS": + self.config.labeled_output_topics = self.parse_labeled_topics(value) case "INPUT_PATTERN": self.config.input_pattern = value - case "EXTRA_INPUT_PATTERNS": - self.config.extra_input_patterns = self.parse_extra_topics(value) + case "LABELED_INPUT_PATTERNS" | "EXTRA_INPUT_PATTERNS": + self.config.labeled_input_patterns = self.parse_labeled_topics(value) case _: self.config.extra[name] = value @@ -70,11 +70,11 @@ def parse_input_topics(input_topics: str) -> list[str]: return input_topics.split(",") @staticmethod - def parse_extra_topics(extra_topics: str) -> list[str]: - extra_topics = extra_topics.removesuffix(",") # remove trailing comma + def parse_labeled_topics(labeled_topics: str) -> list[str]: + labeled_topics = labeled_topics.removesuffix(",") # remove trailing comma return [ topic - for role in extra_topics.split(",") + for role in labeled_topics.split(",") for topic in role.split("=")[1].split(";") ] diff --git a/backend/streams_explorer/core/services/dataflow_graph.py b/backend/streams_explorer/core/services/dataflow_graph.py index c33c4521..6e92d82e 100644 --- a/backend/streams_explorer/core/services/dataflow_graph.py +++ b/backend/streams_explorer/core/services/dataflow_graph.py @@ -87,14 +87,14 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None: self._add_error_topic(graph, app.id, app.error_topic) if app.input_pattern: self._enqueue_input_pattern(app.input_pattern, app.id) - for extra_input in app.extra_input_topics: - self._add_topic(graph, extra_input) - self._add_input_topic(graph, app.id, extra_input) - for extra_output in app.extra_output_topics: - self._add_topic(graph, extra_output) - self._add_output_topic(graph, app.id, extra_output) - for extra_pattern in app.extra_input_patterns: - self._enqueue_input_pattern(extra_pattern, app.id) + for labeled_input in app.labeled_input_topics: + self._add_topic(graph, labeled_input) + self._add_input_topic(graph, app.id, labeled_input) + for labeled_output in app.labeled_output_topics: + self._add_topic(graph, labeled_output) + self._add_output_topic(graph, app.id, labeled_output) + for labeled_pattern in app.labeled_input_patterns: + self._enqueue_input_pattern(labeled_pattern, app.id) def add_connector( self, connector: KafkaConnector, pipeline: str | None = None diff --git a/backend/streams_explorer/models/k8s.py b/backend/streams_explorer/models/k8s.py index 77c533c1..080671af 100644 --- a/backend/streams_explorer/models/k8s.py +++ b/backend/streams_explorer/models/k8s.py @@ -12,9 +12,9 @@ class K8sConfig: output_topic: str | None = None # required for streaming app error_topic: str | None = None input_pattern: str | None = None - extra_input_topics: list[str] = field(default_factory=list) - extra_output_topics: list[str] = field(default_factory=list) - extra_input_patterns: list[str] = field(default_factory=list) + labeled_input_topics: list[str] = field(default_factory=list) + labeled_output_topics: list[str] = field(default_factory=list) + labeled_input_patterns: list[str] = field(default_factory=list) extra: dict[str, str] = field(default_factory=dict) diff --git a/backend/tests/test_dataflow_graph.py b/backend/tests/test_dataflow_graph.py index ff639387..69c37241 100644 --- a/backend/tests/test_dataflow_graph.py +++ b/backend/tests/test_dataflow_graph.py @@ -192,7 +192,7 @@ def test_no_resolve_input_pattern(self, df: DataFlowGraph): assert df.graph.has_edge("test-namespace/test-app2", "output-topic2") assert df.graph.has_edge("test-namespace/test-app2", "fake2-dead-letter-topic") - def test_resolve_extra_input_patterns(self, df: DataFlowGraph): + def test_resolve_labeled_input_patterns(self, df: DataFlowGraph): df.add_streaming_app( K8sApp.factory( get_streaming_app_deployment( @@ -215,7 +215,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph): input_topics="output-topic", output_topic="another-topic", error_topic="fake2-dead-letter-topic", - extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic", + labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic", ) ) ) @@ -228,7 +228,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph): assert not df.graph.has_edge("another-topic", "test-namespace/test-app2") assert df.graph.has_edge("fake-dead-letter-topic", "test-namespace/test-app2") - def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph): + def test_no_resolve_labeled_input_patterns(self, df: DataFlowGraph): settings.graph.resolve.input_pattern_topics.all = False df.add_streaming_app( K8sApp.factory( @@ -246,7 +246,7 @@ def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph): input_topics="output-topic", output_topic="output-topic2", error_topic="fake2-dead-letter-topic", - extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic", + labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic", ) ) ) diff --git a/backend/tests/test_k8s_app.py b/backend/tests/test_k8s_app.py index 934ce578..20d17936 100644 --- a/backend/tests/test_k8s_app.py +++ b/backend/tests/test_k8s_app.py @@ -138,7 +138,7 @@ def test_env_prefix_support(self): assert k8s_app.output_topic == "output-topic" assert k8s_app.input_topics == ["input-topic"] - def test_extra_input_topics(self): + def test_streams_bootstrap_v2_extra_input_topics(self): k8s_app = K8sAppDeployment( get_streaming_app_deployment( name="test-app", @@ -147,11 +147,40 @@ def test_extra_input_topics(self): error_topic="error-topic", multiple_inputs="0=test1,1=test2;test3,", env_prefix="TEST_", + streams_bootstrap_version=2, ) ) - assert k8s_app.extra_input_topics == ["test1", "test2", "test3"] + assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"] - def test_extra_output_topics(self): + def test_streams_bootstrap_v3_labeled_input_topics(self): + k8s_app = K8sAppDeployment( + get_streaming_app_deployment( + name="test-app", + input_topics="input-topic", + output_topic="output-topic", + error_topic="error-topic", + multiple_inputs="0=test1,1=test2;test3,", + env_prefix="TEST_", + streams_bootstrap_version=3, + ) + ) + assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"] + + def test_streams_bootstrap_v2_extra_output_topics(self): + k8s_app = K8sAppDeployment( + get_streaming_app_deployment( + name="test-app", + input_topics="input-topic", + output_topic="output-topic", + error_topic="error-topic", + multiple_outputs="0=test1,1=test2", + env_prefix="TEST_", + streams_bootstrap_version=2, + ) + ) + assert k8s_app.labeled_output_topics == ["test1", "test2"] + + def test_streams_bootstrap_v3_labeled_output_topics(self): k8s_app = K8sAppDeployment( get_streaming_app_deployment( name="test-app", @@ -160,9 +189,10 @@ def test_extra_output_topics(self): error_topic="error-topic", multiple_outputs="0=test1,1=test2", env_prefix="TEST_", + streams_bootstrap_version=3, ) ) - assert k8s_app.extra_output_topics == ["test1", "test2"] + assert k8s_app.labeled_output_topics == ["test1", "test2"] def test_attributes(self): k8s_app = K8sAppDeployment( diff --git a/backend/tests/utils.py b/backend/tests/utils.py index eeb1851d..af64e6ba 100644 --- a/backend/tests/utils.py +++ b/backend/tests/utils.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import Literal from kubernetes_asyncio.client import ( V1beta1CronJob, @@ -26,6 +27,9 @@ class ConfigType(str, Enum): ARGS = "args" +StreamsBootstrapVersion = Literal[2, 3] + + def get_streaming_app_deployment( name: str = "test-app", input_topics: str | None = "input-topic", @@ -34,26 +38,28 @@ def get_streaming_app_deployment( input_pattern: str | None = None, multiple_inputs: str | None = None, multiple_outputs: str | None = None, - extra_input_patterns: str | None = None, + labeled_input_patterns: str | None = None, extra: dict[str, str] = {}, env_prefix: str = "APP_", namespace: str = "test-namespace", pipeline: str | None = None, consumer_group: str | None = None, config_type: ConfigType = ConfigType.ENV, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1Deployment: template = get_template( - input_topics, - output_topic, - error_topic, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, - extra_input_patterns=extra_input_patterns, + labeled_input_patterns=labeled_input_patterns, extra=extra, env_prefix=env_prefix, consumer_group=consumer_group, config_type=config_type, + streams_bootstrap_version=streams_bootstrap_version, ) spec = V1DeploymentSpec(template=template, selector=V1LabelSelector()) metadata = get_metadata(name, namespace=namespace, pipeline=pipeline) @@ -69,7 +75,7 @@ def get_streaming_app_stateful_set( input_pattern: str | None = None, multiple_inputs: str | None = None, multiple_outputs: str | None = None, - extra_input_patterns: str | None = None, + labeled_input_patterns: str | None = None, extra: dict[str, str] = {}, env_prefix: str = "APP_", namespace: str = "test-namespace", @@ -77,19 +83,21 @@ def get_streaming_app_stateful_set( consumer_group: str | None = None, service_name: str = "test-service", config_type: ConfigType = ConfigType.ENV, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1StatefulSet: template = get_template( - input_topics, - output_topic, - error_topic, - input_pattern, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, + input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, - extra_input_patterns=extra_input_patterns, + labeled_input_patterns=labeled_input_patterns, extra=extra, env_prefix=env_prefix, consumer_group=consumer_group, config_type=config_type, + streams_bootstrap_version=streams_bootstrap_version, ) metadata = get_metadata(name, namespace=namespace, pipeline=pipeline) spec = V1StatefulSetSpec( @@ -109,12 +117,14 @@ def get_streaming_app_cronjob( env_prefix: str = "APP_", namespace: str = "test-namespace", pipeline: str | None = None, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1beta1CronJob: env = get_env( - input_topics, - output_topic, - error_topic, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, env_prefix=env_prefix, + streams_bootstrap_version=streams_bootstrap_version, ) container = V1Container(name="test-container", env=env) pod_spec = V1PodSpec(containers=[container]) @@ -147,16 +157,19 @@ def get_metadata(name, *, namespace: str, pipeline: str | None = None) -> V1Obje def get_env( + *, input_topics: str | None, output_topic: str | None, error_topic: str | None, input_pattern: str | None = None, multiple_inputs: str | None = None, multiple_outputs: str | None = None, - extra_input_patterns: str | None = None, + labeled_input_patterns: str | None = None, extra: dict[str, str] = {}, - env_prefix: str = "APP_", + env_prefix: str, + streams_bootstrap_version: StreamsBootstrapVersion, ) -> list[V1EnvVar]: + labeled_topics_prefix = "EXTRA" if streams_bootstrap_version == 2 else "LABELED" env = [V1EnvVar(name="ENV_PREFIX", value=env_prefix)] if input_topics: env.append(V1EnvVar(name=env_prefix + "INPUT_TOPICS", value=input_topics)) @@ -168,16 +181,23 @@ def get_env( env.append(V1EnvVar(name=env_prefix + "INPUT_PATTERN", value=input_pattern)) if multiple_inputs: env.append( - V1EnvVar(name=env_prefix + "EXTRA_INPUT_TOPICS", value=multiple_inputs) + V1EnvVar( + name=env_prefix + labeled_topics_prefix + "_INPUT_TOPICS", + value=multiple_inputs, + ) ) if multiple_outputs: env.append( - V1EnvVar(name=env_prefix + "EXTRA_OUTPUT_TOPICS", value=multiple_outputs) + V1EnvVar( + name=env_prefix + labeled_topics_prefix + "_OUTPUT_TOPICS", + value=multiple_outputs, + ) ) - if extra_input_patterns: + if labeled_input_patterns: env.append( V1EnvVar( - name=env_prefix + "EXTRA_INPUT_PATTERNS", value=extra_input_patterns + name=env_prefix + labeled_topics_prefix + "_INPUT_PATTERNS", + value=labeled_input_patterns, ) ) if extra: @@ -191,13 +211,15 @@ def _create_arg(name: str, value: str) -> str: def get_args( + *, input_topics: str | None, output_topic: str | None, error_topic: str | None, multiple_inputs: str | None, multiple_outputs: str | None, - extra_input_patterns: str | None, + labeled_input_patterns: str | None, extra: dict[str, str], + streams_bootstrap_version: StreamsBootstrapVersion, ) -> list[str]: args = [] if input_topics: @@ -210,8 +232,8 @@ def get_args( args.append(_create_arg("extra-input-topics", multiple_inputs)) if multiple_outputs: args.append(_create_arg("extra-output-topics", multiple_outputs)) - if extra_input_patterns: - args.append(_create_arg("extra-input-patterns", extra_input_patterns)) + if labeled_input_patterns: + args.append(_create_arg("extra-input-patterns", labeled_input_patterns)) if extra: for k, v in extra.items(): args.append(_create_arg(k, v)) @@ -219,42 +241,46 @@ def get_args( def get_template( + *, input_topics: str | None, output_topic: str | None, error_topic: str | None, input_pattern: str | None, multiple_inputs: str | None, multiple_outputs: str | None, - extra_input_patterns: str | None, + labeled_input_patterns: str | None, extra: dict[str, str], - env_prefix: str = "APP_", - consumer_group: str | None = None, - config_type: ConfigType = ConfigType.ENV, + env_prefix: str, + consumer_group: str | None, + config_type: ConfigType, + streams_bootstrap_version: StreamsBootstrapVersion, ) -> V1PodTemplateSpec: env = None args = None match config_type: case ConfigType.ENV: env = get_env( - input_topics, - output_topic, - error_topic, - input_pattern, - multiple_inputs, - multiple_outputs, - extra_input_patterns=extra_input_patterns, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, + input_pattern=input_pattern, + multiple_inputs=multiple_inputs, + multiple_outputs=multiple_outputs, + labeled_input_patterns=labeled_input_patterns, env_prefix=env_prefix, extra=extra, + streams_bootstrap_version=streams_bootstrap_version, ) case ConfigType.ARGS: args = get_args( - input_topics, - output_topic, - error_topic, - multiple_inputs, - multiple_outputs, - extra_input_patterns, - extra, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, + multiple_inputs=multiple_inputs, + multiple_outputs=multiple_outputs, + labeled_input_patterns=labeled_input_patterns, + extra=extra, + streams_bootstrap_version=streams_bootstrap_version, ) container = V1Container(name="test-container", env=env, args=args) pod_spec = V1PodSpec(containers=[container])