Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread comm refactoring #66

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/client-1.0/requests.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","requestId":"232"},
{"request":[{"action":"get","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","requestId":"232"},
{"action":"get","path":"Incorrect/Path","requestId":"1957"},
{"action":"get","path":"Vehicle.Acceleration.Longitudinal","requestId":"233"},
{"action":"get","path":"Vehicle/ADAS","filter":{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},"requestId":"237"},
{"action":"set", "path":"Vehicle/Cabin/Door/Row1/Right/IsOpen", "value":"999", "requestId":"245"},
{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"timebased","parameter":{"period":"3"}},"requestId":"246"}]}
{"action":"get","path":"Vehicle/ADAS","filter":{"variant":"paths","parameter":["ABS/*","CruiseControl/IsError"]},"requestId":"237"},
{"action":"set", "path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen", "value":"true", "requestId":"245"},
{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/DriverSide/IsOpen","filter":{"variant":"timebased","parameter":{"period":"3"}},"requestId":"246"}]}
107 changes: 69 additions & 38 deletions server/vissv2server/serviceMgr/serviceMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type FeederPathElem struct {
Reference int
}
var feederPathList []FeederPathElem
var feederConnected bool

//var feederConn net.Conn
//var hostIp string
Expand Down Expand Up @@ -126,16 +127,16 @@ var IoTDBConfig = IoTDBConfiguration{

var dummyValue int // dummy value returned when DB configured to none. Counts from 0 to 999, wrap around, updated every 47 msec

func initDataServer(serviceMgrChan chan string, clientChannel chan string, backendChannel chan string) {
func initDataServer(serviceMgrChan chan map[string]interface{}, clientChannel chan map[string]interface{}, backendChannel chan map[string]interface{}) {
for {
select {
case request := <-serviceMgrChan:
utils.Info.Printf("Service mgr request: %s", request)
// utils.Info.Printf("Service mgr request: %s", request)

clientChannel <- request // forward to mgr hub,
if strings.Contains(request, "internal-killsubscriptions") == false { // no response on kill sub
if request["action"] != "internal-killsubscriptions" { // no response on kill sub
response := <-clientChannel // and wait for response
utils.Info.Printf("Service mgr response: %s", response)
// utils.Info.Printf("Service mgr response: %s", response)
serviceMgrChan <- response
}
case notification := <-backendChannel: // notification
Expand Down Expand Up @@ -395,10 +396,6 @@ func compareValues(logicOp string, latestValue string, currentValue string, diff
return false
}

func addPackage(incompleteMessage string, packName string, packValue string) string {
return incompleteMessage[:len(incompleteMessage)-1] + ", \"" + packName + "\":" + packValue + "}"
}

func deactivateSubscription(subscriptionList []SubscriptionState, subscriptionId string) (int, []SubscriptionState) {
id, _ := strconv.Atoi(subscriptionId)
index := getSubcriptionStateIndex(id, subscriptionList)
Expand Down Expand Up @@ -621,6 +618,9 @@ func setVehicleData(path string, value string) string {
utils.Error.Printf("setVehicleData:Write failed, err = %s", err)
return ""
}*/
if !feederConnected {
return ""
}
message := `{"action": "set", "data": {"path":"` + path + `", "dp":{"value":"` + value + `", "ts":"` + ts + `"}}}`
toFeeder <- message
return ts
Expand Down Expand Up @@ -921,6 +921,26 @@ func getDataPack(pathArray []string, filterList []utils.FilterObject) string {
return dataPack
}

func getDataPackMap(pathArray []string) map[string]interface{} {
dataPack := make(map[string]interface{}, 0)
if len(pathArray) > 1 {
dataPackElement := make([]interface{}, len(pathArray))
for i := 0; i < len(pathArray); i++ {
dataPackElement[i] = map[string]interface{}{
"path": pathArray[i],
"dp": string2Map(getVehicleData(pathArray[i]))["s2m"],
}
}
dataPack["dpack"] = dataPackElement
} else {
dataPack["dpack"] = map[string]interface{}{
"path": pathArray[0],
"dp": string2Map(getVehicleData(pathArray[0]))["s2m"],
}
}
return dataPack
}

func getVssPathList(host string, port int, path string) []byte {
url := "http://" + host + ":" + strconv.Itoa(port) + path
utils.Info.Printf("url = %s", url)
Expand Down Expand Up @@ -1011,11 +1031,12 @@ func feederFrontend(toFeeder chan string, fromFeederRorC chan string, fromFeeder
udsConn = utils.GetUdsConn("*", "serverFeeder")
if udsConn == nil && attempts >= 10-1 {
utils.Error.Printf("feederFrontend:Failed to UDS connect to feeder.")
return // ???
return
}
attempts++
time.Sleep(3 * time.Second)
}
feederConnected = true
utils.Info.Printf("feederFrontend:Connected to feeder.")
configureDefault(udsConn)
fromFeeder := make(chan string)
Expand Down Expand Up @@ -1224,9 +1245,11 @@ func feederReader(udsConn net.Conn, fromFeeder chan string) {
}
}

func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) {
//func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) {
func ServiceMgrInit(mgrId int, serviceMgrChan chan map[string]interface{}, stateStorageType string, histSupport bool, dbFile string) {
stateDbType = stateStorageType
historySupport = histSupport
feederConnected = false

utils.ReadUdsRegistrations("uds-registration.json")

Expand Down Expand Up @@ -1330,8 +1353,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
utils.Error.Printf("Unknown state storage type = %s", stateDbType)
}

dataChan := make(chan string)
backendChan := make(chan string)
dataChan := make(chan map[string]interface{})
backendChan := make(chan map[string]interface{})
subscriptionChan := make(chan int)
historyAccessChannel = make(chan string)
initClResources()
Expand Down Expand Up @@ -1360,12 +1383,10 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri

for {
select {
case request := <-dataChan: // request from server core
utils.Info.Printf("Service manager: Request from Server core:%s\n", request)
case requestMap := <-dataChan: // request from server core
// utils.Info.Printf("Service manager: Request from Server core:%s\n", request)
// TODO: interact with underlying subsystem to get the value
var requestMap = make(map[string]interface{})
var responseMap = make(map[string]interface{})
utils.MapRequest(request, &requestMap)
responseMap["RouterId"] = requestMap["RouterId"]
responseMap["action"] = requestMap["action"]
responseMap["requestId"] = requestMap["requestId"]
Expand All @@ -1377,23 +1398,23 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
case "set":
if strings.Contains(requestMap["path"].(string), "[") == true {
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
ts := setVehicleData(requestMap["path"].(string), requestMap["value"].(string))
if len(ts) == 0 {
utils.SetErrorResponse(requestMap, errorResponseMap, 7, "") //service_unavailable
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
responseMap["ts"] = ts
dataChan <- utils.FinalizeMessage(responseMap)
dataChan <- responseMap
case "get":
pathArray := unpackPaths(requestMap["path"].(string))
if pathArray == nil {
utils.Error.Printf("Unmarshal of path array failed.")
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
var filterList []utils.FilterObject
Expand All @@ -1402,32 +1423,33 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
if len(filterList) == 0 {
utils.Error.Printf("Request filter malformed.")
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
}
dataPack := getDataPack(pathArray, filterList)
if len(dataPack) == 0 {
dataPack := getDataPackMap(pathArray)
/* if len(dataPack) == 0 {
utils.Info.Printf("No historic data available")
utils.SetErrorResponse(requestMap, errorResponseMap, 6, "") //unavailable_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
dataChan <- addPackage(utils.FinalizeMessage(responseMap), "data", dataPack)
}*/
responseMap["data"] = dataPack["dpack"]
dataChan <- responseMap
case "subscribe":
var subscriptionState SubscriptionState
subscriptionState.SubscriptionId = subscriptionId
subscriptionState.RouterId = requestMap["RouterId"].(string)
subscriptionState.Path = unpackPaths(requestMap["path"].(string))
if requestMap["filter"] == nil || requestMap["filter"] == "" {
utils.SetErrorResponse(requestMap, errorResponseMap, 0, "") //bad_request
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
break
}
utils.UnpackFilter(requestMap["filter"], &(subscriptionState.FilterList))
if len(subscriptionState.FilterList) == 0 {
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
}
if requestMap["gatingId"] != nil {
subscriptionState.GatingId = requestMap["gatingId"].(string)
Expand All @@ -1441,7 +1463,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
toFeeder <- createFeederNotifyMessage(variant, subscriptionState.Path, subscriptionId)
}
subscriptionId++ // not to be incremented elsewhere
dataChan <- utils.FinalizeMessage(responseMap)
dataChan <- responseMap
case "unsubscribe":
if requestMap["subscriptionId"] != nil {
status := -1
Expand All @@ -1450,15 +1472,15 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
status, subscriptionList = deactivateSubscription(subscriptionList, subscriptId)
if status != -1 {
responseMap["subscriptionId"] = subscriptId
dataChan <- utils.FinalizeMessage(responseMap)
toFeeder <- request
dataChan <- responseMap
toFeeder <- utils.FinalizeMessage(requestMap)
break
}
requestMap["subscriptionId"] = subscriptId
}
}
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
case "internal-killsubscriptions":
isRemoved := true
for isRemoved == true {
Expand All @@ -1473,12 +1495,12 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
requestMap["requestId"] = nil
requestMap["subscriptionId"] = subscriptionId
utils.SetErrorResponse(requestMap, errorResponseMap, 2, "Token expired or consent cancelled.")
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
_, subscriptionList = scanAndRemoveListItem(subscriptionList, routerId)
}
default:
utils.SetErrorResponse(requestMap, errorResponseMap, 1, "Unknown action") //invalid_data
dataChan <- utils.FinalizeMessage(errorResponseMap)
dataChan <- errorResponseMap
} // switch
case <-dummyTicker.C:
dummyValue++
Expand All @@ -1492,7 +1514,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
subscriptionMap["ts"] = utils.GetRfcTime()
subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId)
subscriptionMap["RouterId"] = subscriptionState.RouterId
backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionState.Path, nil))
subscriptionMap["data"] = getDataPackMap(subscriptionState.Path)["dpack"]
backendChan <- subscriptionMap
case clPack := <-CLChannel: // curve logging notification
index := getSubcriptionStateIndex(clPack.SubscriptionId, subscriptionList)
if index == -1 {
Expand All @@ -1510,7 +1533,8 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
subscriptionMap["ts"] = utils.GetRfcTime()
subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionList[index].SubscriptionId)
subscriptionMap["RouterId"] = subscriptionList[index].RouterId
backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", clPack.DataPack)
subscriptionMap["data"] = string2Map(clPack.DataPack)["s2m"]
backendChan <- subscriptionMap
case <-subscriptTicker.C:
if feederNotification == false { // feeder does not issue notifications
subscriptionList = checkRCFilterAndIssueMessages("", subscriptionList, backendChan)
Expand All @@ -1526,7 +1550,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
utils.Info.Printf("Service manager exit")
}

func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan string) []SubscriptionState {
func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan map[string]interface{}) []SubscriptionState {
// check if range or change notification triggered
for i := range subscriptionList {
if len(triggeredPath) == 0 || triggeredPath == subscriptionList[i].Path[0] {
Expand All @@ -1539,13 +1563,20 @@ func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []Subs
subscriptionMap["ts"] = utils.GetRfcTime()
subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId)
subscriptionMap["RouterId"] = subscriptionState.RouterId
backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionList[i].Path, nil))
subscriptionMap["data"] = getDataPackMap(subscriptionList[i].Path)["dpack"]
backendChan <- subscriptionMap
}
}
}
return subscriptionList
}

func string2Map(msg string) map[string]interface{} {
var msgMap map[string]interface{}
utils.MapRequest(`{"s2m":`+msg+"}", &msgMap)
return msgMap
}

func decodeFeederMessage(feederMessage string, feederNotification bool) (string, bool) {
if len(feederMessage) == 0 {
return "", feederNotification
Expand Down
Loading
Loading