Skip to content

Commit

Permalink
Merge pull request #5 from slawr/rebase-iotdb-config-support
Browse files Browse the repository at this point in the history
Apache IoTDB: add runtime configuration file support for connector
  • Loading branch information
UlfBj authored Mar 14, 2024
2 parents 7866770 + 43fb9af commit 4f69b40
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
8 changes: 8 additions & 0 deletions server/vissv2server/iotdb-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"host": "iotdb-service",
"port": "6667",
"username": "root",
"password": "root",
"queryPrefixPath": "root.test2.dev1",
"queryTimeout(ms)": 5000
}
71 changes: 56 additions & 15 deletions server/vissv2server/serviceMgr/serviceMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,33 @@ var historySupport bool

// Apache IoTDB
var IoTDBsession client.Session
var IoTDbPrefixPath string = "root.test2.dev1" // DB prefix used for get/set
var IoTDbTimeout int64 = 3000
var IoTDBconfig = &client.Config{
// Host: "127.0.0.1",
Host: "iotdb-service",
var IoTDBClientConfig = &client.Config {
Host: "127.0.0.1",
Port: "6667",
UserName: "root",
Password: "root",
}

type IoTDBConfiguration struct {
Host string `json:"host"`
Port string `json:"port"`
UserName string `json:"username"`
Password string `json:"password"`
PrefixPath string `json:"queryPrefixPath"`
Timeout int64 `json:"queryTimeout(ms)"`
}

// Default IoTDB connector configuration
var IoTDBConfig = IoTDBConfiguration {
"127.0.0.1",
"6667",
"root",
"root",
"root.test2.dev1",
5000,
}


var dummyValue int // dummy value returned when DB configured to none. Counts from 0 to 999, wrap around, updated every 47 msec

func initDataServer(serviceMgrChan chan string, clientChannel chan string, backendChannel chan string) {
Expand Down Expand Up @@ -495,12 +512,12 @@ func getVehicleData(path string) string { // returns {"value":"Y", "ts":"Z"}
case "apache-iotdb":
var (
// Back-quote the VSS node for the DB query, e.g. `Vehicle.CurrentLocation.Longitude`
selectLastSQL = fmt.Sprintf("select last `%v` from %v", path, IoTDbPrefixPath)
value = ""
ts = ""
selectLastSQL = fmt.Sprintf("select last `%v` from %v", path, IoTDBConfig.PrefixPath)
value = ""
ts = ""
)
// utils.Info.Printf("IoTDB: query using: %v", selectLastSQL)
sessionDataSet, err := IoTDBsession.ExecuteQueryStatement(selectLastSQL, &IoTDbTimeout)
// utils.Info.Printf("IoTDB: query using: %v", selectLastSQL)
sessionDataSet, err := IoTDBsession.ExecuteQueryStatement(selectLastSQL, &IoTDBConfig.Timeout)
if err == nil {
var success bool
success, err = sessionDataSet.Next()
Expand Down Expand Up @@ -573,8 +590,8 @@ func setVehicleData(path string, value string) string {
IoTDBts := time.Now().UTC().UnixNano() / 1000000

// IoTDB will automatically convert the value string to the native data type in the timeseries schema for basic types
// utils.Info.Printf("IoTDB: DB insert with prefixPath: %v vssKey: %v, vssValue: %v, ts: %v", IoTDbPrefixPath, vssKey, vssValue, IoTDBts)
if status, err := IoTDBsession.InsertStringRecord(IoTDbPrefixPath, vssKey, vssValue, IoTDBts); err != nil {
// utils.Info.Printf("IoTDB: DB insert with prefixPath: %v vssKey: %v, vssValue: %v, ts: %v", IoTDBConfig.PrefixPath, vssKey, vssValue, IoTDBts)
if status, err := IoTDBsession.InsertStringRecord(IoTDBConfig.PrefixPath, vssKey, vssValue, IoTDBts); err != nil {
utils.Error.Printf("IoTDB: DB insert using InsertStringRecord failed with: %v", err)
return ""
} else {
Expand Down Expand Up @@ -983,10 +1000,34 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
}
utils.Info.Printf("Redis state storage initialised.")
case "apache-iotdb":
utils.Info.Printf("IoTDB: creating new session with host:%v port:%v user:%v pass:%v", IoTDBconfig.Host, IoTDBconfig.Port, IoTDBconfig.UserName, IoTDBconfig.Password)
IoTDBsession = client.NewSession(IoTDBconfig)
// Read configuration from file if present else use defaults
IoTDBConfigFilename := "iotdb-config.json"
utils.Info.Printf("IoTDB: Default configuration before config file read = %+v", IoTDBConfig)
data, err := os.ReadFile(IoTDBConfigFilename)
if err != nil {
utils.Error.Printf("IoTDB: Failed to read configuration from %v with error = %+v", IoTDBConfigFilename, err)
} else {
var IoTDBJSONConfig IoTDBConfiguration
err = json.Unmarshal(data, &IoTDBJSONConfig)
if err != nil {
utils.Error.Printf("IoTDB: Failed to unmarshal the JSON config data from %v with error = %+v", IoTDBConfigFilename, err)
} else {
utils.Info.Printf("IoTDB: Configuration read from config file %v = %+v", IoTDBConfigFilename, IoTDBJSONConfig)
// Success. Copy config read from the file.
IoTDBConfig = IoTDBJSONConfig
}
}

IoTDBClientConfig.Host = IoTDBConfig.Host
IoTDBClientConfig.Port = IoTDBConfig.Port
IoTDBClientConfig.UserName = IoTDBConfig.UserName
IoTDBClientConfig.Password = IoTDBConfig.Password

// Create new client session with IoTDB server
utils.Info.Printf("IoTDB: Creating new session with client config = %+v", *IoTDBClientConfig)
IoTDBsession = client.NewSession(IoTDBClientConfig)
if err := IoTDBsession.Open(false, 0); err != nil {
utils.Error.Printf("IoTDB: Failed to open session with error=%s", err)
utils.Error.Printf("IoTDB: Failed to open server session with error=%s", err)
os.Exit(1)
}
defer IoTDBsession.Close()
Expand Down

0 comments on commit 4f69b40

Please sign in to comment.