Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Order of copied chunks #39

Merged
merged 2 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Usage of ./bin/syncflux:
-action: hamonitor(default),copy,fullcopy,replicaschema
-chunk: set RW chuck periods as in the data-chuck-duration config param
-config: config file
-copyorder: backward (most to least recent, default), forward (least to most recent)
-db: set the db where to play
-end: set the endtime do action (no valid in hamonitor) default now
-full: copy full database or now()- max-retention-interval if greater retention policy
Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func ReplSch(master string, slave string, dbs string, newdb string, rps string,

}

func SchCopy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) {
func SchCopy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool, copyorder string) {

Cluster = initCluster(master, slave)

Expand All @@ -198,16 +198,16 @@ func SchCopy(master string, slave string, dbs string, newdb string, rps string,
s := time.Now()
Cluster.ReplicateSchema(schema)
if full {
Cluster.ReplicateDataFull(schema)
Cluster.ReplicateDataFull(schema, copyorder)
} else {
Cluster.ReplicateData(schema, start, end)
Cluster.ReplicateData(schema, start, end, copyorder)
}
elapsed := time.Since(s)
log.Infof("Copy take: %s", elapsed.String())

}

func Copy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) {
func Copy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool, copyorder string) {

Cluster = initCluster(master, slave)

Expand All @@ -230,16 +230,16 @@ func Copy(master string, slave string, dbs string, newdb string, rps string, new

s := time.Now()
if full {
Cluster.ReplicateDataFull(schema)
Cluster.ReplicateDataFull(schema, copyorder)
} else {
Cluster.ReplicateData(schema, start, end)
Cluster.ReplicateData(schema, start, end, copyorder)
}
elapsed := time.Since(s)
log.Infof("Copy take: %s", elapsed.String())

}

func HAMonitorStart(master string, slave string) {
func HAMonitorStart(master string, slave string, copyorder string) {

Cluster = initCluster(master, slave)

Expand All @@ -251,12 +251,12 @@ func HAMonitorStart(master string, slave string) {
Cluster.ReplicateSchema(schema)
case "data":
log.Info("Replicating DATA Schema from Master to Slave")
Cluster.ReplicateDataFull(schema)
Cluster.ReplicateDataFull(schema, copyorder)
case "both":
log.Info("Replicating DB Schema from Master to Slave")
Cluster.ReplicateSchema(schema)
log.Info("Replicating DATA Schema from Master to Slave")
Cluster.ReplicateDataFull(schema)
Cluster.ReplicateDataFull(schema, copyorder)
case "none":
log.Info("No replication done")
default:
Expand All @@ -266,7 +266,7 @@ func HAMonitorStart(master string, slave string) {
Cluster.Master.StartMonitor(&processWg)
Cluster.Slave.StartMonitor(&processWg)
time.Sleep(MainConfig.General.CheckInterval)
Cluster.SuperVisor(&processWg)
Cluster.SuperVisor(&processWg, copyorder)

}

Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/hacluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (hac *HACluster) ReplicateSchema(schema []*InfluxSchDb) error {
return nil
}

func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end time.Time) error {
func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end time.Time, copyorder string) error {
for _, db := range schema {
for _, rp := range db.Rps {
log.Infof("Replicating Data from DB %s RP %s...", db.Name, rp.Name)
Expand All @@ -220,7 +220,7 @@ func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end
rn.Name = db.NewDefRp
}
//log.Debugf("%s RP %s... SCHEMA %#+v.", db.Name, rp.Name, db)
report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval)
report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval, copyorder)
if report == nil {
log.Errorf("Data Replication error in DB [%s] RP [%s] ", db, rn.Name)
}
Expand All @@ -233,7 +233,7 @@ func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end
return nil
}

func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error {
func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb, copyorder string) error {
for _, db := range schema {
for _, rp := range db.Rps {
log.Infof("Replicating Data from DB %s RP %s....", db.Name, rp.Name)
Expand All @@ -242,7 +242,7 @@ func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error {
if rn.Def {
rn.Name = db.NewDefRp
}
report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval)
report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval, copyorder)
if report == nil {
log.Errorf("Data Replication error in DB [%s] RP [%s] ", db, rn.Name)
}
Expand All @@ -256,14 +256,14 @@ func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error {
}

// ScltartMonitor Main GoRutine method to begin snmp data collecting
func (hac *HACluster) SuperVisor(wg *sync.WaitGroup) {
func (hac *HACluster) SuperVisor(wg *sync.WaitGroup, copyorder string) {
wg.Add(1)
go hac.startSupervisorGo(wg)
go hac.startSupervisorGo(wg, copyorder)
}

// OK -> CHECK_SLAVE_DOWN -> RECOVERING -> OK

func (hac *HACluster) checkCluster() {
func (hac *HACluster) checkCluster(copyorder string) {

//check Master

Expand Down Expand Up @@ -330,7 +330,7 @@ func (hac *HACluster) checkCluster() {
log.Infof("HACLUSTER: INIT REFRESH SCHEMA")
hac.Schema, _ = hac.GetSchema("", "", "")
log.Infof("HACLUSTER: INIT REPLICATION DATA PROCESS")
hac.ReplicateData(hac.Schema, startTime, endTime)
hac.ReplicateData(hac.Schema, startTime, endTime, copyorder)
elapsed := time.Since(start)
log.Infof("HACLUSTER: DATA SYNCRONIZATION Took %s", elapsed.String())

Expand Down Expand Up @@ -369,7 +369,7 @@ func (hac *HACluster) checkCluster() {

}

func (hac *HACluster) startSupervisorGo(wg *sync.WaitGroup) {
func (hac *HACluster) startSupervisorGo(wg *sync.WaitGroup, copyorder string) {
defer wg.Done()

log.Infof("Beginning Supervision process process each %s ", hac.CheckInterval.String())
Expand All @@ -378,7 +378,7 @@ func (hac *HACluster) startSupervisorGo(wg *sync.WaitGroup) {

t := time.NewTicker(hac.CheckInterval)
for {
hac.checkCluster()
hac.checkCluster(copyorder)
LOOP:
for {
select {
Expand Down
23 changes: 16 additions & 7 deletions pkg/agent/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (sr *SyncReport) RWErrors() (uint64, uint64, uint64) {
return readErrors, writeErrors, readErrors + writeErrors
}

func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) *SyncReport {
func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration, copyorder string) *SyncReport {

if dbschema == nil {
err := fmt.Errorf("DBSChema for DB %s is null", sdb)
Expand Down Expand Up @@ -135,15 +135,24 @@ func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *R

var i int64
var dbpoints int64
var startsec, endsec int64
dbs := time.Now()

for i = 0; i < hLength; i++ {
wp := workerpool.New(MainConfig.General.NumWorkers)
defer wp.Stop()
chs := time.Now()
//sync from newer to older data
endsec := eEpoch.Unix() - (i * chunkSecond)
startsec := eEpoch.Unix() - ((i + 1) * chunkSecond)

if copyorder == "forward" {
//sync from older to newer data
startsec = eEpoch.Unix() - ((hLength - i) * chunkSecond)
endsec = eEpoch.Unix() - ((hLength - i - 1) * chunkSecond)
} else {
//sync from newer to older data
endsec = eEpoch.Unix() - (i * chunkSecond)
startsec = eEpoch.Unix() - ((i + 1) * chunkSecond)
}

var totalpoints int64
totalpoints = 0
log.Debugf("Detected %d measurements on %s|%s", len(srp.Measurements), sdb, srp.Name)
Expand Down Expand Up @@ -212,9 +221,9 @@ func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *R
return Report
}

func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) *SyncReport {
func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration, copyorder string) *SyncReport {

report := Sync(src, dst, sdb, ddb, srp, drp, sEpoch, eEpoch, dbschema, chunk, maxret)
report := Sync(src, dst, sdb, ddb, srp, drp, sEpoch, eEpoch, dbschema, chunk, maxret, copyorder)
if len(report.BadChunks) > 0 {
log.Warnf("Initializing Recovery for %d chunks", len(report.BadChunks))
newBadChunks := make([]*ChunkReport, 0)
Expand All @@ -223,7 +232,7 @@ func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, sr
start := time.Unix(bc.TimeStart, 0)
end := time.Unix(bc.TimeEnd, 0)

recoveryrep := Sync(src, dst, sdb, ddb, srp, drp, start, end, dbschema, chunk/10, maxret)
recoveryrep := Sync(src, dst, sdb, ddb, srp, drp, start, end, dbschema, chunk/10, maxret, copyorder)
newBadChunks = append(newBadChunks, recoveryrep.BadChunks...)
}
report.BadChunks = newBadChunks
Expand Down
8 changes: 5 additions & 3 deletions pkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
starttime = time.Now().Add(-3600 * 24)
endtimestr string
endtime = time.Now()
copyorder = "reverse"
fulltime bool
chunktimestr string
//log level
Expand Down Expand Up @@ -89,6 +90,7 @@ func flags() *flag.FlagSet {
f.StringVar(&chunktimestr, "chunk", chunktimestr, "set RW chuck periods as in the data-chuck-duration config param")
f.StringVar(&starttimestr, "start", starttimestr, "set the starttime to do action (no valid in hamonitor) default now-24h")
f.StringVar(&endtimestr, "end", endtimestr, "set the endtime do action (no valid in hamonitor) default now")
f.StringVar(&copyorder, "copyorder", copyorder, "backward (most to least recent, default), forward (least to most recent)")
f.BoolVar(&fulltime, "full", fulltime, "copy full database or now()- max-retention-interval if greater retention policy")
// -v = Info
// -vv = debug
Expand Down Expand Up @@ -292,15 +294,15 @@ func main() {

switch action {
case "hamonitor":
agent.HAMonitorStart(master, slave)
agent.HAMonitorStart(master, slave, copyorder)
webui.WebServer("", httpPort, &agent.MainConfig.HTTP, agent.MainConfig.General.InstanceID)
case "copy":
agent.Copy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime)
agent.Copy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime, copyorder)
case "move":
case "replicaschema":
agent.ReplSch(master, slave, actiondb, newdb, actionrp, newrp, actionmeas)
case "fullcopy":
agent.SchCopy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime)
agent.SchCopy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime, copyorder)
default:
fmt.Printf("Unknown action: %s", action)
}
Expand Down