Skip to content

Commit

Permalink
Add support for streams-bootstrap v3 (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Sep 18, 2024
1 parent 6a7dc29 commit 11b5a17
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 75 deletions.
12 changes: 6 additions & 6 deletions backend/streams_explorer/core/k8s_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ def input_pattern(self) -> str | None:
return self.config.input_pattern

@property
def extra_input_topics(self) -> list[str]:
return self.config.extra_input_topics
def labeled_input_topics(self) -> list[str]:
return self.config.labeled_input_topics

@property
def extra_output_topics(self) -> list[str]:
return self.config.extra_output_topics
def labeled_output_topics(self) -> list[str]:
return self.config.labeled_output_topics

@property
def extra_input_patterns(self) -> list[str]:
return self.config.extra_input_patterns
def labeled_input_patterns(self) -> list[str]:
return self.config.labeled_input_patterns

@property
def pipeline(self) -> str | None:
Expand Down
18 changes: 9 additions & 9 deletions backend/streams_explorer/core/k8s_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ def parse_config(self, name: str, value: str) -> None:
self.config.output_topic = value
case "ERROR_TOPIC":
self.config.error_topic = value
case "EXTRA_INPUT_TOPICS":
self.config.extra_input_topics = self.parse_extra_topics(value)
case "EXTRA_OUTPUT_TOPICS":
self.config.extra_output_topics = self.parse_extra_topics(value)
case "LABELED_INPUT_TOPICS" | "EXTRA_INPUT_TOPICS":
self.config.labeled_input_topics = self.parse_labeled_topics(value)
case "LABELED_OUTPUT_TOPICS" | "EXTRA_OUTPUT_TOPICS":
self.config.labeled_output_topics = self.parse_labeled_topics(value)
case "INPUT_PATTERN":
self.config.input_pattern = value
case "EXTRA_INPUT_PATTERNS":
self.config.extra_input_patterns = self.parse_extra_topics(value)
case "LABELED_INPUT_PATTERNS" | "EXTRA_INPUT_PATTERNS":
self.config.labeled_input_patterns = self.parse_labeled_topics(value)
case _:
self.config.extra[name] = value

Expand All @@ -70,11 +70,11 @@ def parse_input_topics(input_topics: str) -> list[str]:
return input_topics.split(",")

@staticmethod
def parse_extra_topics(extra_topics: str) -> list[str]:
extra_topics = extra_topics.removesuffix(",") # remove trailing comma
def parse_labeled_topics(labeled_topics: str) -> list[str]:
labeled_topics = labeled_topics.removesuffix(",") # remove trailing comma
return [
topic
for role in extra_topics.split(",")
for role in labeled_topics.split(",")
for topic in role.split("=")[1].split(";")
]

Expand Down
16 changes: 8 additions & 8 deletions backend/streams_explorer/core/services/dataflow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None:
self._add_error_topic(graph, app.id, app.error_topic)
if app.input_pattern:
self._enqueue_input_pattern(app.input_pattern, app.id)
for extra_input in app.extra_input_topics:
self._add_topic(graph, extra_input)
self._add_input_topic(graph, app.id, extra_input)
for extra_output in app.extra_output_topics:
self._add_topic(graph, extra_output)
self._add_output_topic(graph, app.id, extra_output)
for extra_pattern in app.extra_input_patterns:
self._enqueue_input_pattern(extra_pattern, app.id)
for labeled_input in app.labeled_input_topics:
self._add_topic(graph, labeled_input)
self._add_input_topic(graph, app.id, labeled_input)
for labeled_output in app.labeled_output_topics:
self._add_topic(graph, labeled_output)
self._add_output_topic(graph, app.id, labeled_output)
for labeled_pattern in app.labeled_input_patterns:
self._enqueue_input_pattern(labeled_pattern, app.id)

def add_connector(
self, connector: KafkaConnector, pipeline: str | None = None
Expand Down
6 changes: 3 additions & 3 deletions backend/streams_explorer/models/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ class K8sConfig:
output_topic: str | None = None # required for streaming app
error_topic: str | None = None
input_pattern: str | None = None
extra_input_topics: list[str] = field(default_factory=list)
extra_output_topics: list[str] = field(default_factory=list)
extra_input_patterns: list[str] = field(default_factory=list)
labeled_input_topics: list[str] = field(default_factory=list)
labeled_output_topics: list[str] = field(default_factory=list)
labeled_input_patterns: list[str] = field(default_factory=list)
extra: dict[str, str] = field(default_factory=dict)


Expand Down
8 changes: 4 additions & 4 deletions backend/tests/test_dataflow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def test_no_resolve_input_pattern(self, df: DataFlowGraph):
assert df.graph.has_edge("test-namespace/test-app2", "output-topic2")
assert df.graph.has_edge("test-namespace/test-app2", "fake2-dead-letter-topic")

def test_resolve_extra_input_patterns(self, df: DataFlowGraph):
def test_resolve_labeled_input_patterns(self, df: DataFlowGraph):
df.add_streaming_app(
K8sApp.factory(
get_streaming_app_deployment(
Expand All @@ -215,7 +215,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph):
input_topics="output-topic",
output_topic="another-topic",
error_topic="fake2-dead-letter-topic",
extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic",
labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic",
)
)
)
Expand All @@ -228,7 +228,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph):
assert not df.graph.has_edge("another-topic", "test-namespace/test-app2")
assert df.graph.has_edge("fake-dead-letter-topic", "test-namespace/test-app2")

def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph):
def test_no_resolve_labeled_input_patterns(self, df: DataFlowGraph):
settings.graph.resolve.input_pattern_topics.all = False
df.add_streaming_app(
K8sApp.factory(
Expand All @@ -246,7 +246,7 @@ def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph):
input_topics="output-topic",
output_topic="output-topic2",
error_topic="fake2-dead-letter-topic",
extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic",
labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic",
)
)
)
Expand Down
38 changes: 34 additions & 4 deletions backend/tests/test_k8s_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_env_prefix_support(self):
assert k8s_app.output_topic == "output-topic"
assert k8s_app.input_topics == ["input-topic"]

def test_extra_input_topics(self):
def test_streams_bootstrap_v2_extra_input_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
Expand All @@ -147,11 +147,40 @@ def test_extra_input_topics(self):
error_topic="error-topic",
multiple_inputs="0=test1,1=test2;test3,",
env_prefix="TEST_",
streams_bootstrap_version=2,
)
)
assert k8s_app.extra_input_topics == ["test1", "test2", "test3"]
assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"]

def test_extra_output_topics(self):
def test_streams_bootstrap_v3_labeled_input_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
input_topics="input-topic",
output_topic="output-topic",
error_topic="error-topic",
multiple_inputs="0=test1,1=test2;test3,",
env_prefix="TEST_",
streams_bootstrap_version=3,
)
)
assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"]

def test_streams_bootstrap_v2_extra_output_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
input_topics="input-topic",
output_topic="output-topic",
error_topic="error-topic",
multiple_outputs="0=test1,1=test2",
env_prefix="TEST_",
streams_bootstrap_version=2,
)
)
assert k8s_app.labeled_output_topics == ["test1", "test2"]

def test_streams_bootstrap_v3_labeled_output_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
Expand All @@ -160,9 +189,10 @@ def test_extra_output_topics(self):
error_topic="error-topic",
multiple_outputs="0=test1,1=test2",
env_prefix="TEST_",
streams_bootstrap_version=3,
)
)
assert k8s_app.extra_output_topics == ["test1", "test2"]
assert k8s_app.labeled_output_topics == ["test1", "test2"]

def test_attributes(self):
k8s_app = K8sAppDeployment(
Expand Down
Loading

0 comments on commit 11b5a17

Please sign in to comment.