From df732a3283bcdc8555bb7f211bfb3710f90ee14f Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Thu, 2 Jan 2025 14:10:05 +0100 Subject: [PATCH 1/3] Core-servicemgr thread comm refactoring Signed-off-by: Ulf Bjorkengren --- server/vissv2server/serviceMgr/serviceMgr.go | 67 ++++++++++---------- server/vissv2server/vissv2server.go | 30 +++------ 2 files changed, 43 insertions(+), 54 deletions(-) diff --git a/server/vissv2server/serviceMgr/serviceMgr.go b/server/vissv2server/serviceMgr/serviceMgr.go index af473d50..b8d15647 100644 --- a/server/vissv2server/serviceMgr/serviceMgr.go +++ b/server/vissv2server/serviceMgr/serviceMgr.go @@ -126,16 +126,16 @@ var IoTDBConfig = IoTDBConfiguration{ var dummyValue int // dummy value returned when DB configured to none. Counts from 0 to 999, wrap around, updated every 47 msec -func initDataServer(serviceMgrChan chan string, clientChannel chan string, backendChannel chan string) { +func initDataServer(serviceMgrChan chan map[string]interface{}, clientChannel chan map[string]interface{}, backendChannel chan map[string]interface{}) { for { select { case request := <-serviceMgrChan: - utils.Info.Printf("Service mgr request: %s", request) +// utils.Info.Printf("Service mgr request: %s", request) clientChannel <- request // forward to mgr hub, - if strings.Contains(request, "internal-killsubscriptions") == false { // no response on kill sub + if request["action"] != "internal-killsubscriptions" { // no response on kill sub response := <-clientChannel // and wait for response - utils.Info.Printf("Service mgr response: %s", response) +// utils.Info.Printf("Service mgr response: %s", response) serviceMgrChan <- response } case notification := <-backendChannel: // notification @@ -395,10 +395,6 @@ func compareValues(logicOp string, latestValue string, currentValue string, diff return false } -func addPackage(incompleteMessage string, packName string, packValue string) string { - return incompleteMessage[:len(incompleteMessage)-1] + ", \"" + packName + "\":" + packValue + "}" -} - func deactivateSubscription(subscriptionList []SubscriptionState, subscriptionId string) (int, []SubscriptionState) { id, _ := strconv.Atoi(subscriptionId) index := getSubcriptionStateIndex(id, subscriptionList) @@ -1224,7 +1220,8 @@ func feederReader(udsConn net.Conn, fromFeeder chan string) { } } -func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) { +//func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) { +func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, stateStorageType string, histSupport bool, dbFile string) { stateDbType = stateStorageType historySupport = histSupport @@ -1330,8 +1327,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri utils.Error.Printf("Unknown state storage type = %s", stateDbType) } - dataChan := make(chan string) - backendChan := make(chan string) + dataChan := make(chan map[string]interface{}) + backendChan := make(chan map[string]interface{}) subscriptionChan := make(chan int) historyAccessChannel = make(chan string) initClResources() @@ -1360,12 +1357,10 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri for { select { - case request := <-dataChan: // request from server core - utils.Info.Printf("Service manager: Request from Server core:%s\n", request) + case requestMap := <-dataChan: // request from server core +// utils.Info.Printf("Service manager: Request from Server core:%s\n", request) // TODO: interact with underlying subsystem to get the value - var requestMap = make(map[string]interface{}) var responseMap = make(map[string]interface{}) - utils.MapRequest(request, &requestMap) responseMap["RouterId"] = requestMap["RouterId"] responseMap["action"] = requestMap["action"] responseMap["requestId"] = requestMap["requestId"] @@ -1377,23 +1372,23 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri case "set": if strings.Contains(requestMap["path"].(string), "[") == true { utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap break } ts := setVehicleData(requestMap["path"].(string), requestMap["value"].(string)) if len(ts) == 0 { utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap break } responseMap["ts"] = ts - dataChan <- utils.FinalizeMessage(responseMap) + dataChan <- responseMap case "get": pathArray := unpackPaths(requestMap["path"].(string)) if pathArray == nil { utils.Error.Printf("Unmarshal of path array failed.") utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap break } var filterList []utils.FilterObject @@ -1402,7 +1397,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri if len(filterList) == 0 { utils.Error.Printf("Request filter malformed.") utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap break } } @@ -1410,10 +1405,11 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri if len(dataPack) == 0 { utils.Info.Printf("No historic data available") utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap break } - dataChan <- addPackage(utils.FinalizeMessage(responseMap), "data", dataPack) + responseMap["data"] = dataPack + dataChan <- responseMap case "subscribe": var subscriptionState SubscriptionState subscriptionState.SubscriptionId = subscriptionId @@ -1421,13 +1417,13 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri subscriptionState.Path = unpackPaths(requestMap["path"].(string)) if requestMap["filter"] == nil || requestMap["filter"] == "" { utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap break } utils.UnpackFilter(requestMap["filter"], &(subscriptionState.FilterList)) if len(subscriptionState.FilterList) == 0 { utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap } if requestMap["gatingId"] != nil { subscriptionState.GatingId = requestMap["gatingId"].(string) @@ -1441,7 +1437,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri toFeeder <- createFeederNotifyMessage(variant, subscriptionState.Path, subscriptionId) } subscriptionId++ // not to be incremented elsewhere - dataChan <- utils.FinalizeMessage(responseMap) + dataChan <- responseMap case "unsubscribe": if requestMap["subscriptionId"] != nil { status := -1 @@ -1450,15 +1446,15 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri status, subscriptionList = deactivateSubscription(subscriptionList, subscriptId) if status != -1 { responseMap["subscriptionId"] = subscriptId - dataChan <- utils.FinalizeMessage(responseMap) - toFeeder <- request + dataChan <- responseMap + toFeeder <- utils.FinalizeMessage(requestMap) break } requestMap["subscriptionId"] = subscriptId } } utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap case "internal-killsubscriptions": isRemoved := true for isRemoved == true { @@ -1473,12 +1469,12 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri requestMap["requestId"] = nil requestMap["subscriptionId"] = subscriptionId utils.SetErrorResponse(requestMap, errorResponseMap, 2, "Token expired or consent cancelled.") - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap _, subscriptionList = scanAndRemoveListItem(subscriptionList, routerId) } default: utils.SetErrorResponse(requestMap, errorResponseMap, 1, "Unknown action") //invalid_data - dataChan <- utils.FinalizeMessage(errorResponseMap) + dataChan <- errorResponseMap } // switch case <-dummyTicker.C: dummyValue++ @@ -1492,7 +1488,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) subscriptionMap["RouterId"] = subscriptionState.RouterId - backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionState.Path, nil)) + subscriptionMap["data"] = getDataPack(subscriptionState.Path, nil) + backendChan <- subscriptionMap case clPack := <-CLChannel: // curve logging notification index := getSubcriptionStateIndex(clPack.SubscriptionId, subscriptionList) if index == -1 { @@ -1510,7 +1507,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionList[index].SubscriptionId) subscriptionMap["RouterId"] = subscriptionList[index].RouterId - backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", clPack.DataPack) + subscriptionMap["data"] = clPack.DataPack + backendChan <- subscriptionMap case <-subscriptTicker.C: if feederNotification == false { // feeder does not issue notifications subscriptionList = checkRCFilterAndIssueMessages("", subscriptionList, backendChan) @@ -1526,7 +1524,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri utils.Info.Printf("Service manager exit") } -func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan string) []SubscriptionState { +func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan map[string]interface{}) []SubscriptionState { // check if range or change notification triggered for i := range subscriptionList { if len(triggeredPath) == 0 || triggeredPath == subscriptionList[i].Path[0] { @@ -1539,7 +1537,8 @@ func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []Subs subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) subscriptionMap["RouterId"] = subscriptionState.RouterId - backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionList[i].Path, nil)) + subscriptionMap["data"] = getDataPack(subscriptionList[i].Path, nil) + backendChan <- subscriptionMap } } } diff --git a/server/vissv2server/vissv2server.go b/server/vissv2server/vissv2server.go index 22c99a56..12f3ff12 100644 --- a/server/vissv2server/vissv2server.go +++ b/server/vissv2server/vissv2server.go @@ -64,7 +64,7 @@ var transportMgrChannel []chan string var transportDataChan []chan string var backendChan []chan string -var serviceMgrChannel []chan string +var serviceMgrChannel []chan map[string]interface{} var atsChannel []chan string @@ -89,30 +89,20 @@ func extractMgrId(routerId string) int { // "RouterId" : "mgrId?clientId" return mgrId } -func getRouterId(response string) string { // "RouterId" : "mgrId?clientId", - afterRouterIdKey := strings.Index(response, "RouterId") - if afterRouterIdKey == -1 { - return "" - } - afterRouterIdKey += 8 + 1 // points to after quote - routerIdValStart := utils.NextQuoteMark([]byte(response), afterRouterIdKey) + 1 - routerIdValStop := utils.NextQuoteMark([]byte(response), routerIdValStart) - utils.Info.Printf("getRouterId: %s", response[routerIdValStart:routerIdValStop]) - return response[routerIdValStart:routerIdValStop] -} - -func serviceDataSession(serviceMgrChannel chan string, serviceDataChannel chan string, backendChannel []chan string) { +func serviceDataSession(serviceMgrChannel chan map[string]interface{}, serviceDataChannel chan string, backendChannel []chan string) { for { select { case response := <-serviceMgrChannel: - utils.Info.Printf("Server core: Response from service mgr:%s", string(response)) - mgrIndex := extractMgrId(getRouterId(string(response))) +// utils.Info.Printf("Server core: Response from service mgr:%s", string(response)) + mgrIndex := extractMgrId(response["RouterId"].(string)) utils.Info.Printf("mgrIndex=%d", mgrIndex) - backendChannel[mgrIndex] <- string(response) + backendChannel[mgrIndex] <- utils.FinalizeMessage(response) case request := <-serviceDataChannel: utils.Info.Printf("Server core: Request to service:%s", request) - serviceMgrChannel <- request +requestMap := make(map[string]interface{}) +utils.MapRequest(request, &requestMap) + serviceMgrChannel <- requestMap } } } @@ -798,8 +788,8 @@ func getFileDescriptorData(value interface{}) (string, string, string) { // {"na func initChannels() { ftChannel = make(chan utils.FileTransferCache) - serviceMgrChannel = make([]chan string, 1) - serviceMgrChannel[0] = make(chan string) + serviceMgrChannel = make([]chan map[string]interface{}, 1) + serviceMgrChannel[0] = make(chan map[string]interface{}) serviceDataChan = make([]chan string, 1) serviceDataChan[0] = make(chan string) transportMgrChannel = make([]chan string, NUMOFTRANSPORTMGRS) From f17245a9a466b641a5eeca27d7f73e1b8642f57f Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Fri, 3 Jan 2025 14:06:41 +0100 Subject: [PATCH 2/3] Thread comm refactoring 2 Signed-off-by: Ulf Bjorkengren --- client/client-1.0/requests.json | 6 +- server/vissv2server/serviceMgr/serviceMgr.go | 14 +- server/vissv2server/vissv2server.go | 205 +++++++++++-------- utils/common.go | 30 --- 4 files changed, 130 insertions(+), 125 deletions(-) diff --git a/client/client-1.0/requests.json b/client/client-1.0/requests.json index 7c132104..1ba80e46 100755 --- a/client/client-1.0/requests.json +++ b/client/client-1.0/requests.json @@ -1,6 +1,6 @@ -{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","requestId":"232"}, +{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"}, {"action":"get","path":"Incorrect/Path","requestId":"1957"}, {"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"}, {"action":"get","path":"Vehicle/ADAS","filter":{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"}, -{"action":"set", "path":"Vehicle/Cabin/Door/Row1/Right/IsOpen", "value":"999", "requestId":"245"}, -{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]} +{"action":"set", "path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen", "value":"true", "requestId":"245"}, +{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]} diff --git a/server/vissv2server/serviceMgr/serviceMgr.go b/server/vissv2server/serviceMgr/serviceMgr.go index b8d15647..4800646a 100644 --- a/server/vissv2server/serviceMgr/serviceMgr.go +++ b/server/vissv2server/serviceMgr/serviceMgr.go @@ -1408,7 +1408,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, state dataChan <- errorResponseMap break } - responseMap["data"] = dataPack + responseMap["data"] = string2Map(dataPack)["s2m"] dataChan <- responseMap case "subscribe": var subscriptionState SubscriptionState @@ -1488,7 +1488,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, state subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) subscriptionMap["RouterId"] = subscriptionState.RouterId - subscriptionMap["data"] = getDataPack(subscriptionState.Path, nil) + subscriptionMap["data"] = string2Map(getDataPack(subscriptionState.Path, nil))["s2m"] backendChan <- subscriptionMap case clPack := <-CLChannel: // curve logging notification index := getSubcriptionStateIndex(clPack.SubscriptionId, subscriptionList) @@ -1507,7 +1507,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, state subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionList[index].SubscriptionId) subscriptionMap["RouterId"] = subscriptionList[index].RouterId - subscriptionMap["data"] = clPack.DataPack + subscriptionMap["data"] = string2Map(clPack.DataPack)["s2m"] backendChan <- subscriptionMap case <-subscriptTicker.C: if feederNotification == false { // feeder does not issue notifications @@ -1537,7 +1537,7 @@ func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []Subs subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) subscriptionMap["RouterId"] = subscriptionState.RouterId - subscriptionMap["data"] = getDataPack(subscriptionList[i].Path, nil) + subscriptionMap["data"] = string2Map(getDataPack(subscriptionList[i].Path, nil))["s2m"] backendChan <- subscriptionMap } } @@ -1545,6 +1545,12 @@ func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []Subs return subscriptionList } +func string2Map(msg string) map[string]interface{} { + var msgMap map[string]interface{} + utils.MapRequest(`{"s2m":`+msg+"}", &msgMap) + return msgMap +} + func decodeFeederMessage(feederMessage string, feederNotification bool) (string, bool) { if len(feederMessage) == 0 { return "", feederNotification diff --git a/server/vissv2server/vissv2server.go b/server/vissv2server/vissv2server.go index 12f3ff12..460d3ad6 100644 --- a/server/vissv2server/vissv2server.go +++ b/server/vissv2server/vissv2server.go @@ -61,8 +61,9 @@ var serverComponents []string = []string{ */ const NUMOFTRANSPORTMGRS = 4 // order assigned to channels: HTTP, WS, MQTT, gRPC var transportMgrChannel []chan string -var transportDataChan []chan string -var backendChan []chan string +var transportDataChan []chan map[string]interface{} +//var transportDataChan []chan string +var backendChan []chan map[string]interface{} var serviceMgrChannel []chan map[string]interface{} @@ -71,7 +72,7 @@ var atsChannel []chan string var serviceDataPortNum int = 8200 // port number interval [8200-] // add element if support for new service manager is added -var serviceDataChan []chan string +var serviceDataChan []chan map[string]interface{} var ftChannel chan utils.FileTransferCache @@ -89,7 +90,7 @@ func extractMgrId(routerId string) int { // "RouterId" : "mgrId?clientId" return mgrId } -func serviceDataSession(serviceMgrChannel chan map[string]interface{}, serviceDataChannel chan string, backendChannel []chan string) { +func serviceDataSession(serviceMgrChannel chan map[string]interface{}, serviceDataChannel chan map[string]interface{}, backendChannel []chan map[string]interface{}) { for { select { @@ -97,26 +98,28 @@ func serviceDataSession(serviceMgrChannel chan map[string]interface{}, serviceDa // utils.Info.Printf("Server core: Response from service mgr:%s", string(response)) mgrIndex := extractMgrId(response["RouterId"].(string)) utils.Info.Printf("mgrIndex=%d", mgrIndex) - backendChannel[mgrIndex] <- utils.FinalizeMessage(response) + backendChannel[mgrIndex] <- response case request := <-serviceDataChannel: - utils.Info.Printf("Server core: Request to service:%s", request) -requestMap := make(map[string]interface{}) -utils.MapRequest(request, &requestMap) - serviceMgrChannel <- requestMap +// utils.Info.Printf("Server core: Request to service:%s", request) + serviceMgrChannel <- request } } } -func transportDataSession(transportMgrChannel chan string, transportDataChannel chan string, backendChannel chan string) { +func transportDataSession(transportMgrChannel chan string, transportDataChannel chan map[string]interface{}, backendChannel chan map[string]interface{}) { for { select { case msg := <-transportMgrChannel: utils.Info.Printf("request: %s", msg) - transportDataChannel <- msg // send request to server hub +var msgMap map[string]interface{} +utils.MapRequest(msg, &msgMap) + transportDataChannel <- msgMap // send request to server hub +// transportDataChannel <- msg // send request to server hub case message := <-backendChannel: - utils.Info.Printf("Transport mgr server: message= %s", message) - transportMgrChannel <- message +// utils.Info.Printf("Transport mgr server: message= %s", message) + transportMgrChannel <- utils.FinalizeMessage(message) +// transportMgrChannel <- message } } } @@ -403,8 +406,8 @@ func getTokenContext(reqMap map[string]interface{}) string { return "" } -func validRequest(request string, action string) bool { - switch action { +func validRequest(request map[string]interface{}) bool { + switch request["action"].(string) { case "get": return isValidGetParams(request) case "set": @@ -421,104 +424,115 @@ func validRequest(request string, action string) bool { return false } -func isValidGetParams(request string) bool { - if strings.Contains(request, "path") == false { +func isValidGetParams(request map[string]interface{}) bool { + if request["path"] == nil { return false } - if strings.Contains(request, "filter") == true { + if request["filter"] != nil { return isValidGetFilter(request) } return true } -func isValidGetFilter(request string) bool { // paths, history, metadata supported - if strings.Contains(request, "paths") == true { - if strings.Contains(request, "parameter") == true { +func isValidGetFilter(request map[string]interface{}) bool { // paths, history, metadata supported + return true // needs to be fixed +// if strings.Contains(request, "paths") == true { + if request["paths"] != nil { +// if strings.Contains(request, "parameter") == true { + if request["parameter"] != nil { return true } } - if strings.Contains(request, "history") == true { - if strings.Contains(request, "parameter") == true { +// if strings.Contains(request, "history") == true { + if request["history"] != nil { +// if strings.Contains(request, "parameter") == true { + if request["parameter"] != nil { return true } } - if strings.Contains(request, "metadata") == true { - if strings.Contains(request, "parameter") == true { +// if strings.Contains(request, "metadata") == true { + if request["metadata"] != nil { +// if strings.Contains(request, "parameter") == true { + if request["parameter"] != nil { return true } } return false } -func isValidSetParams(request string) bool { - return strings.Contains(request, "path") && strings.Contains(request, "value") +func isValidSetParams(request map[string]interface{}) bool { + return request["path"] != nil && request["value"] != nil } -func isValidSubscribeParams(request string) bool { - if strings.Contains(request, "path") == false { +func isValidSubscribeParams(request map[string]interface{}) bool { + if request["path"] == nil { return false } - if strings.Contains(request, "filter") == true { - return isValidSubscribeFilter(request) + if request["filter"] != nil { + return true +// return isValidSubscribeFilter(request) } return true } -func isValidSubscribeFilter(request string) bool { // paths, history, timebased, range, change, curvelog, metadata supported - if isValidGetFilter(request) == true { - return true +func isValidSubscribeFilter(request map[string]interface{}) bool { // paths, timebased, range, change, curvelog supported + return true // needs to be fixed +// if strings.Contains(request, "paths") == true { + if request["paths"] != nil { +// if strings.Contains(request, "parameter") == true { + if request["parameter"] != nil { + return true + } } - if strings.Contains(request, "timebased") == true { - if strings.Contains(request, "parameter") == true && strings.Contains(request, "period") == true { +// if strings.Contains(request, "timebased") == true { + if request["timebased"] != nil { +// if strings.Contains(request, "parameter") == true && strings.Contains(request, "period") == true { + if request["parameter"] != nil && request["period"] != nil { return true } } - if strings.Contains(request, "range") == true { - if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true && - strings.Contains(request, "boundary") == true { +// if strings.Contains(request, "range") == true { + if request["range"] != nil { +// if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true && +// strings.Contains(request, "boundary") == true { + if request["parameter"] != nil && request["logic-op"] != nil && request["boundary"] != nil { return true } } - if strings.Contains(request, "change") == true { - if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true && - strings.Contains(request, "diff") == true { +// if strings.Contains(request, "change") == true { + if request["change"] != nil { +// if strings.Contains(request, "parameter") == true && strings.Contains(request, "logic-op") == true && +// strings.Contains(request, "diff") == true { + if request["parameter"] != nil && request["logic-op"] != nil && request["diff"] != nil { return true } } - if strings.Contains(request, "curvelog") == true { - if strings.Contains(request, "parameter") == true && strings.Contains(request, "maxerr") == true && - strings.Contains(request, "bufsize") == true { +// if strings.Contains(request, "curvelog") == true { + if request["curvelog"] != nil { +// if strings.Contains(request, "parameter") == true && strings.Contains(request, "maxerr") == true && +// strings.Contains(request, "bufsize") == true { + if request["parameter"] != nil && request["maxerr"] != nil && request["bufsize"] != nil { return true } } return false } -func isValidUnsubscribeParams(request string) bool { - return strings.Contains(request, "subscriptionId") +func isValidUnsubscribeParams(request map[string]interface{}) bool { + return request["subscriptionId"] != nil } -// Receives a request (json containing path, action, token, routerId....) and calls -// the appropriate function to handle the request -func serveRequest(request string, tDChanIndex int, sDChanIndex int) { - utils.Info.Printf("serveRequest():request=%s", request) - var requestMap = make(map[string]interface{}) - if utils.MapRequest(request, &requestMap) != 0 { - utils.Error.Printf("serveRequest():invalid JSON format=%s", request) - utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) - return - } - if requestMap["action"] == nil || validRequest(request, requestMap["action"].(string)) == false { +func serveRequest(requestMap map[string]interface{}, tDChanIndex int, sDChanIndex int) { + if requestMap["action"] == nil || validRequest(requestMap) == false { utils.Error.Printf("serveRequest():invalid action params=%s", requestMap["action"]) utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } if requestMap["path"] != nil && strings.Contains(requestMap["path"].(string), "*") == true { utils.Error.Printf("serveRequest():path contained wildcard=%s", requestMap["path"]) utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } if requestMap["path"] != nil { @@ -527,11 +541,11 @@ func serveRequest(request string, tDChanIndex int, sDChanIndex int) { if requestMap["action"] == "set" && requestMap["filter"] != nil { utils.Error.Printf("serveRequest():Set request combined with filtering.") utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } if requestMap["action"] == "unsubscribe" { - serviceDataChan[sDChanIndex] <- request + serviceDataChan[sDChanIndex] <- requestMap return } issueServiceRequest(requestMap, tDChanIndex, sDChanIndex) @@ -539,20 +553,20 @@ func serveRequest(request string, tDChanIndex int, sDChanIndex int) { func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDChanIndex int) { if requestMap["action"] == "internal-killsubscriptions" || requestMap["action"] == "internal-cancelsubscription" { - serviceDataChan[sDChanIndex] <- utils.FinalizeMessage(requestMap) // internal message + serviceDataChan[sDChanIndex] <- requestMap // internal message return } if requestMap["path"] == nil { utils.Error.Printf("Unmarshal filter path array failed.") utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } rootPath := requestMap["path"].(string) VSSTreeRoot := utils.SetRootNodePointer(rootPath) if VSSTreeRoot == nil { utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } var searchPath []string @@ -571,7 +585,7 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC if err != nil { utils.Error.Printf("Unmarshal filter path array failed.") utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } for i := 0; i < len(searchPath); i++ { @@ -599,13 +613,14 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC delete(requestMap, "path") delete(requestMap, "filter") requestMap["ts"] = utils.GetRfcTime() - backendChan[tDChanIndex] <- utils.AddKeyValue(utils.FinalizeMessage(requestMap), "metadata", metadata) + requestMap["metadata"] = metadata + backendChan[tDChanIndex] <- requestMap return } } utils.Error.Printf("Metadata error") utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } } @@ -634,7 +649,7 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC } if totalMatches == 0 { utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } nodeType := utils.VSSgetType(searchData[0].NodeHandle) @@ -645,7 +660,7 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC } if requestMap["action"] == "set" && nodeType != utils.ACTUATOR { utils.SetErrorResponse(requestMap, errorResponseMap, 1, "Forbidden to write to read-only resource.") //invalid_data - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } paths = paths[:len(paths)-2] @@ -679,12 +694,12 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC } if errorCode != 0 { setTokenErrorResponse(requestMap, errorCode) - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } default: // should not be possible... utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable - backendChan[tDChanIndex] <- utils.FinalizeMessage(errorResponseMap) + backendChan[tDChanIndex] <- errorResponseMap return } if totalMatches == 1 { @@ -697,12 +712,13 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC if gatingId != "" { requestMap["gatingId"] = gatingId } - serviceDataChan[sDChanIndex] <- utils.FinalizeMessage(requestMap) + serviceDataChan[sDChanIndex] <- requestMap } -func initiateFileTransfer(requestMap map[string]interface{}, nodeType utils.NodeTypes_t, path string) string { +func initiateFileTransfer(requestMap map[string]interface{}, nodeType utils.NodeTypes_t, path string) map[string]interface{} { utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", requestMap["action"], nodeType) var ftInitData utils.FileTransferCache + var responseMap map[string]interface{} if requestMap["action"] == "set" && nodeType == utils.ACTUATOR { // download var uidString string ftInitData.UploadTransfer = false @@ -713,10 +729,14 @@ utils.Info.Printf("initiateFileTransfer: requestMap[action]=%s, nodeType=%d", re ftChannel <- ftInitData ftInitData = <- ftChannel if ftInitData.Status == 0 { - return `{"RouterId": "` + requestMap["RouterId"].(string) + `"action": "set", "ts": "` + utils.GetRfcTime() + `"}` + responseMap["RouterId"] = requestMap["RouterId"].(string) + responseMap["action"] = "set" + responseMap["ts"] = utils.GetRfcTime() + return responseMap +// return `{"RouterId": "` + requestMap["RouterId"].(string) + `"action": "set", "ts": "` + utils.GetRfcTime() + `"}` } else { utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable - return utils.FinalizeMessage(errorResponseMap) + return errorResponseMap } } else if requestMap["action"] == "get" && nodeType == utils.SENSOR { //upload @@ -730,13 +750,19 @@ utils.Info.Printf("initiateFileTransfer: upload!!!") ftInitData.Uid = [utils.UIDLEN]byte(uidByte) ftChannel <- ftInitData _ = <- ftChannel - return `{"RouterId": "` + requestMap["RouterId"].(string) + `"action": "get", "path":"` + path + + responseMap["RouterId"] = requestMap["RouterId"].(string) + responseMap["action"] = "get" + responseMap["path"] = path + responseMap["value"] = `{"name": "` + ftInitData.Name + `", "hash":"` + ftInitData.Hash + `","uid":"` + hex.EncodeToString(ftInitData.Uid[:]) + `"}` + responseMap["ts"] = utils.GetRfcTime() + return responseMap +/* return `{"RouterId": "` + requestMap["RouterId"].(string) + `"action": "get", "path":"` + path + `", "value":{"name": "` + ftInitData.Name + `", "hash":"` + ftInitData.Hash + `","uid":"` + hex.EncodeToString(ftInitData.Uid[:]) + `"}, ` + - `"ts": "` + utils.GetRfcTime() + `"}` + `"ts": "` + utils.GetRfcTime() + `"}`*/ } } utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data - return utils.FinalizeMessage(errorResponseMap) + return errorResponseMap } func calculateHash(fileName string) string { @@ -790,15 +816,15 @@ func initChannels() { ftChannel = make(chan utils.FileTransferCache) serviceMgrChannel = make([]chan map[string]interface{}, 1) serviceMgrChannel[0] = make(chan map[string]interface{}) - serviceDataChan = make([]chan string, 1) - serviceDataChan[0] = make(chan string) + serviceDataChan = make([]chan map[string]interface{}, 1) + serviceDataChan[0] = make(chan map[string]interface{}) transportMgrChannel = make([]chan string, NUMOFTRANSPORTMGRS) - transportDataChan = make([]chan string, NUMOFTRANSPORTMGRS) - backendChan = make([]chan string, NUMOFTRANSPORTMGRS) + transportDataChan = make([]chan map[string]interface{}, NUMOFTRANSPORTMGRS) + backendChan = make([]chan map[string]interface{}, NUMOFTRANSPORTMGRS) for i := 0; i < NUMOFTRANSPORTMGRS; i++ { - transportMgrChannel[i] = make(chan string) - transportDataChan[i] = make(chan string) - backendChan[i] = make(chan string) + transportMgrChannel[i] = make(chan string) + transportDataChan[i] = make(chan map[string]interface{}) + backendChan[i] = make(chan map[string]interface{}) } atsChannel = make([]chan string, 2) atsChannel[0] = make(chan string) // access token verification @@ -909,7 +935,10 @@ func main() { case request := <-transportDataChan[3]: // request from gRPC mgr serveRequest(request, 3, 0) case gatingId := <-atsChannel[1]: - request := `{"action": "internal-cancelsubscription", "gatingId":"` + gatingId + `"}` +// request := `{"action": "internal-cancelsubscription", "gatingId":"` + gatingId + `"}` + var request map[string]interface{} + request["action"] = "internal-cancelsubscription" + request["gatingId"] = gatingId serveRequest(request, 0, 0) // case request := <- transportDataChan[X]: // implement when there is a Xth transport protocol mgr } diff --git a/utils/common.go b/utils/common.go index 1879cdbc..fdcf0aa3 100644 --- a/utils/common.go +++ b/utils/common.go @@ -230,31 +230,6 @@ func ExtractFromToken(token string, claim string) string { // TODO remove white return "" } -/*func SetErrorResponse(reqMap map[string]interface{}, errRespMap map[string]interface{}, number string, reason string, message string) { - if reqMap["RouterId"] != nil { - errRespMap["RouterId"] = reqMap["RouterId"] - } - if reqMap["action"] != nil { - errRespMap["action"] = reqMap["action"] - } - if reqMap["requestId"] != nil { - errRespMap["requestId"] = reqMap["requestId"] - } else { - delete(errRespMap, "requestId") - } - if reqMap["subscriptionId"] != nil { - errRespMap["subscriptionId"] = reqMap["subscriptionId"] - } - errMap := map[string]interface{}{ - "number": number, - "reason": reason, - "message": message, - } - errRespMap["error"] = errMap - errRespMap["ts"] = GetRfcTime() -}*/ - -// func SetErrorResponse(reqMap map[string]interface{}, errRespMap map[string]interface{}, number string, reason string, message string) { func SetErrorResponse(reqMap map[string]interface{}, errRespMap map[string]interface{}, errorListIndex int, altErrorMessage string) { if reqMap["RouterId"] != nil { errRespMap["RouterId"] = reqMap["RouterId"] @@ -279,11 +254,6 @@ func SetErrorResponse(reqMap map[string]interface{}, errRespMap map[string]inter "reason": ErrorInfoList[errorListIndex].Reason, "message": errorMessage, } - /* errMap := map[string]interface{}{ - "number": number, - "reason": reason, - "message": message, - }*/ errRespMap["error"] = errMap errRespMap["ts"] = GetRfcTime() } From b207aea9dc4f059de9ffc97b6db11c4c18b88dc7 Mon Sep 17 00:00:00 2001 From: Ulf Bjorkengren Date: Mon, 6 Jan 2025 09:27:59 +0100 Subject: [PATCH 3/3] Thead comm refactoring done Signed-off-by: Ulf Bjorkengren --- client/client-1.0/requests.json | 4 +- server/vissv2server/serviceMgr/serviceMgr.go | 40 ++++++++++++++++---- server/vissv2server/vissv2server.go | 12 +++--- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/client/client-1.0/requests.json b/client/client-1.0/requests.json index 1ba80e46..b5f357c6 100755 --- a/client/client-1.0/requests.json +++ b/client/client-1.0/requests.json @@ -1,6 +1,6 @@ {"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"}, {"action":"get","path":"Incorrect/Path","requestId":"1957"}, {"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"}, -{"action":"get","path":"Vehicle/ADAS","filter":{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"}, +{"action":"get","path":"Vehicle/ADAS","filter":{"variant":"paths","parameter":["ABS/*","CruiseControl/IsError"]},"requestId":"237"}, {"action":"set", "path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen", "value":"true", "requestId":"245"}, -{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]} +{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3"}},"requestId":"246"}]} diff --git a/server/vissv2server/serviceMgr/serviceMgr.go b/server/vissv2server/serviceMgr/serviceMgr.go index 4800646a..eb23fe92 100644 --- a/server/vissv2server/serviceMgr/serviceMgr.go +++ b/server/vissv2server/serviceMgr/serviceMgr.go @@ -77,6 +77,7 @@ type FeederPathElem struct { Reference int } var feederPathList []FeederPathElem +var feederConnected bool //var feederConn net.Conn //var hostIp string @@ -617,6 +618,9 @@ func setVehicleData(path string, value string) string { utils.Error.Printf("setVehicleData:Write failed, err = %s", err) return "" }*/ + if !feederConnected { + return "" + } message := `{"action": "set", "data": {"path":"` + path + `", "dp":{"value":"` + value + `", "ts":"` + ts + `"}}}` toFeeder <- message return ts @@ -917,6 +921,26 @@ func getDataPack(pathArray []string, filterList []utils.FilterObject) string { return dataPack } +func getDataPackMap(pathArray []string) map[string]interface{} { + dataPack := make(map[string]interface{}, 0) + if len(pathArray) > 1 { + dataPackElement := make([]interface{}, len(pathArray)) + for i := 0; i < len(pathArray); i++ { + dataPackElement[i] = map[string]interface{}{ + "path": pathArray[i], + "dp": string2Map(getVehicleData(pathArray[i]))["s2m"], + } + } + dataPack["dpack"] = dataPackElement + } else { + dataPack["dpack"] = map[string]interface{}{ + "path": pathArray[0], + "dp": string2Map(getVehicleData(pathArray[0]))["s2m"], + } + } + return dataPack +} + func getVssPathList(host string, port int, path string) []byte { url := "http://" + host + ":" + strconv.Itoa(port) + path utils.Info.Printf("url = %s", url) @@ -1007,11 +1031,12 @@ func feederFrontend(toFeeder chan string, fromFeederRorC chan string, fromFeeder udsConn = utils.GetUdsConn("*", "serverFeeder") if udsConn == nil && attempts >= 10-1 { utils.Error.Printf("feederFrontend:Failed to UDS connect to feeder.") - return // ??? + return } attempts++ time.Sleep(3 * time.Second) } + feederConnected = true utils.Info.Printf("feederFrontend:Connected to feeder.") configureDefault(udsConn) fromFeeder := make(chan string) @@ -1224,6 +1249,7 @@ func feederReader(udsConn net.Conn, fromFeeder chan string) { func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, stateStorageType string, histSupport bool, dbFile string) { stateDbType = stateStorageType historySupport = histSupport + feederConnected = false utils.ReadUdsRegistrations("uds-registration.json") @@ -1401,14 +1427,14 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, state break } } - dataPack := getDataPack(pathArray, filterList) - if len(dataPack) == 0 { + dataPack := getDataPackMap(pathArray) +/* if len(dataPack) == 0 { utils.Info.Printf("No historic data available") utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data dataChan <- errorResponseMap break - } - responseMap["data"] = string2Map(dataPack)["s2m"] + }*/ + responseMap["data"] = dataPack["dpack"] dataChan <- responseMap case "subscribe": var subscriptionState SubscriptionState @@ -1488,7 +1514,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, state subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) subscriptionMap["RouterId"] = subscriptionState.RouterId - subscriptionMap["data"] = string2Map(getDataPack(subscriptionState.Path, nil))["s2m"] + subscriptionMap["data"] = getDataPackMap(subscriptionState.Path)["dpack"] backendChan <- subscriptionMap case clPack := <-CLChannel: // curve logging notification index := getSubcriptionStateIndex(clPack.SubscriptionId, subscriptionList) @@ -1537,7 +1563,7 @@ func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []Subs subscriptionMap["ts"] = utils.GetRfcTime() subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) subscriptionMap["RouterId"] = subscriptionState.RouterId - subscriptionMap["data"] = string2Map(getDataPack(subscriptionList[i].Path, nil))["s2m"] + subscriptionMap["data"] = getDataPackMap(subscriptionList[i].Path)["dpack"] backendChan <- subscriptionMap } } diff --git a/server/vissv2server/vissv2server.go b/server/vissv2server/vissv2server.go index 460d3ad6..393570f2 100644 --- a/server/vissv2server/vissv2server.go +++ b/server/vissv2server/vissv2server.go @@ -644,16 +644,16 @@ func issueServiceRequest(requestMap map[string]interface{}, tDChanIndex int, sDC pathLen := getPathLen(string(searchData[i].NodePath[:])) paths += "\"" + string(searchData[i].NodePath[:pathLen]) + "\", " } + if matches == 0 { + utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data + backendChan[tDChanIndex] <- errorResponseMap + return + } totalMatches += matches maxValidation = utils.GetMaxValidation(int(validation), maxValidation) } - if totalMatches == 0 { - utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data - backendChan[tDChanIndex] <- errorResponseMap - return - } nodeType := utils.VSSgetType(searchData[0].NodeHandle) - if strings.Contains (utils.VSSgetDatatype(searchData[0].NodeHandle), ".FileDescriptor") { + if strings.Contains (utils.VSSgetDatatype(searchData[0].NodeHandle), ".FileDescriptor") { // path to struct def response := initiateFileTransfer(requestMap, nodeType, searchPath[0]) backendChan[tDChanIndex] <- response return