From 671b715f1ba7aee936c179e74f14c80cafb2bac5 Mon Sep 17 00:00:00 2001 From: Flavio Cirillo Date: Thu, 26 Sep 2024 16:20:55 +0200 Subject: [PATCH] Manage DEBUG logs more intelligently. Possibility to discart to improve performance --- broker/ngsiv1.go | 8 ++- broker/thinBroker.go | 18 ++--- common/datamodel/datamodel.go | 4 +- common/ngsi/ngsi.go | 8 ++- discovery/fastDiscovery.go | 16 +++-- master/master.go | 6 +- master/taskMgr.go | 83 +++++++++++++++++------ release/latest/cloud/config-template.json | 2 +- worker/dockerengine.go | 12 +++- worker/executor.go | 24 ++++--- 10 files changed, 118 insertions(+), 63 deletions(-) diff --git a/broker/ngsiv1.go b/broker/ngsiv1.go index a817ff30..64713d41 100644 --- a/broker/ngsiv1.go +++ b/broker/ngsiv1.go @@ -120,7 +120,9 @@ func (tb *ThinBroker) NGSIV1_SubscribeContext(w rest.ResponseWriter, r *rest.Req subReq := SubscribeContextRequest{} subReq.Attributes = make([]string, 0) - DEBUG.Println("Subscription request from: ", r.RemoteAddr) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("Subscription request from: ", r.RemoteAddr) + } err := r.DecodeJsonPayload(&subReq) if err != nil { @@ -255,7 +257,9 @@ func (tb *ThinBroker) NGSIV1_NotifyContextAvailability(w rest.ResponseWriter, r tb.subLinks_lock.Lock() mainSubID, exist := tb.availabilitySub2MainSub[subID] if !exist { - DEBUG.Println("put it into the tempCache and handle it later") + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("put it into the tempCache and handle it later") + } tb.tmpNGSI9NotifyCache[subID] = ¬ifyContextAvailabilityReq } tb.subLinks_lock.Unlock() diff --git a/broker/thinBroker.go b/broker/thinBroker.go index 698618fb..9e0ed3b6 100644 --- a/broker/thinBroker.go +++ b/broker/thinBroker.go @@ -41,8 +41,6 @@ type ThinBroker struct { entityId2Subcriptions map[string][]string e2sub_lock sync.RWMutex - isDebugEnabled bool - //counter of heartbeat counter int64 } @@ -70,8 +68,6 @@ func (tb *ThinBroker) Start(cfg *Config) { tb.myProfile.BID = tb.myEntityId tb.myProfile.MyURL = cfg.GetExternalBrokerURL() - tb.isDebugEnabled = cfg.Logging.DebugEnabled - // register itself to the IoT discovery tb.registerMyself() } @@ -207,7 +203,7 @@ func (tb *ThinBroker) getEntity(eid string) *ContextElement { } func (tb *ThinBroker) deleteEntity(eid string) error { - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println(" TO REMOVE ENTITY ", eid) } @@ -450,7 +446,7 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri originator := subscription.Subscriber.Correlator if correlator != "" && originator != "" && correlator == originator { beTheSame = true - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println("session ID from producer ", correlator, ", subscriber ", originator) } } @@ -458,7 +454,7 @@ func (tb *ThinBroker) notifySubscribers(ctxElem *ContextElement, correlator stri tb.subscriptions_lock.RUnlock() if beTheSame { - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println(" ======= producer and subscriber are the same ===========") } continue @@ -544,7 +540,7 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement, Tenant := subscription.Subscriber.Tenant if subscription.Subscriber.RequireReliability && len(subscription.Subscriber.NotifyCache) > 0 { - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println("resend notify: ", len(subscription.Subscriber.NotifyCache)) } for _, pCtxElem := range subscription.Subscriber.NotifyCache { @@ -560,7 +556,7 @@ func (tb *ThinBroker) sendReliableNotifyToSubscriber(elements []ContextElement, err := postNotifyContext(elements, sid, subscriberURL, DestinationBroker, Tenant, tb.SecurityCfg) if err != nil { - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println("NOTIFY is not received by the subscriber, ", subscriberURL) } @@ -685,7 +681,7 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil action = "DELETE" } - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println(action, " subID ", mainSubID, " subscription isSimpleByType ", tb.subscriptions[mainSubID].IsSimpleByType()) DEBUG.Println(tb.entityId2Subcriptions) } @@ -694,7 +690,7 @@ func (tb *ThinBroker) handleNGSI9Notify(mainSubID string, notifyContextAvailabil registration := registrationResp.ContextRegistration for _, eid := range registration.EntityIdList { - if tb.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println("===> ", eid, " , ", mainSubID) } diff --git a/common/datamodel/datamodel.go b/common/datamodel/datamodel.go index 0998cef9..26afc707 100644 --- a/common/datamodel/datamodel.go +++ b/common/datamodel/datamodel.go @@ -305,7 +305,9 @@ func (worker *WorkerProfile) IsLive(duration int) bool { delta := time.Since(worker.Last_Heartbeat_Update) if int(delta.Seconds()) >= duration { - DEBUG.Println("Worker delta failed ", int(delta.Seconds()), " while duration is ", duration) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("Worker delta failed ", int(delta.Seconds()), " while duration is ", duration) + } return false } else { return true diff --git a/common/ngsi/ngsi.go b/common/ngsi/ngsi.go index f5548b5f..0f984a41 100644 --- a/common/ngsi/ngsi.go +++ b/common/ngsi/ngsi.go @@ -24,6 +24,10 @@ var ( DEBUG *log.Logger ) +func LoggerIsEnabled(l *log.Logger) bool { + return (fmt.Sprintf("%T", l.Writer()) != "io.discard") +} + type SiteInfo struct { ExternalAddress string `json:"externalAddress"` GeohashID string `json:"geohashID"` @@ -786,7 +790,9 @@ func (restriction *Restriction) GetScope() OperationScope { func (restriction *Restriction) GetNearbyFilter() *NearBy { for _, scope := range restriction.Scopes { - DEBUG.Println(" SCOPE: ", scope) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println(" SCOPE: ", scope) + } if scope.Type == "nearby" { nearby := scope.Value.(NearBy) diff --git a/discovery/fastDiscovery.go b/discovery/fastDiscovery.go index 56d29e43..dbd3ba5e 100644 --- a/discovery/fastDiscovery.go +++ b/discovery/fastDiscovery.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io/ioutil" "net/http" + "os" "strings" "sync" "time" @@ -50,8 +51,6 @@ type FastDiscovery struct { delayStoreOnFile int storeOnDisk bool - isDebugEnabled bool - // lock to control the update subscriptions in database subscriptionsDbLock sync.RWMutex storeSubscriptionsOnFileScheduled bool @@ -74,7 +73,6 @@ func (fd *FastDiscovery) Init(config *Config) { fd.storeSubscriptionsOnFileScheduled = false fd.storeBrokersOnFileScheduled = false fd.storeOnDisk = config.Discovery.StoreOnDisk - fd.isDebugEnabled = config.Logging.DebugEnabled //INFO.Println("config.Discovery.DelayStoreRegistrationsOnFile ", config.Discovery.DelayStoreStoreOnFile) fd.delayStoreOnFile = config.Discovery.DelayStoreOnFile @@ -600,7 +598,9 @@ func (fd *FastDiscovery) updateSubscriptionsOnDisk() { // somebody has the lock if fd.storeSubscriptionsOnFileScheduled { - INFO.Println("A store on file for registrations is already scheduled") + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("A store on file for registrations is already scheduled") + } return } @@ -665,7 +665,9 @@ func (fd *FastDiscovery) updateBrokersOnDisk() { // somebody has the lock if fd.storeBrokersOnFileScheduled { - INFO.Println("A store on file for registrations is already scheduled") + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("A store on file for registrations is already scheduled") + } return } @@ -690,7 +692,7 @@ func (fd *FastDiscovery) updateBrokersOnDisk() { ERROR.Println(err) } // err = ioutil.WriteFile("brokers.json", content, 0644) - err = ioutil.WriteFile(fd.dbFiles["brokers"], content, 0644) + err = os.WriteFile(fd.dbFiles["brokers"], content, 0644) if err != nil { ERROR.Println(err) } @@ -707,7 +709,7 @@ func (fd *FastDiscovery) readBrokersFromDisk() { defer fd.brokersDbLock.Unlock() // content, err := ioutil.ReadFile("brokers.json") - content, err := ioutil.ReadFile(fd.dbFiles["brokers"]) + content, err := os.ReadFile(fd.dbFiles["brokers"]) if err != nil { ERROR.Println(err) } diff --git a/master/master.go b/master/master.go index 10218600..74cf885a 100644 --- a/master/master.go +++ b/master/master.go @@ -58,8 +58,6 @@ type Master struct { prevNumOfTask int counter_lock sync.RWMutex - isDebugEnabled bool - //type of subscribed entities subID2Type map[string]string } @@ -72,8 +70,6 @@ func (master *Master) Start(configuration *Config) { master.discoveryURL = configuration.GetDiscoveryURL() master.designerURL = configuration.GetDesignerURL() - master.isDebugEnabled = configuration.Logging.DebugEnabled - master.workers = make(map[string]*WorkerProfile) master.operatorList = make(map[string]Operator) @@ -526,7 +522,7 @@ func (master *Master) SelectWorker(locations []Point) string { for _, worker := range master.workers { // if this worker is already overloaded, check the next one if worker.IsOverloaded() { - if master.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println("Worker", worker.WID, " has reached its capacity of ", worker.Capacity, " with ", worker.Workload, " tasks running") } continue diff --git a/master/taskMgr.go b/master/taskMgr.go index e13672cc..74f65e29 100644 --- a/master/taskMgr.go +++ b/master/taskMgr.go @@ -99,7 +99,8 @@ func (gf *GroupInfo) GetHash() string { sortedpairs := make([]*KVPair, 0) for k, v := range *gf { - if fmt.Sprintf("%T", DEBUG.Writer()) != "io.discard" { + + if LoggerIsEnabled(DEBUG) { DEBUG.Printf("group k: %s, v: %+v\r\n", k, v) } @@ -154,7 +155,9 @@ func (flow *FogFlow) Init() { // to update the execution plan based on the changes of registered context availability func (flow *FogFlow) MetadataDrivenTaskOrchestration(subID string, entityAction string, registeredEntity *EntityRegistration, workerSelection ProximityWorkerSelectionFn) []*DeploymentAction { if _, exist := flow.Subscriptions[subID]; !exist { - DEBUG.Println(subID, "subscription does not exist any more") + if LoggerIsEnabled(DEBUG) { + DEBUG.Println(subID, "subscription does not exist any more") + } return nil } @@ -164,7 +167,9 @@ func (flow *FogFlow) MetadataDrivenTaskOrchestration(subID string, entityAction // This check is to see if the selection is only type (no entityId, no scope) entityID = "*" } - DEBUG.Println(entityAction, " entity ", entityID, "from Subscription ", subID) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println(entityAction, " entity ", entityID, "from Subscription ", subID) + } switch entityAction { case "CREATE", "UPDATE": //update context availability @@ -244,15 +249,21 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp groups := flow.getRelevantGroups(inputSubscription, entityID) deploymentActions := make([]*DeploymentAction, 0) - DEBUG.Println("[related groups]: ", groups) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("[related groups]: ", groups) + } for _, group := range groups { - INFO.Println("# hash =", group.GetHash()) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("# hash =", group.GetHash()) + } hashID := group.GetHash() // check if the associated task instance is already created if task, exist := flow.ExecutionPlan[hashID]; exist { entitiesList := flow.searchRelevantEntities(&group, entityID) - DEBUG.Println("[entitiesList]: ", entitiesList) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("[entitiesList]: ", entitiesList) + } for _, entity := range entitiesList { newInput := true for _, input := range task.Inputs { @@ -272,7 +283,9 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp } if newInput { - DEBUG.Printf("new input %+v to task %+v\r\n", entity, task) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("new input %+v to task %+v\r\n", entity, task) + } inputEntity := InputEntity{} inputEntity.ID = entity.ID @@ -300,11 +313,15 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp } else { // check if the location in this input entity is changed locationChanged := false - DEBUG.Printf("Length of Task Input %d", len(task.Inputs)) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("Length of Task Input %d", len(task.Inputs)) + } for i := 0; i < len(task.Inputs); i++ { if task.Inputs[i].ID == entity.ID && !(task.Inputs[i].Location.IsEqual(&entity.Location)) { locationChanged = true - DEBUG.Println("[location changed] entity: ", entity.ID) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("[location changed] entity: ", entity.ID) + } // update the input entities with the new location task.Inputs[i].Location = entity.Location break @@ -353,7 +370,9 @@ func (flow *FogFlow) expandExecutionPlan(entityID string, inputSubscription *Inp flow.ExecutionPlan[hashID] = &task - DEBUG.Printf("new task %+v, hashID %s, taskID %s\r\n", task, hashID, task.TaskID) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("new task %+v, hashID %s, taskID %s\r\n", task, hashID, task.TaskID) + } //generate a deployment action deploymentAction := flow.addNewTask(&task) @@ -429,7 +448,9 @@ func (flow *FogFlow) removeExistingTask(task *TaskConfig) *DeploymentAction { func (flow *FogFlow) removeExecutionPlan(entityID string, inputSubscription *InputSubscription) []*DeploymentAction { groups := flow.getRelevantGroups(inputSubscription, entityID) - DEBUG.Printf("removing groups = %+v\r\n", groups) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("removing groups = %+v\r\n", groups) + } deploymentActions := make([]*DeploymentAction, 0) @@ -446,7 +467,9 @@ func (flow *FogFlow) removeExecutionPlan(entityID string, inputSubscription *Inp //if any of the input streams is delete, the task will be terminated if !flow.checkInputsOfTaskInstance(task) { // remove this task - DEBUG.Printf("removing an existing task %+v\r\n", task) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("removing an existing task %+v\r\n", task) + } // add it into the deployment action list deploymentAction := flow.removeExistingTask(task) @@ -455,14 +478,18 @@ func (flow *FogFlow) removeExecutionPlan(entityID string, inputSubscription *Inp } // remove the group key from the table - DEBUG.Printf(" GROUP KEY %+v\r\n", group) - DEBUG.Printf(" table %+v\r\n", flow.UniqueKeys) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf(" GROUP KEY %+v\r\n", group) + DEBUG.Printf(" table %+v\r\n", flow.UniqueKeys) + } // remove this task from the execution plan delete(flow.ExecutionPlan, hashID) } else { // remove only the specific input - DEBUG.Printf("remove an existing input %+v to task %+v\r\n", entityID, task) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("remove an existing input %+v to task %+v\r\n", entityID, task) + } //generate a deployment action flowInfo := FlowInfo{} @@ -558,7 +585,9 @@ func (flow *FogFlow) updateGroupedKeyValueTable(sub *InputSubscription, entityID } } - DEBUG.Println("[Unique Key Table]:", flow.UniqueKeys) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("[Unique Key Table]:", flow.UniqueKeys) + } } func (flow *FogFlow) getRelevantGroups(sub *InputSubscription, entityID string) []GroupInfo { @@ -649,7 +678,9 @@ func (flow *FogFlow) searchRelevantEntities(group *GroupInfo, updatedEntityID st entityloop: for _, entityRegistration := range inputSub.ReceivedEntityRegistrations { - DEBUG.Println("entityRegistration", entityRegistration) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("entityRegistration", entityRegistration) + } if entityRegistration.IsMatched(restrictions) { inputEntity := InputEntity{} @@ -764,7 +795,9 @@ func (tMgr *TaskMgr) handleSynchronousTaskIntent(taskIntent *TaskIntent) { task := taskIntent.TaskObject for _, inputStreamConfig := range task.InputStreams { - INFO.Println(inputStreamConfig) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println(inputStreamConfig) + } subID := tMgr.selector2Subscription(&inputStreamConfig, taskIntent.GeoScope) if subID == "" { @@ -803,7 +836,9 @@ func (tMgr *TaskMgr) handleASynchronousTaskIntent(taskIntent *TaskIntent) { task := taskIntent.TaskObject for _, inputStreamConfig := range task.InputStreams { - INFO.Println(inputStreamConfig) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println(inputStreamConfig) + } subID := tMgr.selector2Subscription(&inputStreamConfig, taskIntent.GeoScope) if subID == "" { ERROR.Printf("failed to issue a subscription for this type of input, %+v\r\n", inputStreamConfig) @@ -892,7 +927,9 @@ func (tMgr *TaskMgr) selector2Subscription(inputSelector *InputStreamConfig, geo // the main function to deal with data-driven and context aware task orchestration func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction string, entityRegistration *EntityRegistration) { - INFO.Println("[Registration update]: ", subID, entityAction, entityRegistration.ID) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("[Registration update]: ", subID, entityAction, entityRegistration.ID) + } tMgr.subID2FogFunc_lock.RLock() funcName, fogFunctionExist := tMgr.subID2FogFunc[subID] @@ -916,7 +953,7 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction // derive the deployment actions according to the received registration deploymentActions := fogflow.MetadataDrivenTaskOrchestration(subID, entityAction, entityRegistration, tMgr.master.SelectWorker) if deploymentActions == nil || len(deploymentActions) == 0 { - if tMgr.master.isDebugEnabled { + if LoggerIsEnabled(DEBUG) { DEBUG.Println("nothing is triggered!!!") } return @@ -924,7 +961,9 @@ func (tMgr *TaskMgr) HandleContextAvailabilityUpdate(subID string, entityAction // schedule and send out the deployment actions for _, deploymentAction := range deploymentActions { - DEBUG.Println("[Orchestration Action]: ", deploymentAction.ActionType, deploymentAction.ActionInfo) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("[Orchestration Action]: ", deploymentAction.ActionType, deploymentAction.ActionInfo) + } switch deploymentAction.ActionType { case "ADD_TASK": diff --git a/release/latest/cloud/config-template.json b/release/latest/cloud/config-template.json index 00ab6b6b..00e57651 100644 --- a/release/latest/cloud/config-template.json +++ b/release/latest/cloud/config-template.json @@ -9,7 +9,7 @@ "info":"stdout", "error":"stdout", "protocol": "stdout", - "debug": "stdout" + "debug": "discard" }, "discovery": { "http_port": 8090, diff --git a/worker/dockerengine.go b/worker/dockerengine.go index c3017b8b..7594044d 100644 --- a/worker/dockerengine.go +++ b/worker/dockerengine.go @@ -66,13 +66,17 @@ func (dockerengine *DockerEngine) PullImage(dockerImage string) (string, error) auth.ServerAddress = dockerengine.workerCfg.Worker.Registry.ServerAddress } - DEBUG.Printf("options : %+v\r\n", auth) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("options : %+v\r\n", auth) + } opts := docker.PullImageOptions{ Repository: dockerImage, } - DEBUG.Printf("options : %+v\r\n", opts) + if LoggerIsEnabled(DEBUG) { + DEBUG.Printf("options : %+v\r\n", opts) + } err := dockerengine.client.PullImage(opts, auth) if err != nil { @@ -228,7 +232,9 @@ func (dockerengine *DockerEngine) StartTask(task *ScheduledTaskInstance, brokerU mount.ReadOnly = true mount.Type = "bind" - DEBUG.Println("mounting configuration ", mount) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("mounting configuration ", mount) + } hostConfig.Mounts = make([]docker.HostMount, 0) hostConfig.Mounts = append(hostConfig.Mounts, mount) diff --git a/worker/executor.go b/worker/executor.go index b5764cd5..39975204 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "strings" "sync" @@ -131,7 +131,9 @@ func (e *Executor) LaunchTask(task *ScheduledTaskInstance) bool { for _, inputStream := range task.Inputs { subID, err := e.subscribeInputStream(refURL, task.ID, &inputStream) if err == nil { - DEBUG.Println("===========subID = ", subID) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("===========subID = ", subID) + } taskCtx.Subscriptions = append(taskCtx.Subscriptions, subID) taskCtx.EntityID2SubID[inputStream.ID] = subID } else { @@ -210,14 +212,14 @@ func (e *Executor) configurateTask(port string, commands []interface{}) bool { } defer resp.Body.Close() - body, _ := ioutil.ReadAll(resp.Body) + body, _ := io.ReadAll(resp.Body) INFO.Println("task on port ", port, " has been configured with parameters ", jsonText) INFO.Println("response Body:", string(body)) return true } -func (e *Executor) registerTask(task *ScheduledTaskInstance, portNum string, containerID string) { +func (e *Executor) registerTask(task *ScheduledTaskInstance, portNum string) { ctxObj := ContextObject{} ctxObj.Entity.ID = task.ID @@ -245,7 +247,7 @@ func (e *Executor) registerTask(task *ScheduledTaskInstance, portNum string, con } } -//Subscribe for the input stream +// Subscribe for the input stream func (e *Executor) subscribeInputStream(refURL string, correlatorID string, inputStream *InputStream) (string, error) { subscription := SubscribeContextRequest{} @@ -321,7 +323,7 @@ func (e *Executor) TerminateTask(taskID string, paused bool) { INFO.Println("================== terminate task ID ============ ", taskID) e.taskMap_lock.Lock() - if _, ok := e.taskInstances[taskID]; ok == false { + if _, ok := e.taskInstances[taskID]; !ok { e.taskMap_lock.Unlock() return } @@ -362,7 +364,7 @@ func (e *Executor) TerminateTask(taskID string, paused bool) { e.taskMap_lock.Unlock() - if paused == true { + if paused { // only update its status e.worker.TaskUpdate(topologyName, taskName, taskID, serviceIntentID, "paused") } else { @@ -395,7 +397,7 @@ func (e *Executor) terminateAllTasks() { // add the specified input for an existing task func (e *Executor) onAddInput(flow *FlowInfo) { - if e.workerCfg.Worker.StartActualTask == false { + if !e.workerCfg.Worker.StartActualTask { return } @@ -422,7 +424,9 @@ func (e *Executor) handleAddInput(flow *FlowInfo) { taskCtx := e.taskInstances[flow.TaskInstanceID] subID, err := e.subscribeInputStream(taskCtx.refURL, taskCtx.TaskID, &flow.InputStream) if err == nil { - DEBUG.Println("===========subscribe new input = ", flow, " , subID = ", subID) + if LoggerIsEnabled(DEBUG) { + DEBUG.Println("===========subscribe new input = ", flow, " , subID = ", subID) + } taskCtx.Subscriptions = append(taskCtx.Subscriptions, subID) taskCtx.EntityID2SubID[flow.InputStream.ID] = subID } else { @@ -447,7 +451,7 @@ func (e *Executor) handleDelayedAddInputCommands(taskInstanceID string) { // remove the specified input for an existing task func (e *Executor) onRemoveInput(flow *FlowInfo) { - if e.workerCfg.Worker.StartActualTask == false { + if !e.workerCfg.Worker.StartActualTask { return }