From 99054b3d43faded89b82e1472e73a7978c863311 Mon Sep 17 00:00:00 2001 From: jialin li Date: Wed, 25 Dec 2024 11:08:06 +0800 Subject: [PATCH] Update data-bridge-kafka.md add kafka-source SQL select paramters, and modify the Topic in message re-publish --- zh_CN/data-integration/data-bridge-kafka.md | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/zh_CN/data-integration/data-bridge-kafka.md b/zh_CN/data-integration/data-bridge-kafka.md index 6f5bf3788..6cc5536ec 100644 --- a/zh_CN/data-integration/data-bridge-kafka.md +++ b/zh_CN/data-integration/data-bridge-kafka.md @@ -300,7 +300,7 @@ EMQX v5.7.2 引入了一项新功能,可以在 SQL 处理阶段将从设置的 4. 如果您想将从 Kafka Source `$bridges/kafka_consumer:` 转换的消息转发到 EMQX,请在 **SQL 编辑器**中输入以下语句。 - 注意:如果您想指定自己的 SQL 语法,请确保 `SELECT` 部分包含了稍后步骤中设置的消息重发布动作所需的所有字段。 + 注意:如果您想指定自己的 SQL 语法,请确保 `SELECT` 部分包含了稍后步骤中设置的消息重发布动作所需的所有字段。Kafka Source 的 Select 语句中可以使用 `ts_type`、`topic`、`ts`、`event`、`headers`、`key`、`metadata`、`value`、`timestamp`、`offset`、`node` 等字段。 ```sql SELECT @@ -321,7 +321,7 @@ EMQX v5.7.2 引入了一项新功能,可以在 SQL 处理阶段将从设置的 - **Kafka 主题名称**:指定消费者 Source 将订阅的 Kafka 主题,以接收消息。 - **消费组 ID**:指定此 Source 的消费组标识符。如果未提供,系统将基于 Source 名称自动生成一个组 ID。 - **Key 编码模式** 和 **编码模式**:选择 Kafka 消息键和消息值的编码模式。 -2. **偏移重置策略**:选择当没有消费者偏移量或偏移量变得无效时,Kafka 消费者开始从 Kafka 主题分区读取的偏移量重置策略。 +6. **偏移重置策略**:选择当没有消费者偏移量或偏移量变得无效时,Kafka 消费者开始从 Kafka 主题分区读取的偏移量重置策略。 - 如果您希望消费者从最新偏移量开始读取消息,跳过消费者启动前产生的消息,请选择 `latest`。 - 如果您希望消费者从分区的开始读取消息,包括消费者启动前产生的消息,即读取主题中的所有历史数据,请选择 `earliest`。 @@ -332,10 +332,12 @@ EMQX v5.7.2 引入了一项新功能,可以在 SQL 处理阶段将从设置的 ### 添加一个消息重发布动作 1. 选择**动作输出**选项卡并点击 + **添加动作**按钮来定义规则触发的操作。 -1. 从**动作类型**下拉列表中选择**消息重发布**。 -2. 在 **主题** 和 **Payload** 字段中,您可以输入您想重新发布的消息的主题和 payload。例如,对于此演示,输入 `t/1` 和 `${.}`。 -3. 点击**添加**将此动作包含在规则中。 -4. 回到**创建规则**页面,点击页面最下方的**保存**以完成规则创建。 +2. 从**动作类型**下拉列表中选择**消息重发布**。 +3. 在 **主题** 和 **Payload** 字段中,您可以输入您想重新发布的消息的主题和 payload。例如,对于此演示,输入 `t/1` 和 `${.}`。 + + - **主题** 中 也可以使用 `${}` 来动态指定 mqtt topic 主题,例如 `t/${key}` (注意:**${}** 中传入的参数,需要包含在 SQL `Select` 语句中) +4. 点击**添加**将此动作包含在规则中。 +5. 回到**创建规则**页面,点击页面最下方的**保存**以完成规则创建。 ![Kafka_consumer_rule](./assets/Kafka_consumer_rule.png)