Skip to content

Commit

Permalink
cdc: add ticdc integrity check golang implementation explanation. (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Jun 21, 2023
1 parent 96852d5 commit faf76e7
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions ticdc/ticdc-integrity-check.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,31 @@ fn checksum(columns) {
* BIT、ENUM 和 SET 类型会被转换为 UINT64 类型。
* BIT 类型按照二进制转换为 UINT64 类型。
* ENUM 和 SET 类型按照其对应的 INT 值转换为 UINT64 类型。例如,`SET('a','b','c')` 类型 column 的数据值为 `'a,c'`,则该值将被编码为 `0b101`。
* ENUM 和 SET 类型按照其对应的 INT 值转换为 UINT64 类型。例如,`SET('a','b','c')` 类型 column 的数据值为 `'a,c'`,则该值将被编码为 `0b101`,即 `5`
* TIMESTAMP、DATE、DURATION、DATETIME、JSON 和 DECIMAL 类型会被转换为 STRING 类型,然后转换为 UTF8 编码的字节。
* VARBIANRY、BINARY 和 BLOB(包括 TINY、MEDIUM 和 LONG)类型会直接使用它的字节。
* VARCHAR、CHAR 和 TEXT(包括 TINY、MEDIUM 和 LONG)类型会被编码为 UTF8 编码的字节。
* TIMESTAMP、DATE、DURATION、DATETIME、JSON 和 DECIMAL 类型会被转换为 STRING 类型,然后转换为字节。
* CHAR、VARCHAR、VARSTRING、STRING、TEXT、BLOB(包括 TINY、MEDIUM 和 LONG)等字符类型,会直接使用字节。
* NULL 和 GEOMETRY 类型不会被纳入到 Checksum 计算中,返回空字节。
## 基于 Golang 的 Avro 数据消费和 Checksum 计算过程解释
TiCDC 提供了基于 Golang 的 Checksum 计算过程,你可以参考该过程实现自己的 Checksum 计算逻辑。主要代码逻辑位于 TiCDC Avro Decoder 中实现的 [NextRowChangedEvent](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L100) 方法。该方法的具体工作过程如下:
1. 假设已经从 Kafka 读取到消息,并设置了 key 和 value 字段。分别对 key 和 value 进行解码操作,得到解码之后的数据值和 schema。具体的过程可以参考 [`decodeKey`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L395) 和 [`decodeValue`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L419) 方法。
2. 使用解码后的 key、value 和 schema 等内容,重新构建每一列的数据内容 `RowChangedEvent`。详情可见 [`assembleEvent`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L176) 方法。构建 `RowChangedEvent` 的过程如下:
1. 获取 schema 中所有的 `fields` 内容,遍历 `fields` 中的每一个元素 `field` 以构建对应的列。其中 `fields` 已经按照 Column ID 排序。
2. 利用 `field` 中包含的每一列的类型信息,重建每一列的 MySQL Type。并通过 keyMap 识别到 Handle Key 列,然后设置相应的 flag。
3. valueMap 中的值需要经过 [`getColumnValue`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L299) 转换,这是因为在编码过程中,某些列允许为 NULL,此时会把 value 编码成一个 map。因此,解码时需要从 map 中获取具体的值,即 map 中的第一个元素。如果该列是 `mysql.TypeEnum` 或 `mysql.TypeSet` 类型,还需要映射到它们的数字形式表示上。
4. 遍历完 `fields` 后,就获取了所有列的数据。对于 Delete 事件,将解码得到的列设置为 `PreColumns`;对于 Insert 和 Update 事件,将解码得到的列设置为 `Columns`。
Checksum 计算和校验的过程如下:
1. 调用 [`extractExpectedChecksum`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L281) 方法,获取期望的 Checksum 值。如果该方法返回 `false`,则说明该事件不需要进行 Checksum 校验,因为上游并没有发送 Checksum。这可能发生在 TiCDC 开启了 Checksum 但 TiDB 没有开启该功能时,或者当前事件发生在 Checksum 校验功能开启之前等场景。
2. 调用 [`calculateChecksum`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L461) 方法,遍历之前重建出来的所有列。利用 [buildChecksumBytes](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L482) 方法将每一列的 value 和 MySQL Type 编码为一个字节切片,然后使用该字节切片更新 Checksum 值。
3. 通过 [`verifyChecksum`](https://github.com/pingcap/tiflow/blob/eb04aecaf8e61f7f9d67597c2d2ef1f44583dd79/pkg/sink/codec/avro/decoder.go#L444) 方法,进行 Checksum 计算和校验。将步骤 2 计算的值与步骤 1 获取的期望值进行比较。如果不相等,则说明 Checksum 校验失败,数据可能存在损坏的情况。
> **注意:**
>
> 开启 Checksum 校验功能后,DECIMAL 和 UNSIGNED BIGINT 类型的数据会被转换为字符串类型。因此在下游消费者代码中需要将其转换为对应的数值类型,然后进行 Checksum 相关计算。
Golang 消费者代码实现了解码从 Kafka 读取到的数据、按照 schema fields 排序以及 Checksum 计算等步骤。详情请参考 [`avro/decoder.go`](https://github.com/pingcap/tiflow/blob/master/pkg/sink/codec/avro/decoder.go)。
> - 开启 Checksum 校验功能后,DECIMAL 和 UNSIGNED BIGINT 类型的数据会被转换为字符串类型。因此在下游消费者代码中需要将其转换为对应的数值类型,然后进行 Checksum 相关计算。
> - Delete 事件只含有 Handle Key 列的内容,而 Checksum 是基于所有列计算的,所以 Delete 事件不参与到 Checksum 的校验中。

0 comments on commit faf76e7

Please sign in to comment.