diff --git a/Makefile b/Makefile
index e8fd7dc81..5c6afd083 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,5 @@
 ### Makefile for tidb-binlog
-.PHONY: build test check update clean pump drainer fmt reparo integration_test arbiter binlogctl
+.PHONY: build test check update clean pump drainer fmt reparo integration_test binlogctl
 
 PROJECT=tidb-binlog
 
@@ -37,7 +37,7 @@ all: dev install
 
 dev: check test
 
-build: pump drainer reparo arbiter binlogctl
+build: pump drainer reparo binlogctl
 
 pump:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/pump cmd/pump/main.go
@@ -45,9 +45,6 @@ pump:
 drainer:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/drainer cmd/drainer/main.go
 
-arbiter:
-	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/arbiter cmd/arbiter/main.go
-
 reparo:
 	$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/reparo cmd/reparo/main.go
 
diff --git a/arbiter/README.md b/arbiter/README.md
deleted file mode 100644
index 156894cfb..000000000
--- a/arbiter/README.md
+++ /dev/null
@@ -1,67 +0,0 @@
-Arbiter
-==========
-
-**Arbiter** is a tool for incrementally syncing data from Kafka to TiDB.
-
-![](./arbiter.png)
-
-The complete import process is as follows:
-
-1. Read Binlog from Kafka in the [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/proto/proto/binlog.proto) format.
-2. Once a certain amount of data has accumulated, construct SQL statements according to the Binlog and write them to the downstream concurrently (note: Arbiter splits upstream transactions).
-3. Save the checkpoint.
-
-
-## Checkpoint
-`arbiter` writes a record to the table `tidb_binlog.arbiter_checkpoint` in the downstream TiDB.
-```
-mysql> select * from tidb_binlog.arbiter_checkpoint;
-+-------------+--------------------+--------+
-| topic_name  | ts                 | status |
-+-------------+--------------------+--------+
-| test_kafka4 | 405809779094585347 |      1 |
-+-------------+--------------------+--------+
-```
-- topic_name: the name of the Kafka topic to consume.
-- ts: the checkpoint timestamp.
-- status:
-    * 0 - All Binlog data <= ts has been synced to the downstream.
-    * 1 - `Arbiter` is running, or it quit unexpectedly; Binlog with a timestamp bigger than ts may be partially synced to the downstream.
-
-
-
-## Monitor
-
-Arbiter supports metrics collection via [Prometheus](https://prometheus.io/).
-
-### Metrics
-
-* **`binlog_arbiter_checkpoint_tso`** (Gauge)
-
-    Corresponds to ts in the table `tidb_binlog.arbiter_checkpoint`.
-
-* **`binlog_arbiter_query_duration_time`** (Histogram)
-
-    Bucketed histogram of the time needed to write to the downstream. Labels:
-
-    * **type**: `exec`, `commit` - the time taken to execute and commit SQL.
-
-* **`binlog_arbiter_event`** (Counter)
-
-    Event counter. Labels:
-
-    * **type**: e.g. `DDL` `Insert` `Update` `Delete` `Txn`
-
-* **`binlog_arbiter_queue_size`** (Gauge)
-
-    Queue size. Labels:
-
-    * **name**: e.g. `kafka_reader` `loader_input`
-
-* **`binlog_arbiter_txn_latency_seconds`** (Histogram)
-
-    Bucketed histogram of the duration between the time a transaction is written to the downstream and the commit time of the upstream transaction (the physical part of commitTS).
-
-
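The `ts` column above holds a raw TiDB TSO, not a Unix time: the high bits carry a wall-clock timestamp in milliseconds and the low 18 bits are a logical counter (the deleted `server.go` below extracts the physical part with `oracle.ExtractPhysical`). As a minimal illustrative sketch, separate from this change and reusing the sample value from the table above, the checkpoint decodes like this:

```go
package main

import (
	"fmt"
	"time"
)

// A TiDB TSO keeps a logical counter in its low 18 bits; the remaining high
// bits are a Unix timestamp in milliseconds, which is what
// oracle.ExtractPhysical returns.
const logicalBits = 18

func main() {
	ts := int64(405809779094585347) // sample ts from tidb_binlog.arbiter_checkpoint
	physicalMs := ts >> logicalBits
	// Prints the commit time of the last transaction known to be synced.
	fmt.Println(time.UnixMilli(physicalMs).UTC())
}
```

This is also why the alert rule shipped in `arbiter.rules.yml` divides `binlog_arbiter_checkpoint_tso` by 1000: the gauge is set to the extracted physical milliseconds, so `time() - binlog_arbiter_checkpoint_tso / 1000` is the replication delay in seconds.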
diff --git a/arbiter/README_CN.md b/arbiter/README_CN.md
deleted file mode 100644
index 7003e8b45..000000000
--- a/arbiter/README_CN.md
+++ /dev/null
@@ -1,150 +0,0 @@
-Arbiter
-==========
-
-**Arbiter** is a tool that incrementally syncs Binlog data from Kafka to TiDB.
-
-![](./arbiter.png)
-
-The overall workflow is as follows:
-
-1. Read Binlog in the [Protobuf](https://github.com/pingcap/tidb-tools/blob/master/tidb-binlog/proto/proto/binlog.proto) format from Kafka.
-2. Once a certain amount of data has accumulated, construct the corresponding SQL statements according to the Binlog and write them to the downstream concurrently (note that Arbiter splits upstream transactions).
-3. Save the checkpoint.
-
-
-## Checkpoint
-`arbiter` saves a checkpoint record in the `tidb_binlog.arbiter_checkpoint` table in the downstream TiDB.
-```
-mysql> select * from tidb_binlog.arbiter_checkpoint;
-+-------------+--------------------+--------+
-| topic_name  | ts                 | status |
-+-------------+--------------------+--------+
-| test_kafka4 | 405809779094585347 |      1 |
-+-------------+--------------------+--------+
-```
-- topic_name: the name of the Kafka topic to consume.
-- ts: the ts up to which data has been synced.
-- status:
-    * 0 - all data <= ts has been synced to the downstream.
-    * 1 - Arbiter is running, or it quit abnormally; part of the Binlog after ts may have been synced to the downstream.
-
-
-
-## Monitoring and alerting
-
-Arbiter exposes metrics for [Prometheus](https://prometheus.io/) to collect. This section describes Arbiter's monitoring configuration and monitoring metrics.
-
-### Monitoring configuration
-
-As long as Prometheus can discover **Arbiter**'s monitoring address, it can collect the metrics.
-
-The monitoring port can be configured in arbiter.toml:
-
-```toml
-# addr (i.e. 'host:port') to listen on for Arbiter connections
-addr = "0.0.0.0:8251"
-```
-
-To let Prometheus discover Arbiter, you can write the address directly into its configuration file, for example:
-```yml
-scrape_configs:
-  - job_name: 'arbiter'
-    honor_labels: true # don't overwrite job & instance labels
-    static_configs:
-      - targets: ['192.168.20.10:8251']
-```
-
-#### Importing the Grafana dashboard
-
-Perform the following steps to import the Grafana dashboard for Arbiter:
-
-1. Click the Grafana logo in the sidebar.
-
-2. In the sidebar menu, click **Dashboards** > **Import** to open the **Import Dashboard** window.
-
-3. Click **Upload .json File** and upload the corresponding JSON file (download the [Arbiter configuration file](./arbiter.json)).
-
-4. Click **Load**.
-
-5. Select a Prometheus data source.
-
-6. Click **Import**; the Prometheus dashboard is imported.
-
-### Monitoring metrics
-
-This section describes **arbiter**'s monitoring metrics in detail.
-
-* **`binlog_arbiter_checkpoint_tso`** (Gauge)
-
-    Corresponds to ts in the `tidb_binlog.arbiter_checkpoint` table.
-
-* **`binlog_arbiter_query_duration_time`** (Histogram)
-
-    Bucketed histogram of the time it takes to write to the downstream. Labels:
-
-    * **type**: `exec`, `commit` - the time spent executing and committing SQL.
-
-* **`binlog_arbiter_event`** (Counter)
-
-    Counts the number of events.
-
-    * **type**: `DDL` `Insert` `Update` `Delete` `Txn`
-
-* **`binlog_arbiter_queue_size`** (Gauge)
-
-    Size of the data backlog in internal queues. Labels:
-
-    * **name**: `kafka_reader` `loader_input`
-
-* **`binlog_arbiter_txn_latency_seconds`** (Histogram)
-
-    The time from an upstream transaction's commit (the physical part of its commitTS) until the corresponding transaction is written to the downstream.
-
-### Alert configuration
-
-Perform the following steps to import the Prometheus alert rules for Arbiter:
-
-1. Download the Arbiter alert rules file [Arbiter configuration file](./arbiter.rules.yml) and put it in Prometheus's configuration directory.
-
-2. Modify the Prometheus configuration file `prometheus.yml` and add the file to rule_files:
-
-```yml
-# Load and evaluate rules in this file every 'evaluation_interval' seconds.
-rule_files:
-  - 'arbiter.rules.yml'
-```
-
-### Alert rules
-
-This section describes Arbiter's alert rules.
-
-#### `binlog_arbiter_checkpoint_high_delay`
-
-* Alert rule:
-
-    `(time() - binlog_arbiter_checkpoint_tso / 1000) > 3600`
-
-* Description:
-
-    An alert is raised when Arbiter's replication lags by more than one hour.
-
-* Handling:
-
-    Caused by slow replication or by replicating historical data; check the monitoring dashboards to locate the problem.
-
-#### `binlog_arbiter_checkpoint_tso_no_change_for_1m`
-
-* Alert rule:
-
-    `changes(binlog_arbiter_checkpoint_tso[1m]) < 1`
-
-* Description:
-
-    An alert is raised when the Arbiter checkpoint has not been updated for one minute.
-
-* Handling:
-
-    Caused by having no data source or by replication being blocked abnormally; check the monitoring dashboards to locate the problem.
-
diff --git a/arbiter/arbiter.json b/arbiter/arbiter.json
deleted file mode 100644
index 619147667..000000000
--- a/arbiter/arbiter.json
+++ /dev/null
@@ -1,567 +0,0 @@
-{
-  "__inputs": [
-    {
-      "name": "DS_TIDB-CLUSTER",
-      "label": "tidb-cluster",
-      "description": "",
-      "type": "datasource",
-      "pluginId": "prometheus",
-      "pluginName": "Prometheus"
-    }
-  ],
-  "__requires": [
-    {
-      "type": "grafana",
-      "id": "grafana",
-      "name": "Grafana",
-      "version": "4.6.3"
-    },
-    {
-      "type": "panel",
-      "id": "graph",
-      "name": "Graph",
-      "version": ""
-    },
-    {
-      "type": "datasource",
-      "id": "prometheus",
-      "name": "Prometheus",
-      "version": "1.0.0"
-    },
-    {
-      "type": "panel",
-      "id": "singlestat",
-      "name": "Singlestat",
-      "version": ""
-    }
-  ],
-  "annotations": {
-    "list": [
-      {
-        "builtIn": 1,
-        "datasource": "-- Grafana --",
-        "enable": true,
-        "hide": true,
-        "iconColor": "rgba(0, 211, 255, 1)",
-        "name": "Annotations & Alerts",
-        "type": "dashboard"
-      }
-    ]
-  },
-  "editable": true,
-  "gnetId": null,
-  "graphTooltip": 0,
-  "hideControls": false,
-  "id": null,
-  "links": [],
-  "refresh": "5s",
-  "rows": [
-    {
-      "collapse": false,
-      "height": "250px",
-      "panels": [
-        {
-          "cacheTimeout": null,
-          "colorBackground": false,
-          "colorValue": false,
-          "colors": [
-            "#299c46",
-            "rgba(237, 129, 40, 0.89)",
-            "#d44a3a"
-          ],
-          "datasource": "${DS_TIDB-CLUSTER}",
-          "decimals": null,
-          "format": "dateTimeAsIso",
-          "gauge": {
-            "maxValue": 100,
-            "minValue": 0,
-            "show": false,
-            "thresholdLabels": false,
-            "thresholdMarkers": true
-          },
-          "id": 1,
-          "interval": null,
-          "links": [],
-          "mappingType": 1,
-          "mappingTypes": [
-            {
-              "name": "value to text",
-              "value": 1
-            },
-            {
-              "name": "range to text",
-              "value": 2
-            }
-          ],
-          "maxDataPoints": 100,
-          "nullPointMode": "connected",
-          "nullText": null,
-          "postfix": "",
-          "postfixFontSize": "50%",
-          "prefix": "",
-          "prefixFontSize": "50%",
-          "rangeMaps": [
-            {
-              "from": "null",
-              "text": "N/A",
-              "to": "null"
-            }
-          ],
-          "span": 4,
-          "sparkline": {
-            "fillColor": "rgba(31, 118, 189, 0.18)",
-            "full": false,
-            "lineColor": "rgb(31, 120, 193)",
-            "show": false
-          },
-          "tableColumn": "",
-          "targets": [
-            {
-              "expr": "binlog_arbiter_checkpoint_tso{instance = \"$arbiter_instance\"}",
-              "format": "time_series",
-              "instant": true,
-              "intervalFactor": 2,
-              "legendFormat": "{{instance}}",
-              "refId": "A"
-            }
-          ],
-          "thresholds": "",
-          "title": "Checkpoint TS",
-          "type": "singlestat",
-          "valueFontSize": "80%",
-          "valueMaps": [
-            {
-              "op": "=",
-              "text": "N/A",
-              "value": "null"
-            }
-          ],
-          "valueName": "avg"
-        },
-        {
-          "aliasColors": {},
-          "bars": false,
-          "dashLength": 10,
-          "dashes": false,
-          "datasource": "${DS_TIDB-CLUSTER}",
-          "description": "Bucketed histogram of seconds of a txn between loaded to downstream and committed at upstream",
-          "fill": 1,
-          "id": 5,
-          "legend": {
-            "avg": false,
-            "current": false,
-            "max": false,
-            "min": false,
-            "show": true,
-            "total": false,
-            "values": false
-          },
-          "lines": true,
-          "linewidth": 1,
-          "links": [],
-          "nullPointMode": "null",
-          "percentage": false,
-
"pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "span": 8, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "histogram_quantile(0.99, rate(binlog_arbiter_txn_latency_seconds_bucket{instance = \"$arbiter_instance\"}[1m]))", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "{{instance}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "99% Txn Latency", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "repeat": null, - "repeatIteration": null, - "repeatRowId": null, - "showTitle": false, - "title": "Dashboard Row", - "titleSize": "h6" - }, - { - "collapse": false, - "height": 250, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TIDB-CLUSTER}", - "fill": 1, - "id": 2, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "span": 12, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "irate(binlog_arbiter_event{instance = \"$arbiter_instance\"}[1m])", - "format": "time_series", - "hide": false, - "intervalFactor": 2, - "legendFormat": "{{type}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Events", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "repeat": null, - "repeatIteration": null, - "repeatRowId": null, - "showTitle": false, - "title": "Dashboard Row", - "titleSize": "h6" - }, - { - "collapse": false, - "height": 250, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TIDB-CLUSTER}", - "fill": 1, - "id": 3, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "span": 12, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "histogram_quantile(0.99, rate(binlog_arbiter_query_duration_time_bucket{instance = \"$arbiter_instance\"}[1m]))", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "99%: {{type}}", - "refId": "A" - }, - { 
- "expr": "histogram_quantile(0.95, rate(binlog_arbiter_query_duration_time_bucket{instance = \"$arbiter_instance\"}[1m]))", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "95%: {{type}} ", - "refId": "B" - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Query Time", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "decimals": null, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "repeat": null, - "repeatIteration": null, - "repeatRowId": null, - "showTitle": false, - "title": "Dashboard Row", - "titleSize": "h6" - }, - { - "collapse": false, - "height": 250, - "panels": [ - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TIDB-CLUSTER}", - "fill": 1, - "id": 4, - "legend": { - "alignAsTable": true, - "avg": false, - "current": true, - "hideEmpty": false, - "hideZero": false, - "max": false, - "min": false, - "rightSide": true, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pointradius": 5, - "points": false, - "renderer": "flot", - "repeat": null, - "seriesOverrides": [], - "spaceLength": 10, - "span": 12, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "binlog_arbiter_queue_size{instance = \"$arbiter_instance\"}", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "{{name}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeShift": null, - "title": "Queue Size", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "transparent": false, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ] - } - ], - "repeat": null, - "repeatIteration": null, - "repeatRowId": null, - "showTitle": false, - "title": "Dashboard Row", - "titleSize": "h6" - } - ], - "schemaVersion": 14, - "style": "dark", - "tags": [], - "templating": { - "list": [ - { - "allValue": null, - "current": {}, - "datasource": "${DS_TIDB-CLUSTER}", - "hide": 0, - "includeAll": false, - "label": null, - "multi": false, - "name": "arbiter_instance", - "options": [], - "query": "label_values(binlog_arbiter_checkpoint_tso,instance)", - "refresh": 2, - "regex": "", - "sort": 0, - "tagValuesQuery": "", - "tags": [], - "tagsQuery": "", - "type": "query", - "useTags": false - } - ] - }, - "time": { - "from": "now-15m", - "to": "now" - }, - "timepicker": { - "refresh_intervals": [ - "5s", - "10s", - "30s", - "1m", - "5m", - "15m", - "30m", - "1h", - "2h", - "1d" - ], - "time_options": [ - "5m", - "15m", - "1h", - "6h", - "12h", - "24h", - "2d", - "7d", - "30d" - ] - }, - "timezone": "", - "title": "arbiter", - "version": 10 -} \ No newline at end of file diff --git a/arbiter/arbiter.png b/arbiter/arbiter.png deleted file mode 100644 index 94ea91718..000000000 Binary files a/arbiter/arbiter.png and /dev/null 
differ
diff --git a/arbiter/arbiter.rules.yml b/arbiter/arbiter.rules.yml
deleted file mode 100644
index 8ce1c44ca..000000000
--- a/arbiter/arbiter.rules.yml
+++ /dev/null
@@ -1,25 +0,0 @@
-groups:
-- name: alert.rules
-  rules:
-  - alert: binlog_arbiter_checkpoint_high_delay
-    expr: (time() - binlog_arbiter_checkpoint_tso / 1000) > 3600
-    for: 1m
-    labels:
-      env: test-cluster
-      level: warning
-      expr: (time() - binlog_arbiter_checkpoint_tso / 1000) > 3600
-    annotations:
-      description: 'cluster: test-cluster, instance: {{ $labels.instance }}, values: {{ $value }}'
-      value: '{{ $value }}'
-      summary: arbiter checkpoint delay more than 1 hour
-
-  - alert: binlog_arbiter_checkpoint_tso_no_change_for_1m
-    expr: changes(binlog_arbiter_checkpoint_tso[1m]) < 1
-    labels:
-      env: test-cluster
-      level: warning
-      expr: changes(binlog_arbiter_checkpoint_tso[1m]) < 1
-    annotations:
-      description: 'cluster: test-cluster, instance: {{ $labels.instance }}, values: {{ $value }}'
-      value: '{{ $value }}'
-      summary: binlog arbiter checkpoint tso no change for 1m
diff --git a/arbiter/checkpoint.go b/arbiter/checkpoint.go
deleted file mode 100644
index a3378affe..000000000
--- a/arbiter/checkpoint.go
+++ /dev/null
@@ -1,107 +0,0 @@
-// Copyright 2019 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package arbiter
-
-import (
-	gosql "database/sql"
-	"fmt"
-
-	pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
-
-	"github.com/pingcap/errors"
-)
-
-const (
-	// StatusNormal means the server quit normally; data <= ts is synced to the downstream
-	StatusNormal int = 0
-	// StatusRunning means the server is running or quit abnormally; part of the data may or may not have been synced to the downstream
-	StatusRunning int = 1
-)
-
-// Checkpoint is able to save and load checkpoints
-type Checkpoint interface {
-	Save(ts int64, status int) error
-	Load() (ts int64, status int, err error)
-}
-
-type dbCheckpoint struct {
-	database  string
-	table     string
-	db        *gosql.DB
-	topicName string
-}
-
-// NewCheckpoint creates a Checkpoint
-func NewCheckpoint(db *gosql.DB, topicName string) (Checkpoint, error) {
-	cp := &dbCheckpoint{
-		db:        db,
-		database:  "tidb_binlog",
-		table:     "arbiter_checkpoint",
-		topicName: topicName,
-	}
-
-	if err := cp.createSchemaIfNeed(); err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	return cp, nil
-}
-
-func (c *dbCheckpoint) createSchemaIfNeed() error {
-	sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", pkgsql.QuoteName(c.database))
-	_, err := c.db.Exec(sql)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	sql = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s(
-		topic_name VARCHAR(255) PRIMARY KEY, ts BIGINT NOT NULL, status INT NOT NULL)`,
-		pkgsql.QuoteSchema(c.database, c.table))
-	_, err = c.db.Exec(sql)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	return nil
-}
-
-// Save saves the ts and status
-func (c *dbCheckpoint) Save(ts int64, status int) error {
-	sql := fmt.Sprintf("REPLACE INTO %s(topic_name, ts, status) VALUES(?,?,?)",
-		pkgsql.QuoteSchema(c.database, c.table))
-	_, err := c.db.Exec(sql, c.topicName, ts, status)
-	if err != nil {
-		return
errors.Annotatef(err, "exec fail: '%s', args: %s %d, %d", sql, c.topicName, ts, status) - } - - return nil -} - -// Load return ts and status, if no record in checkpoint, return err = errors.NotFoundf -func (c *dbCheckpoint) Load() (ts int64, status int, err error) { - sql := fmt.Sprintf("SELECT ts, status FROM %s WHERE topic_name = ?", - pkgsql.QuoteSchema(c.database, c.table)) - - row := c.db.QueryRow(sql, c.topicName) - - err = row.Scan(&ts, &status) - if err != nil { - if errors.Cause(err) == gosql.ErrNoRows { - return 0, 0, errors.NotFoundf("no checkpoint for: %s", c.topicName) - } - return 0, 0, errors.Trace(err) - } - - return -} diff --git a/arbiter/checkpoint_test.go b/arbiter/checkpoint_test.go deleted file mode 100644 index bd84a791a..000000000 --- a/arbiter/checkpoint_test.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package arbiter - -import ( - "fmt" - "testing" - - gosql "database/sql" - sqlmock "github.com/DATA-DOG/go-sqlmock" - check "github.com/pingcap/check" - "github.com/pingcap/errors" - pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" -) - -func Test(t *testing.T) { check.TestingT(t) } - -type CheckpointSuite struct { -} - -var _ = check.Suite(&CheckpointSuite{}) - -func setNewExpect(mock sqlmock.Sqlmock) { - mock.ExpectExec("CREATE DATABASE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(0, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS").WillReturnResult(sqlmock.NewResult(0, 1)) -} - -func (cs *CheckpointSuite) TestNewCheckpoint(c *check.C) { - db, mock, err := sqlmock.New() - c.Assert(err, check.IsNil) - - setNewExpect(mock) - - _, err = createDbCheckpoint(db) - c.Assert(err, check.IsNil) - - c.Assert(mock.ExpectationsWereMet(), check.IsNil) -} - -func (cs *CheckpointSuite) TestSaveAndLoad(c *check.C) { - db, mock, err := sqlmock.New() - c.Assert(err, check.IsNil) - - setNewExpect(mock) - cp, err := createDbCheckpoint(db) - c.Assert(err, check.IsNil) - sql := fmt.Sprintf("SELECT (.+) FROM %s WHERE topic_name = ?", - pkgsql.QuoteSchema(cp.database, cp.table)) - mock.ExpectQuery(sql).WithArgs(cp.topicName). - WillReturnError(errors.NotFoundf("no checkpoint for: %s", cp.topicName)) - - _, _, err = cp.Load() - c.Log(err) - c.Assert(errors.IsNotFound(err), check.IsTrue) - - var saveTS int64 = 10 - saveStatus := 1 - mock.ExpectExec("REPLACE INTO"). - WithArgs(cp.topicName, saveTS, saveStatus). - WillReturnResult(sqlmock.NewResult(0, 1)) - err = cp.Save(saveTS, saveStatus) - c.Assert(err, check.IsNil) - - rows := sqlmock.NewRows([]string{"ts", "status"}). 
- AddRow(saveTS, saveStatus) - mock.ExpectQuery("SELECT ts, status FROM").WillReturnRows(rows) - ts, status, err := cp.Load() - c.Assert(err, check.IsNil) - c.Assert(ts, check.Equals, saveTS) - c.Assert(status, check.Equals, saveStatus) -} - -func createDbCheckpoint(db *gosql.DB) (*dbCheckpoint, error) { - cp, err := NewCheckpoint(db, "topic_name") - return cp.(*dbCheckpoint), err -} diff --git a/arbiter/config.go b/arbiter/config.go deleted file mode 100644 index e4fba5dc6..000000000 --- a/arbiter/config.go +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package arbiter - -import ( - "encoding/json" - "flag" - "fmt" - "os" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/pkg/flags" - "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb-binlog/pkg/version" - "go.uber.org/zap" -) - -const ( - defaultKafkaAddrs = "127.0.0.1:9092" - defaultKafkaVersion = "0.8.2.0" -) - -var ( - errUpTopicNotSpecified = errors.Errorf("up.topic not config, please config the topic name") -) - -// Config is the configuration of Server -type Config struct { - *flag.FlagSet `json:"-"` - LogLevel string `toml:"log-level" json:"log-level"` - ListenAddr string `toml:"addr" json:"addr"` - LogFile string `toml:"log-file" json:"log-file"` - OpenSaramaLog bool `toml:"open-sarama-log" json:"open-sarama-log"` - - Up UpConfig `toml:"up" json:"up"` - Down DownConfig `toml:"down" json:"down"` - - Metrics Metrics `toml:"metrics" json:"metrics"` - configFile string - printVersion bool -} - -// Metrics is configuration of metrics -type Metrics struct { - Addr string `toml:"addr" json:"addr"` - Interval int `toml:"interval" json:"interval"` -} - -// UpConfig is configuration of upstream -type UpConfig struct { - KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` - KafkaVersion string `toml:"kafka-version" json:"kafka-version"` - - InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"` - Topic string `toml:"topic" json:"topic"` - MessageBufferSize int `toml:"message-buffer-size" json:"message-buffer-size"` - SaramaBufferSize int `toml:"sarama-buffer-size" json:"sarama-buffer-size"` -} - -// DownConfig is configuration of downstream -type DownConfig struct { - Host string `toml:"host" json:"host"` - Port int `toml:"port" json:"port"` - User string `toml:"user" json:"user"` - Password string `toml:"password" json:"password"` - - WorkerCount int `toml:"worker-count" json:"worker-count"` - BatchSize int `toml:"batch-size" json:"batch-size"` - SafeMode bool `toml:"safe-mode" json:"safe-mode"` -} - -// NewConfig return an instance of configuration -func NewConfig() *Config { - cfg := &Config{} - cfg.FlagSet = flag.NewFlagSet("arbiter", flag.ContinueOnError) - fs := cfg.FlagSet - fs.Usage = func() { - fmt.Fprintln(os.Stderr, "Usage of arbiter:") - fs.PrintDefaults() - } - - fs.StringVar(&cfg.ListenAddr, "addr", "127.0.0.1:8251", "addr (i.e. 
'host:port') to listen on for arbiter connections") - fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") - fs.StringVar(&cfg.configFile, "config", "", "path to the configuration file") - fs.BoolVar(&cfg.printVersion, "V", false, "print version information and exit") - fs.StringVar(&cfg.Metrics.Addr, "metrics.addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") - fs.IntVar(&cfg.Metrics.Interval, "metrics.interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") - fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") - fs.BoolVar(&cfg.OpenSaramaLog, "open-sarama-log", true, "save the logs from sarama (https://github.com/Shopify/sarama), a client of Kafka") - - fs.Int64Var(&cfg.Up.InitialCommitTS, "up.initial-commit-ts", 0, "if arbiter doesn't have checkpoint, use initial commitTS to initial checkpoint") - fs.StringVar(&cfg.Up.Topic, "up.topic", "", "topic name of kafka") - - fs.IntVar(&cfg.Down.WorkerCount, "down.worker-count", 16, "concurrency write to downstream") - fs.IntVar(&cfg.Down.BatchSize, "down.batch-size", 64, "batch size write to downstream") - fs.BoolVar(&cfg.Down.SafeMode, "safe-mode", false, "enable safe mode to make reentrant") - - return cfg -} - -func (cfg *Config) String() string { - data, err := json.MarshalIndent(cfg, "\t", "\t") - if err != nil { - log.Error("marshal Config failed", zap.Error(err)) - } - - return string(data) -} - -// Parse parses all config from command-line flags, environment vars or the configuration file -func (cfg *Config) Parse(args []string) error { - // parse first to get config file - perr := cfg.FlagSet.Parse(args) - switch perr { - case nil: - case flag.ErrHelp: - os.Exit(0) - default: - os.Exit(2) - } - if cfg.printVersion { - fmt.Println(version.GetRawVersionInfo()) - os.Exit(0) - } - - // load config file if specified - if cfg.configFile != "" { - if err := cfg.configFromFile(cfg.configFile); err != nil { - return errors.Trace(err) - } - } - - // parse again to replace with command line options - if err := cfg.FlagSet.Parse(args); err != nil { - return errors.Trace(err) - } - if len(cfg.FlagSet.Args()) > 0 { - return errors.Errorf("'%s' is not a valid flag", cfg.FlagSet.Arg(0)) - } - - // replace with environment vars - err := flags.SetFlagsFromEnv("BINLOG_SERVER", cfg.FlagSet) - if err != nil { - return errors.Trace(err) - } - - if err = cfg.adjustConfig(); err != nil { - return errors.Trace(err) - } - - return cfg.validate() -} - -// validate checks whether the configuration is valid -func (cfg *Config) validate() error { - if len(cfg.Up.Topic) == 0 { - return errUpTopicNotSpecified - } - - return nil -} - -func (cfg *Config) adjustConfig() error { - // cfg.Up - if len(cfg.Up.KafkaAddrs) == 0 { - cfg.Up.KafkaAddrs = defaultKafkaAddrs - } - if len(cfg.Up.KafkaVersion) == 0 { - cfg.Up.KafkaVersion = defaultKafkaVersion - } - - // cfg.Down - if len(cfg.Down.Host) == 0 { - cfg.Down.Host = "localhost" - } - if cfg.Down.Port == 0 { - cfg.Down.Port = 3306 - } - if len(cfg.Down.User) == 0 { - cfg.Down.User = "root" - } - - return nil -} - -func (cfg *Config) configFromFile(path string) error { - return util.StrictDecodeFile(path, "arbiter", cfg) -} diff --git a/arbiter/config_test.go b/arbiter/config_test.go deleted file mode 100644 index 9b765839a..000000000 --- a/arbiter/config_test.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2019 PingCAP, Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package arbiter - -import ( - "bytes" - "fmt" - "os" - "path" - "runtime" - "strings" - - "github.com/BurntSushi/toml" - "github.com/pingcap/check" -) - -type TestConfigSuite struct { -} - -var _ = check.Suite(&TestConfigSuite{}) - -func (t *TestConfigSuite) TestAdjustConfig(c *check.C) { - config := Config{ - Up: UpConfig{}, - Down: DownConfig{}, - } - err := config.adjustConfig() - c.Assert(err, check.IsNil) - c.Assert(config.Up.KafkaAddrs, check.Equals, defaultKafkaAddrs) - c.Assert(config.Up.KafkaVersion, check.Equals, defaultKafkaVersion) - c.Assert(config.Down.Host, check.Equals, "localhost") - c.Assert(config.Down.Port, check.Equals, 3306) - c.Assert(config.Down.User, check.Equals, "root") -} - -func (t *TestConfigSuite) TestParseConfig(c *check.C) { - args := make([]string, 0, 10) - - // not set `up.topic`, invalid - config := NewConfig() - configFile := getTemplateConfigFilePath() - args = append(args, fmt.Sprintf("-config=%s", configFile)) - err := config.Parse(args) - c.Assert(err, check.Equals, errUpTopicNotSpecified) - - // set `up.topic` through command line args, valid - config = NewConfig() - upTopic := "topic-test" - args = append(args, fmt.Sprintf("-up.topic=%s", upTopic)) - err = config.Parse(args) - c.Assert(err, check.IsNil) - // check config item - c.Assert(config.LogLevel, check.Equals, "info") - c.Assert(config.LogFile, check.Equals, "") - c.Assert(config.ListenAddr, check.Equals, "127.0.0.1:8251") - c.Assert(config.configFile, check.Equals, configFile) - c.Assert(config.Up.KafkaAddrs, check.Equals, defaultKafkaAddrs) - c.Assert(config.Up.KafkaVersion, check.Equals, defaultKafkaVersion) - c.Assert(config.Up.InitialCommitTS, check.Equals, int64(0)) - c.Assert(config.Up.Topic, check.Equals, upTopic) - c.Assert(config.Down.Host, check.Equals, "localhost") - c.Assert(config.Down.Port, check.Equals, 3306) - c.Assert(config.Down.User, check.Equals, "root") - c.Assert(config.Down.Password, check.Equals, "") - c.Assert(config.Down.WorkerCount, check.Equals, 16) - c.Assert(config.Down.BatchSize, check.Equals, 64) - c.Assert(config.Metrics.Addr, check.Equals, "") - c.Assert(config.Metrics.Interval, check.Equals, 15) - - // overwrite with more command line args - listenAddr := "127.0.0.1:8252" - args = append(args, fmt.Sprintf("-addr=%s", listenAddr)) - logLevel := "error" - args = append(args, fmt.Sprintf("-L=%s", logLevel)) - logFile := "arbiter.log" - args = append(args, fmt.Sprintf("-log-file=%s", logFile)) - upInitCTS := int64(123) - args = append(args, fmt.Sprintf("-up.initial-commit-ts=%d", upInitCTS)) - downWC := 456 - args = append(args, fmt.Sprintf("-down.worker-count=%d", downWC)) - downBS := 789 - args = append(args, fmt.Sprintf("-down.batch-size=%d", downBS)) - - // parse again - config = NewConfig() - err = config.Parse(args) - c.Assert(err, check.IsNil) - // check again - c.Assert(config.ListenAddr, check.Equals, listenAddr) - c.Assert(config.LogLevel, check.Equals, logLevel) - c.Assert(config.LogFile, check.Equals, logFile) - c.Assert(config.Up.InitialCommitTS, check.Equals, 
upInitCTS) - c.Assert(config.Down.WorkerCount, check.Equals, downWC) - c.Assert(config.Down.BatchSize, check.Equals, downBS) - - // simply verify json string - c.Assert(strings.Contains(config.String(), listenAddr), check.IsTrue) -} - -func (t *TestConfigSuite) TestParseConfigFileWithInvalidArgs(c *check.C) { - yc := struct { - LogLevel string `toml:"log-level" json:"log-level"` - ListenAddr string `toml:"addr" json:"addr"` - LogFile string `toml:"log-file" json:"log-file"` - UnrecognizedOptionTest bool `toml:"unrecognized-option-test" json:"unrecognized-option-test"` - }{ - "debug", - "127.0.0.1:8251", - "/tmp/arbiter", - true, - } - - var buf bytes.Buffer - e := toml.NewEncoder(&buf) - err := e.Encode(yc) - c.Assert(err, check.IsNil) - - configFilename := path.Join(c.MkDir(), "arbiter_config_invalid.toml") - err = os.WriteFile(configFilename, buf.Bytes(), 0644) - c.Assert(err, check.IsNil) - - args := []string{ - "--config", - configFilename, - } - - cfg := NewConfig() - err = cfg.Parse(args) - c.Assert(err, check.ErrorMatches, ".*contained unknown configuration options: unrecognized-option-test.*") -} - -func getTemplateConfigFilePath() string { - // we put the template config file in "cmd/arbiter/arbiter.toml" - _, filename, _, _ := runtime.Caller(0) - return path.Join(path.Dir(filename), "../cmd/arbiter/arbiter.toml") -} diff --git a/arbiter/metrics.go b/arbiter/metrics.go deleted file mode 100644 index 7d8a70bd3..000000000 --- a/arbiter/metrics.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package arbiter - -import ( - "fmt" - "os" - - "github.com/pingcap/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "go.uber.org/zap" -) - -var ( - checkpointTSOGauge = prometheus.NewGauge( - prometheus.GaugeOpts{ - Namespace: "binlog", - Subsystem: "arbiter", - Name: "checkpoint_tso", - Help: "save checkpoint tso of arbiter.", - }) - - queryHistogramVec = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "binlog", - Subsystem: "arbiter", - Name: "query_duration_time", - Help: "Bucketed histogram of processing time (s) of a query to sync data to downstream.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18), - }, []string{"type"}) - - eventCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "binlog", - Subsystem: "arbiter", - Name: "event", - Help: "the count of sql event(dml, ddl).", - }, []string{"type"}) - - queueSizeGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "binlog", - Subsystem: "arbiter", - Name: "queue_size", - Help: "the size of queue", - }, []string{"name"}) - - txnLatencySecondsHistogram = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: "binlog", - Subsystem: "arbiter", - Name: "txn_latency_seconds", - Help: "Bucketed histogram of seconds of a txn between loaded to downstream and committed at upstream.", - Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), - }) -) - -// Registry is the metrics registry of server -var Registry = prometheus.NewRegistry() - -func init() { - Registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - Registry.MustRegister(collectors.NewGoCollector()) - - Registry.MustRegister(checkpointTSOGauge) - Registry.MustRegister(queryHistogramVec) - Registry.MustRegister(eventCounter) - Registry.MustRegister(queueSizeGauge) - Registry.MustRegister(txnLatencySecondsHistogram) -} - -var getHostname = os.Hostname - -func instanceName(port int) string { - hostname, err := getHostname() - if err != nil { - log.Error("Failed to get hostname", zap.Error(err)) - return "unknown" - } - return fmt.Sprintf("%s_%d", hostname, port) -} diff --git a/arbiter/metrics_test.go b/arbiter/metrics_test.go deleted file mode 100644 index c9f83eaf9..000000000 --- a/arbiter/metrics_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package arbiter - -import ( - . 
"github.com/pingcap/check" - "github.com/pingcap/errors" -) - -type instanceNameSuite struct{} - -var _ = Suite(&instanceNameSuite{}) - -func (s *instanceNameSuite) TestShouldRetUnknown(c *C) { - orig := getHostname - defer func() { - getHostname = orig - }() - getHostname = func() (string, error) { - return "", errors.New("host") - } - - n := instanceName(9090) - c.Assert(n, Equals, "unknown") -} - -func (s *instanceNameSuite) TestShouldUseHostname(c *C) { - orig := getHostname - defer func() { - getHostname = orig - }() - getHostname = func() (string, error) { - return "kendoka", nil - } - - n := instanceName(9090) - c.Assert(n, Equals, "kendoka_9090") -} diff --git a/arbiter/server.go b/arbiter/server.go deleted file mode 100644 index 2e8f2a91c..000000000 --- a/arbiter/server.go +++ /dev/null @@ -1,317 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package arbiter - -import ( - "context" - "database/sql" - "net" - "strconv" - "strings" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/pkg/loader" - "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb/tidb-binlog/driver/reader" - "github.com/tikv/client-go/v2/oracle" - "go.uber.org/zap" -) - -var ( - initSafeModeDuration = time.Minute * 5 - - // Make it possible to mock the following functions - createDB = loader.CreateDB - newReader = reader.NewReader - newLoader = loader.NewLoader -) - -// Server is the server to load data to mysql -type Server struct { - cfg *Config - port int - - load loader.Loader - - checkpoint Checkpoint - kafkaReader *reader.Reader - downDB *sql.DB - - // all txn commitTS <= finishTS has loaded to downstream - finishTS int64 - - metrics *util.MetricClient - - closed bool - mu sync.Mutex -} - -// NewServer creates a Server -func NewServer(cfg *Config) (srv *Server, err error) { - srv = new(Server) - srv.cfg = cfg - - _, port, err := net.SplitHostPort(cfg.ListenAddr) - if err != nil { - return nil, errors.Annotatef(err, "wrong ListenAddr: %s", cfg.ListenAddr) - } - - srv.port, err = strconv.Atoi(port) - if err != nil { - return nil, errors.Annotatef(err, "ListenAddr: %s", cfg.ListenAddr) - } - - up := cfg.Up - down := cfg.Down - - srv.downDB, err = createDB(down.User, down.Password, down.Host, down.Port, nil) - if err != nil { - return nil, errors.Trace(err) - } - - // set checkpoint - srv.checkpoint, err = NewCheckpoint(srv.downDB, up.Topic) - if err != nil { - return nil, errors.Trace(err) - } - - srv.finishTS = up.InitialCommitTS - - status, err := srv.loadStatus() - if err != nil { - return nil, errors.Trace(err) - } - - // set reader to read binlog from kafka - readerCfg := &reader.Config{ - KafkaAddr: strings.Split(up.KafkaAddrs, ","), - CommitTS: srv.finishTS, - Topic: up.Topic, - SaramaBufferSize: up.SaramaBufferSize, - MessageBufferSize: up.MessageBufferSize, - } - - log.Info("use kafka binlog reader", zap.Reflect("cfg", readerCfg)) - - srv.kafkaReader, err = newReader(readerCfg) - if err != nil { - return nil, errors.Trace(err) - } - - log.Info("new 
kafka reader success") - - // set loader - srv.load, err = newLoader(srv.downDB, - loader.WorkerCount(cfg.Down.WorkerCount), - loader.BatchSize(cfg.Down.BatchSize), - loader.Metrics(&loader.MetricsGroup{ - EventCounterVec: eventCounter, - QueryHistogramVec: queryHistogramVec, - })) - if err != nil { - return nil, errors.Trace(err) - } - - if down.SafeMode { - srv.load.SetSafeMode(true) - } else { - // set safe mode in first 5 min if abnormal quit last time - if status == StatusRunning { - log.Info("set safe mode to be true") - srv.load.SetSafeMode(true) - go func() { - time.Sleep(initSafeModeDuration) - srv.load.SetSafeMode(false) - log.Info("set safe mode to be false") - }() - } - } - - // set metrics - if cfg.Metrics.Addr != "" && cfg.Metrics.Interval != 0 { - srv.metrics = util.NewMetricClient( - cfg.Metrics.Addr, - time.Duration(cfg.Metrics.Interval)*time.Second, - Registry, - ) - } - - return -} - -// Close closes the Server -func (s *Server) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - - if s.closed { - return nil - } - - s.kafkaReader.Close() - - s.closed = true - return nil -} - -// Run runs the Server, will quit once encounter error or Server is closed -func (s *Server) Run() error { - defer s.downDB.Close() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // push metrics if need - if s.metrics != nil { - go s.metrics.Start(ctx, map[string]string{"instance": instanceName(s.port)}) - } - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - s.trackTS(ctx, time.Second) - wg.Done() - }() - - var syncErr error - - syncCtx, syncCancel := context.WithCancel(ctx) - wg.Add(1) - go func() { - defer wg.Done() - syncErr = syncBinlogs(syncCtx, s.kafkaReader.Messages(), s.load) - if syncErr != nil { - s.Close() - } - }() - - err := s.load.Run() - if err != nil { - syncCancel() - s.Close() - } - - wg.Wait() - - syncCancel() - if err != nil { - return errors.Trace(err) - } - - if syncErr != nil { - return errors.Trace(syncErr) - } - - if err = s.saveFinishTS(StatusNormal); err != nil { - return errors.Trace(err) - } - - return nil -} - -func (s *Server) updateFinishTS(msg *reader.Message) { - s.finishTS = msg.Binlog.CommitTs - - ms := time.Now().UnixNano()/1000000 - oracle.ExtractPhysical(uint64(s.finishTS)) - txnLatencySecondsHistogram.Observe(float64(ms) / 1000.0) -} - -func (s *Server) saveFinishTS(status int) error { - err := s.checkpoint.Save(s.finishTS, status) - if err != nil { - return err - } - checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(s.finishTS)))) - return nil -} - -func (s *Server) trackTS(ctx context.Context, saveInterval time.Duration) { - saveTick := time.NewTicker(saveInterval) - defer saveTick.Stop() - -L: - for { - select { - case txn, ok := <-s.load.Successes(): - if !ok { - log.Info("load successes channel closed") - break L - } - msg := txn.Metadata.(*reader.Message) - log.Debug("get success binlog", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset)) - s.updateFinishTS(msg) - case <-saveTick.C: - if err := s.saveFinishTS(StatusRunning); err != nil { - log.Error("save finish ts failed", zap.Error(err)) - } - case <-ctx.Done(): - break L - } - } - - if err := s.saveFinishTS(StatusRunning); err != nil { - log.Error("save finish ts failed", zap.Error(err)) - } -} - -func (s *Server) loadStatus() (int, error) { - ts, status, err := s.checkpoint.Load() - if err != nil { - if !errors.IsNotFound(err) { - return 0, errors.Trace(err) - } - log.Info("no checkpoint found") - err = nil - } else { - 
log.Info("load checkpoint", zap.Int64("ts", ts), zap.Int("status", status)) - s.finishTS = ts - } - return status, errors.Trace(err) -} - -func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.Loader) (err error) { - dest := ld.Input() - defer ld.Close() - var receivedTs int64 - for msg := range source { - log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset)) - - if msg.Binlog.CommitTs <= receivedTs { - log.Info("skip repeated binlog", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset)) - continue - } - receivedTs = msg.Binlog.CommitTs - - txn, err := loader.SecondaryBinlogToTxn(msg.Binlog, nil, false) - if err != nil { - log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err)) - return err - } - txn.Metadata = msg - // avoid block when no process is handling ld.input - select { - case dest <- txn: - case <-ctx.Done(): - return nil - } - - queueSizeGauge.WithLabelValues("kafka_reader").Set(float64(len(source))) - queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(dest))) - } - return nil -} diff --git a/arbiter/server_test.go b/arbiter/server_test.go deleted file mode 100644 index 6a547f44f..000000000 --- a/arbiter/server_test.go +++ /dev/null @@ -1,468 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package arbiter - -import ( - "context" - "crypto/tls" - "database/sql" - "fmt" - "sync" - "time" - - "github.com/DATA-DOG/go-sqlmock" - . 
"github.com/pingcap/check" - "github.com/pingcap/errors" - "github.com/pingcap/tidb-binlog/pkg/loader" - "github.com/pingcap/tidb/tidb-binlog/driver/reader" - pb "github.com/pingcap/tidb/tidb-binlog/proto/go-binlog" -) - -type dummyLoader struct { - loader.Loader - successes chan *loader.Txn - safe bool - input chan *loader.Txn - closed bool -} - -func (l *dummyLoader) SetSafeMode(safe bool) { - l.safe = safe -} - -func (l *dummyLoader) Successes() <-chan *loader.Txn { - return l.successes -} - -func (l *dummyLoader) Input() chan<- *loader.Txn { - return l.input -} - -func (l *dummyLoader) Close() { - l.closed = true -} - -type testNewServerSuite struct { - db *sql.DB - dbMock sqlmock.Sqlmock - origCreateDB func(string, string, string, int, *tls.Config) (*sql.DB, error) - origNewReader func(*reader.Config) (*reader.Reader, error) - origNewLoader func(*sql.DB, ...loader.Option) (loader.Loader, error) -} - -var _ = Suite(&testNewServerSuite{}) - -func (s *testNewServerSuite) SetUpTest(c *C) { - db, mock, err := sqlmock.New() - if err != nil { - c.Fatalf("Failed to create mock db: %s", err) - } - s.db = db - s.dbMock = mock - - s.origCreateDB = createDB - createDB = func(user string, password string, host string, port int, _ *tls.Config) (*sql.DB, error) { - return s.db, nil - } - - s.origNewReader = newReader - newReader = func(cfg *reader.Config) (r *reader.Reader, err error) { - return &reader.Reader{}, nil - } - - s.origNewLoader = newLoader - newLoader = func(db *sql.DB, opt ...loader.Option) (loader.Loader, error) { - return &dummyLoader{}, nil - } -} - -func (s *testNewServerSuite) TearDownTest(c *C) { - s.db.Close() - - createDB = s.origCreateDB - newReader = s.origNewReader - newLoader = s.origNewLoader -} - -func (s *testNewServerSuite) TestRejectInvalidAddr(c *C) { - cfg := Config{ListenAddr: "whatever"} - _, err := NewServer(&cfg) - c.Assert(err, ErrorMatches, ".*wrong ListenAddr.*") - - cfg.ListenAddr = "whatever:invalid" - _, err = NewServer(&cfg) - c.Assert(err, ErrorMatches, "ListenAddr.*") -} - -func (s *testNewServerSuite) TestStopIfFailedtoConnectDownStream(c *C) { - createDB = func(user string, password string, host string, port int, _ *tls.Config) (*sql.DB, error) { - return nil, fmt.Errorf("Can't create db") - } - - cfg := Config{ListenAddr: "localhost:8080"} - _, err := NewServer(&cfg) - c.Assert(err, ErrorMatches, "Can't create db") -} - -func (s *testNewServerSuite) TestStopIfCannotCreateCheckpoint(c *C) { - s.dbMock.ExpectExec("CREATE DATABASE IF NOT EXISTS `tidb_binlog`").WillReturnError( - fmt.Errorf("cannot create")) - - cfg := Config{ListenAddr: "localhost:8080"} - _, err := NewServer(&cfg) - c.Assert(err, ErrorMatches, "cannot create") -} - -func (s *testNewServerSuite) TestStopIfCannotLoadStatus(c *C) { - s.dbMock.ExpectExec("CREATE DATABASE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectExec("CREATE TABLE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectQuery("SELECT ts, status.*"). - WithArgs("test_topic"). - WillReturnError(errors.New("Failed load")) - - cfg := Config{ - ListenAddr: "localhost:8080", - Up: UpConfig{ - Topic: "test_topic", - }, - } - _, err := NewServer(&cfg) - c.Assert(err, ErrorMatches, "Failed load") -} - -func (s *testNewServerSuite) TestStopIfCannotCreateReader(c *C) { - s.dbMock.ExpectExec("CREATE DATABASE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectExec("CREATE TABLE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectQuery("SELECT ts, status.*"). - WithArgs("test_topic"). 
- WillReturnError(errors.NotFoundf("")) - newReader = func(cfg *reader.Config) (r *reader.Reader, err error) { - return nil, errors.New("no reader") - } - - cfg := Config{ - ListenAddr: "localhost:8080", - Up: UpConfig{ - Topic: "test_topic", - }, - } - _, err := NewServer(&cfg) - c.Assert(err, ErrorMatches, "no reader") -} - -func (s *testNewServerSuite) TestStopIfCannotCreateLoader(c *C) { - s.dbMock.ExpectExec("CREATE DATABASE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectExec("CREATE TABLE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectQuery("SELECT ts, status.*"). - WithArgs("test_topic"). - WillReturnError(errors.New("not found")) - newLoader = func(db *sql.DB, opt ...loader.Option) (loader.Loader, error) { - return nil, errors.New("no loader") - } - - cfg := Config{ - ListenAddr: "localhost:8080", - Up: UpConfig{ - Topic: "test_topic", - }, - } - _, err := NewServer(&cfg) - c.Assert(err, ErrorMatches, "no loader") -} - -func (s *testNewServerSuite) TestSetSafeMode(c *C) { - s.dbMock.ExpectExec("CREATE DATABASE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectExec("CREATE TABLE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - rows := sqlmock.NewRows([]string{"ts", "status"}).AddRow(42, StatusRunning) - s.dbMock.ExpectQuery("SELECT ts, status.*"). - WithArgs("test_topic"). - WillReturnRows(rows) - var ld dummyLoader - newLoader = func(db *sql.DB, opt ...loader.Option) (loader.Loader, error) { - return &ld, nil - } - - origDuration := initSafeModeDuration - defer func() { - initSafeModeDuration = origDuration - }() - initSafeModeDuration = 10 * time.Millisecond - - cfg := Config{ - ListenAddr: "localhost:8080", - Up: UpConfig{ - Topic: "test_topic", - }, - } - _, err := NewServer(&cfg) - c.Assert(err, IsNil) - c.Assert(ld.safe, IsTrue) - time.Sleep(2 * initSafeModeDuration) - c.Assert(ld.safe, IsFalse) -} - -func (s *testNewServerSuite) TestCreateMetricCli(c *C) { - s.dbMock.ExpectExec("CREATE DATABASE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectExec("CREATE TABLE.*").WillReturnResult(sqlmock.NewResult(0, 0)) - s.dbMock.ExpectQuery("SELECT ts, status.*"). - WithArgs("test_topic"). 
- WillReturnError(errors.New("not found")) - - cfg := Config{ - ListenAddr: "localhost:8080", - Up: UpConfig{ - Topic: "test_topic", - }, - Metrics: Metrics{ - Addr: "testing", - Interval: 10, - }, - } - srv, err := NewServer(&cfg) - c.Assert(err, IsNil) - c.Assert(srv.metrics, NotNil) -} - -type updateFinishTSSuite struct{} - -var _ = Suite(&updateFinishTSSuite{}) - -func (s *updateFinishTSSuite) TestShouldSetFinishTS(c *C) { - server := Server{} - msg := reader.Message{ - Binlog: &pb.Binlog{ - CommitTs: 1024, - }, - } - c.Assert(server.finishTS, Equals, int64(0)) - server.updateFinishTS(&msg) - c.Assert(server.finishTS, Equals, int64(1024)) -} - -type trackTSSuite struct{} - -var _ = Suite(&trackTSSuite{}) - -type dummyCp struct { - Checkpoint - timestamps []int64 - status []int -} - -func (cp *dummyCp) Save(ts int64, status int) error { - cp.timestamps = append(cp.timestamps, ts) - cp.status = append(cp.status, status) - return nil -} - -func (s *trackTSSuite) TestShouldUpdateFinishTS(c *C) { - cp := dummyCp{} - successes := make(chan *loader.Txn, 1) - ld := dummyLoader{successes: successes} - server := Server{ - load: &ld, - checkpoint: &cp, - } - - var wg sync.WaitGroup - wg.Add(1) - go func() { - server.trackTS(context.Background(), 50*time.Millisecond) - wg.Done() - }() - - for i := 0; i < 42; i++ { - successes <- &loader.Txn{Metadata: &reader.Message{Binlog: &pb.Binlog{CommitTs: int64(i)}}} - } - close(successes) - - wg.Wait() - c.Assert(server.finishTS, Equals, int64(41)) -} - -func (s *trackTSSuite) TestShouldSaveFinishTS(c *C) { - db, _, err := sqlmock.New() - if err != nil { - c.Fatalf("Failed to create mock db: %s", err) - } - ld, err := loader.NewLoader(db) - c.Assert(err, IsNil) - cp := dummyCp{} - server := Server{ - load: ld, - checkpoint: &cp, - } - - ctx, cancel := context.WithCancel(context.Background()) - - stop := make(chan struct{}) - go func() { - server.trackTS(ctx, 50*time.Millisecond) - close(stop) - }() - - for i := 0; i < 42; i++ { - server.finishTS = int64(i) - time.Sleep(2 * time.Millisecond) - } - - cancel() - - select { - case <-stop: - case <-time.After(time.Second): - c.Fatal("Doesn't stop in time") - } - - c.Assert(len(cp.status), Greater, 1) - c.Assert(len(cp.timestamps), Greater, 1) - c.Assert(cp.status[len(cp.status)-1], Equals, StatusRunning) - c.Assert(cp.timestamps[len(cp.timestamps)-1], Equals, int64(41)) -} - -type loadStatusSuite struct{} - -var _ = Suite(&loadStatusSuite{}) - -type configurableCp struct { - Checkpoint - ts int64 - status int - err error -} - -func (c *configurableCp) Load() (ts int64, status int, err error) { - return c.ts, c.status, c.err -} - -func (s *loadStatusSuite) TestShouldIgnoreNotFound(c *C) { - cp := configurableCp{status: StatusNormal, err: errors.NotFoundf("")} - server := Server{ - checkpoint: &cp, - } - status, err := server.loadStatus() - c.Assert(err, IsNil) - c.Assert(status, Equals, cp.status) -} - -func (s *loadStatusSuite) TestShouldSetFinishTS(c *C) { - cp := configurableCp{status: StatusRunning, ts: 1984} - server := Server{ - checkpoint: &cp, - } - status, err := server.loadStatus() - c.Assert(err, IsNil) - c.Assert(status, Equals, cp.status) - c.Assert(server.finishTS, Equals, cp.ts) -} - -func (s *loadStatusSuite) TestShouldRetErr(c *C) { - cp := configurableCp{status: StatusNormal, err: errors.New("other")} - server := Server{ - checkpoint: &cp, - } - _, err := server.loadStatus() - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, "other") -} - -type syncBinlogsSuite struct{} - -var _ = 
Suite(&syncBinlogsSuite{})
-
-func (s *syncBinlogsSuite) createMsg(schema, table, sql string, commitTs int64) *reader.Message {
-	return &reader.Message{
-		Binlog: &pb.Binlog{
-			DdlData: &pb.DDLData{
-				SchemaName: &schema,
-				TableName:  &table,
-				DdlQuery:   []byte(sql),
-			},
-			CommitTs: commitTs,
-		},
-	}
-}
-
-func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
-	source := make(chan *reader.Message, 1)
-	msgs := []*reader.Message{
-		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
-		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
-		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
-		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
-		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
-	}
-	expectMsgs := []*reader.Message{
-		s.createMsg("test42", "users", "alter table users add column gender smallint", 1),
-		s.createMsg("test42", "operations", "alter table operations drop column seq", 2),
-	}
-	dest := make(chan *loader.Txn, len(msgs))
-	go func() {
-		for _, m := range msgs {
-			source <- m
-		}
-		close(source)
-	}()
-	ld := dummyLoader{input: dest}
-
-	err := syncBinlogs(context.Background(), source, &ld)
-	c.Assert(err, IsNil)
-
-	c.Assert(len(dest), Equals, len(expectMsgs))
-	for _, m := range expectMsgs {
-		txn := <-dest
-		c.Assert(txn.Metadata.(*reader.Message), DeepEquals, m)
-	}
-
-	c.Assert(ld.closed, IsTrue)
-}
-
-func (s *syncBinlogsSuite) TestShouldQuitWhenSomeErrorOccurs(c *C) {
-	readerMsgs := make(chan *reader.Message, 1024)
-	dummyLoaderImpl := &dummyLoader{
-		successes: make(chan *loader.Txn),
-		// input is set small to trigger blocking easily
-		input: make(chan *loader.Txn, 1),
-	}
-	msg := s.createMsg("test42", "users", "alter table users add column gender smallint", 1)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	// start a routine that keeps sending msgs to the kafka reader
-	go func() {
-		// make sure there will be some msgs in readerMsgs for the test
-		for i := 0; i < 3; i++ {
-			readerMsgs <- msg
-		}
-		defer close(readerMsgs)
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case readerMsgs <- msg:
-			}
-		}
-	}()
-	errCh := make(chan error)
-	go func() {
-		errCh <- syncBinlogs(ctx, readerMsgs, dummyLoaderImpl)
-	}()
-
-	cancel()
-	select {
-	case err := <-errCh:
-		c.Assert(err, IsNil)
-	case <-time.After(time.Second):
-		c.Fatal("server doesn't quit in 1s when some error occurs in loader")
-	}
-}
diff --git a/cmd/arbiter/arbiter.toml b/cmd/arbiter/arbiter.toml
deleted file mode 100644
index d72dfee16..000000000
--- a/cmd/arbiter/arbiter.toml
+++ /dev/null
@@ -1,24 +0,0 @@
-# Arbiter Configuration.
-
-# addr (i.e. 'host:port') to listen on for Arbiter connections
-# addr = "127.0.0.1:8251"
-
-[up]
-# if arbiter doesn't have a checkpoint, use the initial commitTS to initialize the checkpoint
-initial-commit-ts = 0
-kafka-addrs = "127.0.0.1:9092"
-kafka-version = "0.8.2.0"
-# topic name of kafka to consume binlog
-#topic = ""
-
-
-[down]
-host = "localhost"
-port = 3306
-user = "root"
-password = ""
-# max concurrent writes to downstream
-# worker-count = 16
-# max DML operations in a transaction when writing to downstream
-# batch-size = 64
-# safe-mode = false
diff --git a/cmd/arbiter/main.go b/cmd/arbiter/main.go
deleted file mode 100644
index d7aeefe9f..000000000
--- a/cmd/arbiter/main.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2019 PingCAP, Inc.
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "io" - stdlog "log" - "net/http" - _ "net/http/pprof" - "os" - - "github.com/Shopify/sarama" - _ "github.com/go-sql-driver/mysql" - "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/arbiter" - "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb-binlog/pkg/version" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "go.uber.org/zap" -) - -func main() { - cfg := arbiter.NewConfig() - if err := cfg.Parse(os.Args[1:]); err != nil { - log.Fatal("verifying flags failed. See 'arbiter --help'.", zap.Error(err)) - } - - if err := util.InitLogger(cfg.LogLevel, cfg.LogFile); err != nil { - log.Fatal("Failed to initialize log", zap.Error(err)) - } - - // We have set sarama.Logger in util.InitLogger. - if !cfg.OpenSaramaLog { - // may too many noise, discard sarama log now - sarama.Logger = stdlog.New(io.Discard, "[Sarama] ", stdlog.LstdFlags) - } - - log.Info("start arbiter...", zap.Reflect("config", cfg)) - version.PrintVersionInfo("Arbiter") - - go startHTTPServer(cfg.ListenAddr) - - srv, err := arbiter.NewServer(cfg) - if err != nil { - log.Error("new server failed", zap.Error(err)) - return - } - - util.SetupSignalHandler(func(_ os.Signal) { - srv.Close() - }) - - log.Info("start run server...") - err = srv.Run() - if err != nil { - log.Error("run server failed", zap.Error(err)) - } - - log.Info("server exit") -} - -func startHTTPServer(addr string) { - prometheus.DefaultGatherer = arbiter.Registry - http.Handle("/metrics", promhttp.Handler()) - - err := http.ListenAndServe(addr, nil) - if err != nil { - log.Fatal("listen and server http failed", zap.String("addr", addr), zap.Error(err)) - } -}
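Although the component is removed by this change, any `tidb_binlog.arbiter_checkpoint` table arbiter created in a downstream TiDB stays behind. A minimal, hypothetical sketch for inspecting that leftover state: the DSN is illustrative, while the table layout and status values come from the deleted `checkpoint.go` above.

```go
package main

import (
	"database/sql"
	"fmt"
	"time"

	_ "github.com/go-sql-driver/mysql" // same driver the removed cmd/arbiter/main.go imported
)

func main() {
	// Illustrative DSN; point it at the downstream TiDB that arbiter wrote to.
	db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Table layout per the deleted checkpoint.go:
	//   topic_name VARCHAR(255) PRIMARY KEY, ts BIGINT NOT NULL, status INT NOT NULL
	rows, err := db.Query("SELECT topic_name, ts, status FROM tidb_binlog.arbiter_checkpoint")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			topic  string
			ts     int64
			status int
		)
		if err := rows.Scan(&topic, &ts, &status); err != nil {
			panic(err)
		}
		// status 0 = quit normally, 1 = running or quit abnormally
		// (StatusNormal/StatusRunning in the deleted code).
		physical := time.UnixMilli(ts >> 18).UTC() // TSO high bits are ms
		fmt.Printf("topic=%s checkpoint=%s status=%d\n", topic, physical, status)
	}
	if err := rows.Err(); err != nil {
		panic(err)
	}
}
```

A row with status 1 means the last arbiter run did not shut down cleanly, so binlogs later than ts may have been partially applied; this is the same condition the removed server handled by starting in safe mode for the first five minutes.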