diff --git a/README.md b/README.md index 9e82338..6e46ec7 100644 --- a/README.md +++ b/README.md @@ -207,7 +207,7 @@ Allows the user to copy DB schemas from DB1 to DB2. DB schema are DBs and RPs. ___Syntax___ ``` -./bin/syncflux -action replicaschema [-master ] [-slave ] [-db ] [-newdb ] [-newrp ] +./bin/syncflux -action replicaschema [-master ] [-slave ] [-db ] [-newdb ] [-rp ] [-newrp ] [-meas ] ``` ___Description of syntax___ @@ -282,7 +282,7 @@ Influx02 schema ``` -*Example 3*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) +*Example 3*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from rp1 ``` Influx01 schema @@ -297,7 +297,7 @@ Influx01 schema ``` ```bash -./bin/syncflux -action "replicaschema" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" +./bin/syncflux -action "replicaschema" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" -rp "^rp1$" ``` The result will be that the schema of Influx01 will be replicated on Influx02 @@ -307,10 +307,9 @@ Influx02 schema ---------------- |-- db3 |-- rp1* - |-- rp2 ``` -*Example 4*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and set the defaultrp to rp3 +*Example 4*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and set the defaultrp to rp3 ```bash Influx01 schema @@ -338,6 +337,23 @@ Influx02 schema |-- rp2 ``` +*Example 5*: Copy data and schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from meas "cpu.*" + +```bash +Influx01 schema +---------------- + + |-- db1 + |-- rp1* + |-- cpu + |-- mem + |-- swap + |-- ... + |-- rp2 + |-- db2 + |-- rp1* + |-- rp2 +``` #### Copy data @@ -347,7 +363,7 @@ Allows the user to copy DB data from master to slave. DB schema are DBs and RPs. ___Syntax___ ``` -./bin/syncflux -action copy [-master ] [-slave ] [-db ] [-newdb ] [-newrp ] { [-start ] [-endtime ] , [-full] } +./bin/syncflux -action copy [-master ] [-slave ] [-db ] [-newdb ] [-rp ] [-newrp ] [-meas ] { [-start ] [-endtime ] , [-full] } ``` ___Description of syntax___ @@ -397,7 +413,7 @@ Influx02 schema ``` -*Example 2*: Copy data from Influx01-DB1 to Influx02 on a time window +*Example 2*: Copy data from Influx01-DB1 to Influx02 on a time window and only from rp1 ```bash Influx01 schema @@ -412,10 +428,10 @@ Influx01 schema ``` ```bash -./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -start -10h end -5h +./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -rp "^rp1$" -start -10h end -5h ``` -The command above will repicate all data from Influx01 to InfluxDB but only from db1 and with a time window from -10h to -5h +The command above will repicate all data from Influx01 to InfluxDB but only from db1.rp1 and with a time window from -10h to -5h ```bash Influx02 schema @@ -482,6 +498,40 @@ Influx02 schema |-- rp2 ``` +*Example 5*: Copy data from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from meas "cpu.*" + +```bash +Influx01 schema +---------------- + + |-- db1 + |-- rp1* + |-- cpu + |-- mem + |-- swap + |-- ... + |-- rp2 + |-- db2 + |-- rp1* + |-- rp2 +``` + + +```bash +./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" -mes "cpu.*" +``` + +The command above will replicate all data from Influx01-db1 to InfluxDB on a new DB called 'db3' and a new defaultrp called rp3 + +```bash +Influx02 schema +---------------- + |-- db3 + |-- rp3* + |-- cpu + |-- rp2 +``` + #### Copy data + schema Allows the user to copy DB data from master to slave. DB schema are DBs and RPs. @@ -490,7 +540,7 @@ Allows the user to copy DB data from master to slave. DB schema are DBs and RPs. ___Syntax___ ``` -./bin/syncflux -action fullcopy [-master ] [-slave ] [-db ] [-newdb ] [-newrp ] { [-start ] [-endtime ] , [-full] } +./bin/syncflux -action fullcopy [-master ] [-slave ] [-db ] [-newdb ] [-rp ] [-newrp ] [-meas ] { [-start ] [-endtime ] , [-full] } ``` ___Description of syntax___ @@ -628,7 +678,39 @@ Influx02 schema |-- rp2 ``` +*Example 5*: Copy data and schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from meas "cpu.*" +```bash +Influx01 schema +---------------- + + |-- db1 + |-- rp1* + |-- cpu + |-- mem + |-- swap + |-- ... + |-- rp2 + |-- db2 + |-- rp1* + |-- rp2 +``` + + +```bash +./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" -mes "cpu.*" +``` + +The command above will create the schema and will replicate all data from Influx01-db1 to InfluxDB on a new DB called 'db3' and a new defaultrp called rp3 + +```bash +Influx02 schema +---------------- + |-- db3 + |-- rp3* + |-- cpu + |-- rp2 +``` ### Run as a HA Cluster monitor diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 776fd92..230976f 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -144,11 +144,11 @@ func initCluster(master string, slave string) *HACluster { } } -func ReplSch(master string, slave string, dbs string, newdb string, newrp string) { +func ReplSch(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string) { Cluster = initCluster(master, slave) - schema, err := Cluster.GetSchema(dbs) + schema, err := Cluster.GetSchema(dbs, rps, meas) if err != nil { log.Errorf("Can not copy data , error on get Schema: %s", err) return @@ -173,11 +173,11 @@ func ReplSch(master string, slave string, dbs string, newdb string, newrp string } -func SchCopy(master string, slave string, dbs string, newdb string, newrp string, start time.Time, end time.Time, full bool) { +func SchCopy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) { Cluster = initCluster(master, slave) - schema, err := Cluster.GetSchema(dbs) + schema, err := Cluster.GetSchema(dbs, rps, meas) if err != nil { log.Errorf("Can not copy data , error on get Schema: %s", err) return @@ -207,11 +207,11 @@ func SchCopy(master string, slave string, dbs string, newdb string, newrp string } -func Copy(master string, slave string, dbs string, newdb string, newrp string, start time.Time, end time.Time, full bool) { +func Copy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) { Cluster = initCluster(master, slave) - schema, err := Cluster.GetSchema(dbs) + schema, err := Cluster.GetSchema(dbs, rps, meas) if err != nil { log.Errorf("Can not copy data , error on get Schema: %s", err) return @@ -243,7 +243,7 @@ func HAMonitorStart(master string, slave string) { Cluster = initCluster(master, slave) - schema, _ := Cluster.GetSchema("") + schema, _ := Cluster.GetSchema("", "", "") switch MainConfig.General.InitialReplication { case "schema": diff --git a/pkg/agent/client.go b/pkg/agent/client.go index cd3bceb..44ef3e1 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -232,10 +232,11 @@ func GetRetentionPolicies(con client.Client, db string) ([]*RetPol, error) { return rparray, nil } -func GetFields(c client.Client, sdb string, meas string) map[string]string { - ret := make(map[string]string) +func GetFields(c client.Client, sdb string, meas string, defrp string) map[string]*FieldSch { - cmd := "show field keys from " + meas + fields := make(map[string]*FieldSch) + + cmd := "show field keys from \"" + defrp + "\"." + meas //get measurements from database q := client.Query{ Command: cmd, @@ -258,16 +259,15 @@ func GetFields(c client.Client, sdb string, meas string) map[string]string { for _, row := range values { fieldname := row[0].(string) fieldtype := row[1].(string) - ret[fieldname] = fieldtype + fields[fieldname] = &FieldSch{Name: fieldname, Type: fieldtype} log.Debugf("Detected Field [%s] type [%s] on measurement [%s]", fieldname, fieldtype, meas) } } - - return ret + return fields } -func GetMeasurements(c client.Client, sdb string) []string { +func GetMeasurements(c client.Client, sdb string, mesafilter string) []*MeasurementSch { cmd := "show measurements" //get measurements from database @@ -276,7 +276,7 @@ func GetMeasurements(c client.Client, sdb string) []string { Database: sdb, } - var measurements []string + var measurements []*MeasurementSch response, err := c.Query(q) if err != nil { @@ -297,7 +297,8 @@ func GetMeasurements(c client.Client, sdb string) []string { for _, row := range values { measurement := fmt.Sprintf("%v", row[0]) - measurements = append(measurements, measurement) + measurements = append(measurements, &MeasurementSch{Name: measurement, Fields: nil}) + time.Sleep(3 * time.Millisecond) } @@ -323,7 +324,7 @@ func StrUnixNano2Time(tstamp string) (time.Time, error) { return time.Unix(sec, nsec), nil } -func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string]string) (client.BatchPoints, int64, error) { +func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string]*FieldSch) (client.BatchPoints, int64, error) { var totalpoints int64 RWMaxRetries := MainConfig.General.RWMaxRetries RWRetryDelay := MainConfig.General.RWRetryDelay @@ -427,7 +428,7 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string switch vt := val.(type) { case json.Number: tp := fieldmap[ser.Columns[i]] - switch tp { + switch tp.Type { case "float": conv, err := vt.Float64() if err != nil { @@ -587,8 +588,7 @@ func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, sr var totalpoints int64 totalpoints = 0 - for m, sch := range dbschema.Ftypes { - + for m, sch := range dbschema.Measurements { m := m sch := sch @@ -597,7 +597,7 @@ func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, sr log.Tracef("Processing measurement %s with schema #%+v", m, sch) log.Debugf("processing Database %s Measurement %s from %d to %d", sdb, m, startsec, endsec) getvalues := fmt.Sprintf("select * from \"%v\" where time > %vs and time < %vs group by *", m, startsec, endsec) - batchpoints, np, err := ReadDB(src.cli, sdb, srp.Name, ddb, drp.Name, getvalues, sch) + batchpoints, np, err := ReadDB(src.cli, sdb, srp.Name, ddb, drp.Name, getvalues, sch.Fields) if err != nil { log.Errorf("error in read %s", err) return diff --git a/pkg/agent/hacluster.go b/pkg/agent/hacluster.go index e8c86b2..b48d3a5 100644 --- a/pkg/agent/hacluster.go +++ b/pkg/agent/hacluster.go @@ -7,12 +7,22 @@ import ( ) type InfluxSchDb struct { - Name string - NewName string - DefRp string - NewDefRp string - Rps []*RetPol - Ftypes map[string]map[string]string + Name string + NewName string + DefRp string + NewDefRp string + Rps []*RetPol + Measurements map[string]*MeasurementSch +} + +type MeasurementSch struct { + Name string + Fields map[string]*FieldSch +} + +type FieldSch struct { + Name string + Type string } type HACluster struct { @@ -63,14 +73,18 @@ func (hac *HACluster) GetStatus() *ClusterStatus { } // From Master to Slave -func (hac *HACluster) GetSchema(dbfilter string) ([]*InfluxSchDb, error) { +func (hac *HACluster) GetSchema(dbfilter string, rpfilter string, measfilter string) ([]*InfluxSchDb, error) { schema := []*InfluxSchDb{} - var filter *regexp.Regexp + + var filterdb *regexp.Regexp + var filterrp *regexp.Regexp + var filtermeas *regexp.Regexp + var err error if len(dbfilter) > 0 { - filter, err = regexp.Compile(dbfilter) + filterdb, err = regexp.Compile(dbfilter) if err != nil { return nil, err } @@ -80,7 +94,7 @@ func (hac *HACluster) GetSchema(dbfilter string) ([]*InfluxSchDb, error) { for _, db := range srcDBs { - if len(dbfilter) > 0 && !filter.MatchString(db) { + if len(dbfilter) > 0 && !filterdb.MatchString(db) { log.Debugf("Database %s not match to regex %s: skipping.. ", db, dbfilter) continue } @@ -92,12 +106,23 @@ func (hac *HACluster) GetSchema(dbfilter string) ([]*InfluxSchDb, error) { continue } + if len(rpfilter) > 0 { + filterrp, err = regexp.Compile(rpfilter) + if err != nil { + return nil, err + } + } + //check for default RP var defaultRp *RetPol + for _, rp := range rps { + if len(rpfilter) > 0 && !filterrp.MatchString(rp.Name) { + log.Debugf("Retention policy %s not match to regex %s: skipping.. ", rp.Name, rpfilter) + continue + } if rp.Def { defaultRp = rp - break } } @@ -107,16 +132,29 @@ func (hac *HACluster) GetSchema(dbfilter string) ([]*InfluxSchDb, error) { continue } - meas := GetMeasurements(hac.Master.cli, db) + meas := GetMeasurements(hac.Master.cli, db, measfilter) - mf := make(map[string]map[string]string, len(meas)) + if len(measfilter) > 0 { + filtermeas, err = regexp.Compile(measfilter) + if err != nil { + return nil, err + } + } + + mf := make(map[string]*MeasurementSch, len(meas)) for _, m := range meas { + + if len(measfilter) > 0 && !filtermeas.MatchString(m.Name) { + log.Debugf("Measurement %s not match to regex %s: skipping.. ", m.Name, measfilter) + continue + } + log.Debugf("discovered measurement %s on DB: %s", m, db) - mf[m] = GetFields(hac.Master.cli, db, m) + mf[m.Name] = m + mf[m.Name].Fields = GetFields(hac.Master.cli, db, m.Name, defaultRp.Name) } - // - schema = append(schema, &InfluxSchDb{Name: db, NewName: db, DefRp: defaultRp.Name, NewDefRp: defaultRp.Name, Rps: rps, Ftypes: mf}) + schema = append(schema, &InfluxSchDb{Name: db, NewName: db, DefRp: defaultRp.Name, NewDefRp: defaultRp.Name, Rps: rps, Measurements: mf}) } hac.Schema = schema return schema, nil diff --git a/pkg/main.go b/pkg/main.go index 802a544..750c01b 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -38,6 +38,8 @@ var ( master string slave string actiondb = ".*" + actionrp = ".*" + actionmeas = ".*" newdb string newrp string starttimestr string @@ -79,6 +81,8 @@ func flags() *flag.FlagSet { f.StringVar(&master, "master", master, "choose master ID from all those in the config file where to get data (override the master-db parameter in the config file)") f.StringVar(&slave, "slave", slave, "choose master ID from all those in the config file where to write data (override the slave-db parameter in the config file)") f.StringVar(&actiondb, "db", actiondb, "set the db where to play") + f.StringVar(&actionrp, "rp", actionrp, "set the rp where to play") + f.StringVar(&actionmeas, "meas", actionmeas, "set the meas where to play") f.StringVar(&newdb, "newdb", newdb, "set the db to work on") f.StringVar(&newrp, "newrp", newrp, "set the rp to work on") f.StringVar(&chunktimestr, "chunk", chunktimestr, "set RW chuck periods as in the data-chuck-duration config param") @@ -289,12 +293,12 @@ func main() { agent.HAMonitorStart(master, slave) webui.WebServer("", httpPort, &agent.MainConfig.HTTP, agent.MainConfig.General.InstanceID) case "copy": - agent.Copy(master, slave, actiondb, newdb, newrp, starttime, endtime, fulltime) + agent.Copy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime) case "move": case "replicaschema": - agent.ReplSch(master, slave, actiondb, newdb, newrp) + agent.ReplSch(master, slave, actiondb, newdb, actionrp, newrp, actionmeas) case "fullcopy": - agent.SchCopy(master, slave, actiondb, newdb, newrp, starttime, endtime, fulltime) + agent.SchCopy(master, slave, actiondb, newdb, actionrp, newrp, actionmeas, starttime, endtime, fulltime) default: fmt.Printf("Unknown action: %s", action) }