Skip to content

Commit

Permalink
cdc: make mq encoding protocol related more clear (#18783)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Nov 4, 2024
1 parent 12d4367 commit 4543d9b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 37 deletions.
87 changes: 56 additions & 31 deletions ticdc/ticdc-avro-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,35 @@ summary: 了解 TiCDC Avro Protocol 的概念和使用方法。

# TiCDC Avro Protocol

Avro 是由 [Apache Avro™](https://avro.apache.org/) 定义的一种数据交换格式协议,[Confluent Platform](https://docs.confluent.io/platform/current/platform.html) 选择它作为默认的数据交换格式。通过本文,你可以了解 TiCDC 对 Avro 数据格式的实现,包括 TiDB 扩展字段、Avro 数据格式定义,以及和 [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) 的交互。
TiCDC Avro 协议,是对 [Confluent Platform](https://docs.confluent.io/platform/current/platform.html) 定义的 [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) 数据传输格式的第三方实现。Avro 是由 [Apache Avro™](https://avro.apache.org/) 定义的一种数据交换格式。

通过本文,你可以了解 TiCDC Avro 数据协议的实现,以及和 [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) 的交互。

> **警告:**
>
> 从 v7.3.0 起,如果开启了[同步没有有效索引的表](/ticdc/ticdc-manage-changefeed.md#同步没有有效索引的表),创建使用 Avro 协议的 changefeed 时会报错。
## 使用 Avro

当使用 Message Queue (MQ) 作为下游 Sink 时,你可以在 `sink-uri` 中指定使用 Avro。TiCDC 获取 TiDB 的 DML 事件,并将这些事件封装到 Avro Message,然后发送到下游。当 Avro 检测到 schema 变化时,会向 Schema Registry 注册最新的 schema。

使用 Avro 时的配置样例如下所示:

```shell
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```

`--schema-registry` 的值支持 `https` 协议和 `username:password` 认证,username 和 password 必须经过 URL 编码。例如,`--schema-registry=https://username:[email protected]`

> **注意:**
>
> 使用 Avro 协议时,一个 Kafka Topic 只能包含一张表的数据。你需要在配置文件中配置 [Topic 分发器](/ticdc/ticdc-sink-to-kafka.md#topic-分发器)
```shell
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
```

`--schema-registry` 的值支持 https 协议和 username:password 认证,比如`--schema-registry=https://username:[email protected]`,username 和 password 必须经过 URL 编码。

## TiDB 扩展字段

默认情况下,Avro 只收集在 DML 事件中发生数据变更的行的所有数据信息,不收集数据变更的类型和 TiDB 专有的 CommitTS 事务唯一标识信息。为了解决这个问题,TiCDC 在 Avro 协议格式中附加了 TiDB 扩展字段。当 `sink-uri` 中设置 `enable-tidb-extension``true` (默认为 `false`)后,TiCDC 生成 Avro 消息时会新增三个字段:

- `_tidb_op`:DML 的类型,"c" 表示插入,"u" 表示更新。
- `_tidb_commit_ts`:事务唯一标识信息。
- `_tidb_commit_physical_time`:事务标识信息中现实时间的时间戳。

配置样例如下所示:

```shell
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```

```shell
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
```

## 数据格式定义

TiCDC 会将一个 DML 事件转换为一个 kafka 事件,其中事件的 key 和 value 都按照 Avro 协议进行编码。
Expand Down Expand Up @@ -89,7 +72,36 @@ Key 中的 `fields` 只包含主键或唯一索引列。
}
```

Value 数据格式默认与 Key 数据格式相同,但是 Value 的 `fields` 中包含了所有的列,而不仅仅是主键列。
Value 数据格式默认与 Key 数据格式相同,但是 Value 的 `fields` 中包含了所有的列。

> **注意:**
>
> Avro 协议在编码 DML 事件时,操作方式如下:
>
> - 对于 Delete 事件,只编码 Key 部分,Value 部分为空。
> - 对于 Insert 事件,编码所有列数据到 Value 部分。
> - 对于 Update 事件,只编码更新后的所有列数据到 Value 部分。
## TiDB 扩展字段

默认情况下,Avro 只编码在 DML 事件中发生数据变更的行的所有列数据信息,不收集数据变更的类型和 TiDB 专有的 CommitTS 事务唯一标识信息。为了解决这个问题,TiCDC 在 Avro 协议格式中附加了 TiDB 扩展字段。当 `sink-uri` 中设置 `enable-tidb-extension``true` (默认为 `false`)后,TiCDC 生成 Avro 消息时,会在 Value 部分新增三个字段:

- `_tidb_op`:DML 的类型,"c" 表示插入,"u" 表示更新。
- `_tidb_commit_ts`:事务唯一标识信息。
- `_tidb_commit_physical_time`:事务标识信息中现实时间的时间戳。

配置样例如下所示:

```shell
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
```

```shell
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
```

如果开启了 [TiDB 扩展字段](#tidb-扩展字段),那么 Value 数据格式将会变成:

Expand Down Expand Up @@ -155,7 +167,7 @@ Column 数据格式即 Key/Value 数据格式中的 `{{ColumnValueBlock}}` 部

- `{{ColumnName}}` 表示列名。
- `{{TIDB_TYPE}}` 表示对应到 TiDB 中的类型,与原始的 SQL Type 不是一一对应关系。
- `{{AVRO_TYPE}}` 表示 [avro spec](https://avro.apache.org/docs/current/spec.html) 中的类型。
- `{{AVRO_TYPE}}` 表示 [Avro Specification](https://avro.apache.org/docs/++version++/specification) 中的类型。

| SQL TYPE | TIDB_TYPE | AVRO_TYPE | 说明 |
|------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|
Expand Down Expand Up @@ -266,14 +278,27 @@ DECIMAL(10, 4)

## DDL 事件与 Schema 变更

Avro 并不会向下游生成 DDL 事件。Avro 会在每次 DML 事件发生时检测是否发生 schema 变更,如果发生了 schema 变更,Avro 会生成新的 schema,并尝试向 Schema Registry 注册。注册时,Schema Registry 会做兼容性检测,如果此次 schema 变更没有通过兼容性检测,注册将会失败,TiCDC 并不会尝试解决 schema 的兼容性问题。
Avro 协议并不会向下游发送 DDL 事件和 Watermark 事件。Avro 会在每次 DML 事件发生时检测是否发生 schema 变更,如果发生了 schema 变更,Avro 会生成新的 schema,并尝试向 Schema Registry 注册。注册时,Schema Registry 会做兼容性检测,如果此次 schema 变更没有通过兼容性检测,注册将会失败,TiCDC 并不会尝试解决 schema 的兼容性问题。

同时,即使 schema 变更通过兼容性检测并成功注册新版本,数据的生产者和消费者可能仍然需要升级才能正确工作
比如,Confluent Schema Registry 默认的[兼容性策略](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types)`BACKWARD`,在这种策略下,如果你在源表增加一个非空列,Avro 在生成新 schema 向 Schema Registry 注册时将会因为兼容性问题失败,这个时候 changefeed 将会进入 error 状态

比如,Confluent Schema Registry 默认的兼容性策略是 BACKWARD,在这种策略下,如果你在源表增加一个非空列,Avro 在生成新 schema 向 Schema Registry 注册时将会因为兼容性问题失败,这个时候 changefeed 将会进入 error 状态
同时,即使 schema 变更通过兼容性检测并成功注册新版本,数据的生产者和消费者可能仍然需要获取到新版本的 schema,才能对数据进行正确编解码

如需了解更多 schema 相关信息,请参阅 [Schema Registry 的相关文档](https://docs.confluent.io/platform/current/schema-registry/avro.html)

## 消费者实现

TiCDC Avro 协议支持被 [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer) 反序列化。

消费者程序可以通过 [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html) 获取到最新的 schema,然后对数据进行反序列化。

### 区分事件类型

消费者程序可以按照如下规则区分 DML 事件类型:

* 只有 Key 部分,则是 Delete 事件。
* 含有 Value 部分,则是 Insert 或 Update 事件。如果用户开启了 [TiDB 扩展字段功能](#tidb-扩展字段),可以根据其中的 `_tidb_op` 字段,判断该条事件变更是 Insert 或 Update。如果未开启 TiDB 扩展字段功能,则无法区分。

## Topic 分发

Schema Registry 支持三种 [Subject Name Strategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy):TopicNameStrategy、RecordNameStrategy 和 TopicRecordNameStrategy。目前 TiCDC Avro 只支持 TopicNameStrategy 一种,这意味着一个 kafka topic 只能接收一种数据格式的数据,所以 TiCDC Avro 禁止将多张表映射到同一个 topic。在创建 changefeed 时,如果配置的分发规则中,topic 规则不包含 `{schema}``{table}` 占位符,将会报错。
Expand Down
13 changes: 7 additions & 6 deletions ticdc/ticdc-sink-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ summary: 了解如何使用 TiCDC 将数据同步到 Kafka。

本文介绍如何使用 TiCDC 创建一个将增量数据复制到 Kafka 的 Changefeed。

## 创建同步任务,复制增量数据 Kafka
## 创建同步任务,复制增量数据到 Kafka

使用以下命令来创建同步任务:

Expand Down Expand Up @@ -73,14 +73,14 @@ URI 中可配置的的参数如下:
| `127.0.0.1` | 下游 Kafka 对外提供服务的 IP。 |
| `9092` | 下游 Kafka 的连接端口。 |
| `topic-name` | 变量,使用的 Kafka topic 名字。 |
| `protocol` | 输出到 Kafka 的消息协议。可选值有 [`canal-json`](/ticdc/ticdc-canal-json.md)[`open-protocol`](/ticdc/ticdc-open-protocol.md)[`avro`](/ticdc/ticdc-avro-protocol.md)[`debezium`](/ticdc/ticdc-debezium.md)[`simple`](/ticdc/ticdc-simple-protocol.md)|
| `kafka-version` | 下游 Kafka 版本号。该值需要与下游 Kafka 的实际版本保持一致。 |
| `kafka-client-id` | 指定同步任务的 Kafka 客户端的 ID(可选,默认值为 `TiCDC_sarama_producer_同步任务的 ID`)。 |
| `partition-num` | 下游 Kafka partition 数量(可选,不能大于实际 partition 数量,否则创建同步任务会失败,默认值 `3`)。|
| `max-message-bytes` | 每次向 Kafka broker 发送消息的最大数据量(可选,默认值 `10MB`)。从 v5.0.6 和 v4.0.6 开始,默认值分别从 64MB 和 256MB 调整至 10 MB。|
| `replication-factor` | Kafka 消息保存副本数(可选,默认值 `1`),需要大于等于 Kafka 中 [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) 的值。 |
| `required-acks` |`Produce` 请求中使用的配置项,用于告知 broker 需要收到多少副本确认后才进行响应。可选值有:`0``NoResponse`:不发送任何响应,只有 TCP ACK),`1``WaitForLocal`:仅等待本地提交成功后再响应)和 `-1``WaitForAll`:等待所有同步副本提交后再响应。最小同步副本数量可通过 broker 的 [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) 配置项进行配置)。(可选,默认值为 `-1`)。 |
| `compression` | 设置发送消息时使用的压缩算法(可选值为 `none``lz4``gzip``snappy``zstd`,默认值为 `none`)。注意 Snappy 压缩文件必须遵循[官方 Snappy 格式](https://github.com/google/snappy)。不支持其他非官方压缩格式。|
| `protocol` | 输出到 Kafka 的消息协议,可选值有 `canal-json``open-protocol``avro``debezium``simple`|
| `auto-create-topic` | 当传入的 `topic-name` 在 Kafka 集群不存在时,TiCDC 是否要自动创建该 topic(可选,默认值 `true`)。 |
| `enable-tidb-extension` | 可选,默认值是 `false`。当输出协议为 `canal-json` 时,如果该值为 `true`,TiCDC 会发送 [WATERMARK 事件](/ticdc/ticdc-canal-json.md#watermark-event),并在 Kafka 消息中添加 TiDB 扩展字段。从 6.1.0 开始,该参数也可以和输出协议 `avro` 一起使用。如果该值为 `true`,TiCDC 会在 Kafka 消息中添加[三个 TiDB 扩展字段](/ticdc/ticdc-avro-protocol.md#tidb-扩展字段)|
| `max-batch-size` | 从 v4.0.9 开始引入。当消息协议支持把多条变更记录输出至一条 Kafka 消息时,该参数用于指定这一条 Kafka 消息中变更记录的最多数量。目前,仅当 Kafka 消息的 `protocol``open-protocol` 时有效(可选,默认值 `16`)。|
Expand Down Expand Up @@ -109,12 +109,13 @@ URI 中可配置的的参数如下:
### 最佳实践

* TiCDC 推荐用户自行创建 Kafka Topic,你至少需要设置该 Topic 每次向 Kafka broker 发送消息的最大数据量和下游 Kafka partition 的数量。在创建 changefeed 的时候,这两项设置分别对应 `max-message-bytes``partition-num` 参数。
* 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 `partition-num``replication-factor` 参数自行创建 Topic建议明确指定这两个参数。
* 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 `partition-num``replication-factor` 参数自行创建 Topic建议明确指定这两个参数。
* 在大多数情况下,建议使用 `canal-json` 协议。

> **注意:**
>
> `protocol``open-protocol` 时,TiCDC 会尽量避免产生长度超过 `max-message-bytes` 的消息。但如果单条数据变更记录需要超过 `max-message-bytes` 个字节来表示,为了避免静默失败,TiCDC 会试图输出这条消息并在日志中输出 Warning。
> `protocol``open-protocol` 时,TiCDC 会将多个事件编码到同一个 Kafka 消息中,并尽量避免在此过程中生成长度超过 `max-message-bytes` 的消息。
> 如果单条数据变更编码得到的消息大小超过了 `max-message-bytes` 个字节,changefeed 会报错,并打印错误日志。
### TiCDC 使用 Kafka 的认证与授权

Expand Down Expand Up @@ -172,7 +173,7 @@ URI 中可配置的的参数如下:

### TiCDC 集成 Kafka Connect (Confluent Platform)

如要使用 Confluent 提供的 [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) 向关系型或非关系型数据库传输数据,请选择 `avro` 协议,并在 `schema-registry` 中提供 [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) 的 URL。
如要使用 Confluent 提供的 [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) 向关系型或非关系型数据库传输数据,请选择 [Avro 协议](/ticdc/ticdc-avro-protocol.md)协议,并在 `schema-registry` 中提供 [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) 的 URL。

配置样例如下所示:

Expand All @@ -191,7 +192,7 @@ dispatchers = [

### TiCDC 集成 AWS Glue Schema Registry

从 v7.4.0 开始,TiCDC 支持在用户选择 Avro 协议同步数据时使用 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) 作为 Schema Registry。配置样例如下所示:
从 v7.4.0 开始,TiCDC 支持在用户选择 [Avro 协议](/ticdc/ticdc-avro-protocol.md)同步数据时使用 [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) 作为 Schema Registry。配置样例如下所示:

```shell
./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml
Expand Down

0 comments on commit 4543d9b

Please sign in to comment.