Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dts to replica databases/groups #834

Merged
merged 2 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ registry:
options:
endpoints: "http://etcd:2379"

dts:
enable: false
name: dtle
options:
endpoints: "http://dtle:4646"

# name: nacos
# options:
# endpoints: "127.0.0.1:8848"
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type configWriter interface {
// UpsertCluster upserts a cluster into an existing tenant.
UpsertCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error

// ExtendCluster extends a cluster in an existing tenant.
ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error

// RemoveCluster removes a cluster from an existing tenant.
RemoveCluster(ctx context.Context, tenant, cluster string) error

Expand Down
250 changes: 250 additions & 0 deletions pkg/admin/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package admin

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)

import (
Expand Down Expand Up @@ -336,6 +341,251 @@ func (cs *MyConfigService) UpsertCluster(ctx context.Context, tenant, cluster st
return nil
}

func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, srcNode, dstNode *NodeDTO, vtables []*TableDTO, idx int) map[string]interface{} {
jobJson := make(map[string]interface{})
jobBody := make(map[string]interface{})
jobJson["Job"] = jobBody

jobId := tenant + "-" + cluster + "-" + src + "-" + dst + "-" + time.Now().Format("20060102150405")
jobBody["ID"] = jobId
jobBody["Datacenters"] = []string{"dc1"}
jobGroups := make([]interface{}, 0, 2)

jobSrc := make(map[string]interface{})
jobSrc["Name"] = "src"

jobTasks := make([]interface{}, 0, 1)
jobTask := make(map[string]interface{})
jobTask["Name"] = "src"
jobTask["Driver"] = "dtle"
jobConfig := make(map[string]interface{})
jobConfig["Gtid"] = ""
jobReplicate := make([]interface{}, 0, 1)
jobDatabase := make(map[string]interface{})
jobDatabase["TableSchema"] = src
jobDatabase["TableSchemaRename"] = dst
jobTables := []map[string]string{}
for i := range vtables {
vTable := vtables[i]
_, _, dbEnd, _ := config.ParseTopology(vTable.Topology.DbPattern)
tbFormat, _, tbEnd, _ := config.ParseTopology(vTable.Topology.TblPattern)
tableNum := int((tbEnd + 1) / (dbEnd + 1))
for j := 0; j < tableNum; j++ {
jobTable := map[string]string{}
jobTable["TableName"] = fmt.Sprintf(tbFormat, idx*tableNum+j)
jobTable["TableRename"] = fmt.Sprintf(tbFormat, idx*tableNum+j+tbEnd+1)
jobTables = append(jobTables, jobTable)
}
}
jobDatabase["Tables"] = jobTables

jobReplicate = append(jobReplicate, jobDatabase)
jobConfig["ReplicateDoDb"] = jobReplicate
jobSrcConfig := make(map[string]interface{})
jobSrcConfig["Host"] = srcNode.Host
jobSrcConfig["Port"] = srcNode.Port
jobSrcConfig["User"] = srcNode.Username
jobSrcConfig["Password"] = srcNode.Password
jobConfig["SrcConnectionConfig"] = jobSrcConfig
jobDstConfig := make(map[string]interface{})
jobDstConfig["Host"] = dstNode.Host
jobDstConfig["Port"] = dstNode.Port
jobDstConfig["User"] = dstNode.Username
jobDstConfig["Password"] = dstNode.Password
jobConfig["DestConnectionConfig"] = jobDstConfig
jobTask["Config"] = jobConfig
jobSrc["Tasks"] = append(jobTasks, jobTask)
jobGroups = append(jobGroups, jobSrc)

jobDst := make(map[string]interface{})
jobDst["Name"] = "dest"
jobTasks = make([]interface{}, 0, 1)
jobTask = make(map[string]interface{})
jobTask["Name"] = "dest"
jobTask["Driver"] = "dtle"
jobTask["Config"] = map[string]string{"DestType": "mysql"}
jobDst["Tasks"] = append(jobTasks, jobTask)
jobGroups = append(jobGroups, jobDst)
jobBody["TaskGroups"] = jobGroups

return jobJson
}

func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error {
//1、校验node和group,保证node和group翻倍(缩容将node和group减半,流程同理)
groups, err := cs.ListDBGroups(ctx, tenant, cluster)
if err != nil {
return err
}
if len(groups) != len(body.Groups) {
return perrors.Errorf("new groups is not equle to old groups")
}
vtables, err := cs.ListTables(ctx, tenant, cluster)
if err != nil {
return err
}
allNodes, err := cs.ListNodes(ctx, tenant)
if err != nil {
return err
}

//2、创建复制group(物理数据库)的dts任务
//groups[0] --> body.Groups[0]
//groups[1] --> body.Groups[1]
//...
httpClient := &http.Client{}
dtsJobList := make([]map[string]interface{}, 0, len(groups))
dtsEndpoint := config.BootOpts.Dts.Options["endpoints"].(string)
for i := range groups {
srcGroup := groups[i].Name
var srcNode, dstNode *NodeDTO
for n := range allNodes {
if allNodes[n].Database == srcGroup {
srcNode = allNodes[n]
}
}
dstGroup := body.Groups[i]
for n := range allNodes {
if allNodes[n].Database == dstGroup {
dstNode = allNodes[n]
}
}
dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, srcNode, dstNode, vtables, i)
if dtsJob == nil {
return perrors.Errorf("failed to build DTS json parameter")
}
dtsJobList = append(dtsJobList, dtsJob)
dtsJobJson, _ := json.Marshal(dtsJob)
httpReq, err := http.NewRequest("POST", dtsEndpoint+"/v1/jobs", bytes.NewBuffer(dtsJobJson))
if err != nil {
return perrors.Errorf("failed to create POST http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to start to replica source group")
}
httpResp.Body.Close()
}

//3、检查是否复制完毕
for {
time.Sleep(5 * time.Second)
finished := false

for i := range dtsJobList {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("GET", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create GET http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to check replica source group")
}

//TODO: check Status
finished = true
httpResp.Body.Close()
}

if finished {
break
}
}

//4、断开并拒绝所有客户端连接

//5、再次检查是否复制完毕

//6、停止dts任务
for i := range groups {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("DELETE", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create DELETE http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to stop to replica source group")
}
httpResp.Body.Close()
}

//7、更新groups节点
var groupBody GroupDTO
var groupNode string
for i := range body.Groups {
groupNode = ""
for n := range allNodes {
if allNodes[n].Database == body.Groups[i] {
groupNode = allNodes[n].Name
}
}
if strings.Compare(groupNode, "") == 0 {
continue
}
groupBody.ClusterName = cluster
groupBody.Name = body.Groups[i]
groupBody.Nodes = []string{groupNode}
err = cs.UpsertGroup(ctx, tenant, cluster, groupBody.Name, &groupBody)
if err != nil {
return err
}
}

//8、更新sharding路由
var tableBody TableDTO
for i := range vtables {
vTable := vtables[i]
_, _, dbEnd, err := config.ParseTopology(vTable.Topology.DbPattern)
if err != nil {
return err
}
_, _, tbEnd, err := config.ParseTopology(vTable.Topology.TblPattern)
if err != nil {
return err
}
dbTotal := 2 * (dbEnd + 1)
tableTotal := 2 * (tbEnd + 1)

tableBody.Name = vTable.Name
tableBody.Sequence = vTable.Sequence
tableBody.DbRules = []*config.Rule{
{
Columns: vTable.DbRules[0].Columns,
Type: vTable.DbRules[0].Type,
Expr: "$0 % " + fmt.Sprintf("%d", tableTotal) + " / " + fmt.Sprintf("%d", dbTotal),
},
}
tableBody.TblRules = []*config.Rule{
{
Columns: vTable.TblRules[0].Columns,
Type: vTable.TblRules[0].Type,
Expr: "$0 % " + fmt.Sprintf("%d", tableTotal),
},
}
tableBody.Topology = &config.Topology{
DbPattern: cluster + fmt.Sprintf("_${0000..%04d}", dbTotal-1),
TblPattern: vTable.Name + fmt.Sprintf("_${0000..%04d}", tableTotal-1),
}
tableBody.ShadowTopology = vTable.ShadowTopology
tableBody.Attributes = vTable.Attributes
err = cs.UpsertTable(ctx, tenant, cluster, tableBody.Name, &tableBody)
if err != nil {
return err
}
}

//9、接受客户端连接

return nil
}

func (cs *MyConfigService) RemoveCluster(ctx context.Context, tenant, cluster string) error {
op, err := cs.getCenter(ctx, tenant)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/admin/router/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {
router.POST("/tenants/:tenant/clusters", CreateCluster)
router.GET("/tenants/:tenant/clusters/:cluster", GetCluster)
router.PUT("/tenants/:tenant/clusters/:cluster", UpdateCluster)
router.POST("/tenants/:tenant/clusters/:cluster", ExtendCluster)
router.DELETE("/tenants/:tenant/clusters/:cluster", RemoveCluster)
})
}
Expand Down Expand Up @@ -107,6 +108,23 @@ func UpdateCluster(c *gin.Context) error {
return nil
}

func ExtendCluster(c *gin.Context) error {
service := admin.GetService(c)
tenant := c.Param("tenant")
cluster := c.Param("cluster")
var clusterBody admin.ClusterDTO
if err := c.ShouldBindJSON(&clusterBody); err != nil {
return exception.Wrap(exception.CodeInvalidParams, err)
}

err := service.ExtendCluster(c, tenant, cluster, &clusterBody)
if err != nil {
return err
}
c.JSON(http.StatusOK, "success")
return nil
}

func RemoveCluster(c *gin.Context) error {
service := admin.GetService(c)
tenant := c.Param("tenant")
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/arana-db/arana/pkg/util/log"
)

var BootOpts *BootOptions

// LoadBootOptions loads BootOptions from specified file path.
func LoadBootOptions(path string) (*BootOptions, error) {
content, err := os.ReadFile(path)
Expand All @@ -59,6 +61,7 @@ func LoadBootOptions(path string) (*BootOptions, error) {
return nil, errors.Wrap(err, "failed to validate boot config")
}

BootOpts = &cfg
log.Init(cfg.Logging)
return &cfg, nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,18 @@ type (
Options map[string]interface{} `yaml:"options" json:"options"`
}

Dts struct {
Enable bool `yaml:"enable" json:"enable"`
Name string `yaml:"name" json:"name"`
Options map[string]interface{} `yaml:"options" json:"options"`
}

BootOptions struct {
Spec `yaml:",inline"`
Config *Options `yaml:"config" json:"config"`
Listeners []*Listener `validate:"required,dive" yaml:"listeners" json:"listeners"`
Registry *Registry `yaml:"registry" json:"registry"`
Dts *Dts `yaml:"dts" json:"dts"`
Trace *Trace `yaml:"trace" json:"trace"`
Supervisor *User `validate:"required,dive" yaml:"supervisor" json:"supervisor"`
Logging *log.Config `validate:"required,dive" yaml:"logging" json:"logging"`
Expand Down
5 changes: 3 additions & 2 deletions pkg/runtime/optimize/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,15 @@ func drdsCreateTable(ctx context.Context, o *optimize.Optimizer) (*rule.VTable,
{
Columns: []*config.ColumnRule{{Name: dbColume}},
Expr: strings.ReplaceAll(dbColume, dbColume, "$0") + " % " +
fmt.Sprintf("%d", stmt.Partition.Num),
fmt.Sprintf("%d", stmt.Partition.Num*stmt.Partition.Sub.Num) + " / " +
fmt.Sprintf("%d", stmt.Partition.Sub.Num),
},
},
TblRules: []*config.Rule{
{
Columns: []*config.ColumnRule{{Name: tbColume}},
Expr: strings.ReplaceAll(tbColume, tbColume, "$0") + " % " +
fmt.Sprintf("%d", stmt.Partition.Sub.Num),
fmt.Sprintf("%d", stmt.Partition.Num*stmt.Partition.Sub.Num),
},
},
Topology: &config.Topology{
Expand Down
1 change: 1 addition & 0 deletions scripts/sharding.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CREATE TABLE IF NOT EXISTS `employees_0000_r`.`student_0006` LIKE `employees_000
CREATE TABLE IF NOT EXISTS `employees_0000_r`.`student_0007` LIKE `employees_0000`.`student_0000`;

INSERT INTO employees_0000.student_0001(id,uid,name,score,nickname,gender,birth_year,created_at,modified_at) VALUES (1, 1, 'arana', 95, 'Awesome Arana', 0, 2021, NOW(), NOW());
INSERT INTO employees_0000.student_0001(id,uid,name,score,nickname,gender,birth_year,created_at,modified_at) VALUES (33, 33, 'arana33', 95, 'Awesome Arana', 0, 2021, NOW(), NOW());

CREATE TABLE IF NOT EXISTS `employees_0000`.`friendship_0000`
(
Expand Down
Loading