From 1a7fa771638aeae1bb07e2f57d70f82107c96908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20T=C3=B6ws?= <> Date: Mon, 29 Jun 2020 17:05:06 +0200 Subject: [PATCH 1/2] Flip order of copied chunks --- pkg/agent/sync.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/agent/sync.go b/pkg/agent/sync.go index d255bc8..592ad9d 100644 --- a/pkg/agent/sync.go +++ b/pkg/agent/sync.go @@ -141,9 +141,9 @@ func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *R wp := workerpool.New(MainConfig.General.NumWorkers) defer wp.Stop() chs := time.Now() - //sync from newer to older data - endsec := eEpoch.Unix() - (i * chunkSecond) - startsec := eEpoch.Unix() - ((i + 1) * chunkSecond) + //sync from older to newer data + startsec := eEpoch.Unix() - ((hLength - i) * chunkSecond) + endsec := eEpoch.Unix() - ((hLength - i - 1) * chunkSecond) var totalpoints int64 totalpoints = 0 log.Debugf("Detected %d measurements on %s|%s", len(srp.Measurements), sdb, srp.Name) From 06eb5b185f91991aafbf00c8d2a8b2e87f6acd87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20T=C3=B6ws?= <1062119+ptoews@users.noreply.github.com> Date: Fri, 9 Oct 2020 17:43:47 +0200 Subject: [PATCH 2/2] Make copy order configurable --- README.md | 1 + pkg/agent/agent.go | 20 ++++++++++---------- pkg/agent/hacluster.go | 20 ++++++++++---------- pkg/agent/sync.go | 23 ++++++++++++++++------- pkg/main.go | 8 +++++--- 5 files changed, 42 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index d4aad35..081c1ba 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ Usage of ./bin/syncflux: -action: hamonitor(default),copy,fullcopy,replicaschema -chunk: set RW chuck periods as in the data-chuck-duration config param -config: config file +-copyorder: backward (most to least recent, default), forward (least to most recent) -db: set the db where to play -end: set the endtime do action (no valid in hamonitor) default now -full: copy full database or now()- max-retention-interval if greater retention policy diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index a11bfad..c3872d5 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -173,7 +173,7 @@ func ReplSch(master string, slave string, dbs string, newdb string, rps string, } -func SchCopy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) { +func SchCopy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool, copyorder string) { Cluster = initCluster(master, slave) @@ -198,16 +198,16 @@ func SchCopy(master string, slave string, dbs string, newdb string, rps string, s := time.Now() Cluster.ReplicateSchema(schema) if full { - Cluster.ReplicateDataFull(schema) + Cluster.ReplicateDataFull(schema, copyorder) } else { - Cluster.ReplicateData(schema, start, end) + Cluster.ReplicateData(schema, start, end, copyorder) } elapsed := time.Since(s) log.Infof("Copy take: %s", elapsed.String()) } -func Copy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) { +func Copy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool, copyorder string) { Cluster = initCluster(master, slave) @@ -230,16 +230,16 @@ func Copy(master string, slave string, dbs string, newdb string, rps string, new s := time.Now() if full { - Cluster.ReplicateDataFull(schema) + Cluster.ReplicateDataFull(schema, copyorder) } else { - Cluster.ReplicateData(schema, start, end) + Cluster.ReplicateData(schema, start, end, copyorder) } elapsed := time.Since(s) log.Infof("Copy take: %s", elapsed.String()) } -func HAMonitorStart(master string, slave string) { +func HAMonitorStart(master string, slave string, copyorder string) { Cluster = initCluster(master, slave) @@ -251,12 +251,12 @@ func HAMonitorStart(master string, slave string) { Cluster.ReplicateSchema(schema) case "data": log.Info("Replicating DATA Schema from Master to Slave") - Cluster.ReplicateDataFull(schema) + Cluster.ReplicateDataFull(schema, copyorder) case "both": log.Info("Replicating DB Schema from Master to Slave") Cluster.ReplicateSchema(schema) log.Info("Replicating DATA Schema from Master to Slave") - Cluster.ReplicateDataFull(schema) + Cluster.ReplicateDataFull(schema, copyorder) case "none": log.Info("No replication done") default: @@ -266,7 +266,7 @@ func HAMonitorStart(master string, slave string) { Cluster.Master.StartMonitor(&processWg) Cluster.Slave.StartMonitor(&processWg) time.Sleep(MainConfig.General.CheckInterval) - Cluster.SuperVisor(&processWg) + Cluster.SuperVisor(&processWg, copyorder) } diff --git a/pkg/agent/hacluster.go b/pkg/agent/hacluster.go index 688ab2f..dcf1578 100644 --- a/pkg/agent/hacluster.go +++ b/pkg/agent/hacluster.go @@ -210,7 +210,7 @@ func (hac *HACluster) ReplicateSchema(schema []*InfluxSchDb) error { return nil } -func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end time.Time) error { +func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end time.Time, copyorder string) error { for _, db := range schema { for _, rp := range db.Rps { log.Infof("Replicating Data from DB %s RP %s...", db.Name, rp.Name) @@ -220,7 +220,7 @@ func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end rn.Name = db.NewDefRp } //log.Debugf("%s RP %s... SCHEMA %#+v.", db.Name, rp.Name, db) - report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval) + report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval, copyorder) if report == nil { log.Errorf("Data Replication error in DB [%s] RP [%s] ", db, rn.Name) } @@ -233,7 +233,7 @@ func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end return nil } -func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error { +func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb, copyorder string) error { for _, db := range schema { for _, rp := range db.Rps { log.Infof("Replicating Data from DB %s RP %s....", db.Name, rp.Name) @@ -242,7 +242,7 @@ func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error { if rn.Def { rn.Name = db.NewDefRp } - report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval) + report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval, copyorder) if report == nil { log.Errorf("Data Replication error in DB [%s] RP [%s] ", db, rn.Name) } @@ -256,14 +256,14 @@ func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error { } // ScltartMonitor Main GoRutine method to begin snmp data collecting -func (hac *HACluster) SuperVisor(wg *sync.WaitGroup) { +func (hac *HACluster) SuperVisor(wg *sync.WaitGroup, copyorder string) { wg.Add(1) - go hac.startSupervisorGo(wg) + go hac.startSupervisorGo(wg, copyorder) } // OK -> CHECK_SLAVE_DOWN -> RECOVERING -> OK -func (hac *HACluster) checkCluster() { +func (hac *HACluster) checkCluster(copyorder string) { //check Master @@ -330,7 +330,7 @@ func (hac *HACluster) checkCluster() { log.Infof("HACLUSTER: INIT REFRESH SCHEMA") hac.Schema, _ = hac.GetSchema("", "", "") log.Infof("HACLUSTER: INIT REPLICATION DATA PROCESS") - hac.ReplicateData(hac.Schema, startTime, endTime) + hac.ReplicateData(hac.Schema, startTime, endTime, copyorder) elapsed := time.Since(start) log.Infof("HACLUSTER: DATA SYNCRONIZATION Took %s", elapsed.String()) @@ -369,7 +369,7 @@ func (hac *HACluster) checkCluster() { } -func (hac *HACluster) startSupervisorGo(wg *sync.WaitGroup) { +func (hac *HACluster) startSupervisorGo(wg *sync.WaitGroup, copyorder string) { defer wg.Done() log.Infof("Beginning Supervision process process each %s ", hac.CheckInterval.String()) @@ -378,7 +378,7 @@ func (hac *HACluster) startSupervisorGo(wg *sync.WaitGroup) { t := time.NewTicker(hac.CheckInterval) for { - hac.checkCluster() + hac.checkCluster(copyorder) LOOP: for { select { diff --git a/pkg/agent/sync.go b/pkg/agent/sync.go index 592ad9d..bd2b047 100644 --- a/pkg/agent/sync.go +++ b/pkg/agent/sync.go @@ -92,7 +92,7 @@ func (sr *SyncReport) RWErrors() (uint64, uint64, uint64) { return readErrors, writeErrors, readErrors + writeErrors } -func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) *SyncReport { +func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration, copyorder string) *SyncReport { if dbschema == nil { err := fmt.Errorf("DBSChema for DB %s is null", sdb) @@ -135,15 +135,24 @@ func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *R var i int64 var dbpoints int64 + var startsec, endsec int64 dbs := time.Now() for i = 0; i < hLength; i++ { wp := workerpool.New(MainConfig.General.NumWorkers) defer wp.Stop() chs := time.Now() - //sync from older to newer data - startsec := eEpoch.Unix() - ((hLength - i) * chunkSecond) - endsec := eEpoch.Unix() - ((hLength - i - 1) * chunkSecond) + + if copyorder == "forward" { + //sync from older to newer data + startsec = eEpoch.Unix() - ((hLength - i) * chunkSecond) + endsec = eEpoch.Unix() - ((hLength - i - 1) * chunkSecond) + } else { + //sync from newer to older data + endsec = eEpoch.Unix() - (i * chunkSecond) + startsec = eEpoch.Unix() - ((i + 1) * chunkSecond) + } + var totalpoints int64 totalpoints = 0 log.Debugf("Detected %d measurements on %s|%s", len(srp.Measurements), sdb, srp.Name) @@ -212,9 +221,9 @@ func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *R return Report } -func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) *SyncReport { +func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration, copyorder string) *SyncReport { - report := Sync(src, dst, sdb, ddb, srp, drp, sEpoch, eEpoch, dbschema, chunk, maxret) + report := Sync(src, dst, sdb, ddb, srp, drp, sEpoch, eEpoch, dbschema, chunk, maxret, copyorder) if len(report.BadChunks) > 0 { log.Warnf("Initializing Recovery for %d chunks", len(report.BadChunks)) newBadChunks := make([]*ChunkReport, 0) @@ -223,7 +232,7 @@ func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, sr start := time.Unix(bc.TimeStart, 0) end := time.Unix(bc.TimeEnd, 0) - recoveryrep := Sync(src, dst, sdb, ddb, srp, drp, start, end, dbschema, chunk/10, maxret) + recoveryrep := Sync(src, dst, sdb, ddb, srp, drp, start, end, dbschema, chunk/10, maxret, copyorder) newBadChunks = append(newBadChunks, recoveryrep.BadChunks...) } report.BadChunks = newBadChunks diff --git a/pkg/main.go b/pkg/main.go index 1fc668c..57e2bc4 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -47,6 +47,7 @@ var ( starttime = time.Now().Add(-3600 * 24) endtimestr string endtime = time.Now() + copyorder = "reverse" fulltime bool chunktimestr string //log level @@ -89,6 +90,7 @@ func flags() *flag.FlagSet { f.StringVar(&chunktimestr, "chunk", chunktimestr, "set RW chuck periods as in the data-chuck-duration config param") f.StringVar(&starttimestr, "start", starttimestr, "set the starttime to do action (no valid in hamonitor) default now-24h") f.StringVar(&endtimestr, "end", endtimestr, "set the endtime do action (no valid in hamonitor) default now") + f.StringVar(©order, "copyorder", copyorder, "backward (most to least recent, default), forward (least to most recent)") f.BoolVar(&fulltime, "full", fulltime, "copy full database or now()- max-retention-interval if greater retention policy") // -v = Info // -vv = debug @@ -292,15 +294,15 @@ func main() { switch action { case "hamonitor": - agent.HAMonitorStart(master, slave) + agent.HAMonitorStart(master, slave, copyorder) webui.WebServer("", httpPort, &agent.MainConfig.HTTP, agent.MainConfig.General.InstanceID) case "copy": - agent.Copy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime) + agent.Copy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime, copyorder) case "move": case "replicaschema": agent.ReplSch(master, slave, actiondb, newdb, actionrp, newrp, actionmeas) case "fullcopy": - agent.SchCopy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime) + agent.SchCopy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime, copyorder) default: fmt.Printf("Unknown action: %s", action) }