diff --git a/dynamic-mapping-ui/src/shared/connector-configuration.service.ts b/dynamic-mapping-ui/src/shared/connector-configuration.service.ts index e030afa9..e13e0baa 100644 --- a/dynamic-mapping-ui/src/shared/connector-configuration.service.ts +++ b/dynamic-mapping-ui/src/shared/connector-configuration.service.ts @@ -60,15 +60,18 @@ export class ConnectorConfigurationService { private triggerConfigurations$: Subject = new Subject(); private realtimeConnectorStatus$: Subject = new Subject(); - private connectorConfigurations$: Observable; + private realtimeConnectorConfigurations$: Observable< + ConnectorConfiguration[] + >; + private enrichedConnectorConfiguration$: Observable; private _agentId: string; private initialized: boolean = false; private realtime: Realtime; private subscriptionEvents: any; - getConnectorConfigurationsLive(): Observable { - return this.connectorConfigurations$; + getRealtimeConnectorConfigurations(): Observable { + return this.realtimeConnectorConfigurations$; } resetCache() { @@ -84,8 +87,8 @@ export class ConnectorConfigurationService { this.startConnectorStatusSubscriptions(); this.initialized = true; } - this.realtimeConnectorStatus$.next({} as any); - this.triggerConfigurations$.next('start' + '/' + n); + this.realtimeConnectorStatus$.next({} as any); + this.triggerConfigurations$.next('start' + '/' + n); } updateConnectorConfigurations() { @@ -98,7 +101,7 @@ export class ConnectorConfigurationService { } initConnectorConfigurations() { - const connectorConfig$ = this.triggerConfigurations$.pipe( + this.enrichedConnectorConfiguration$ = this.triggerConfigurations$.pipe( // tap((state) => // console.log('New triggerConfigurations:', state + '/' + Date.now()) // ), @@ -125,8 +128,8 @@ export class ConnectorConfigurationService { }), shareReplay(1) ); - this.connectorConfigurations$ = combineLatest([ - connectorConfig$, + this.realtimeConnectorConfigurations$ = combineLatest([ + this.enrichedConnectorConfiguration$, this.realtimeConnectorStatus$ ]).pipe( map((vars) => { @@ -242,11 +245,11 @@ export class ConnectorConfigurationService { // subscribe to event stream this.subscriptionEvents = this.realtime.subscribe( `/events/${this._agentId}`, - this.updateConnectorStatus + this.updateRealtimeConnectorStatus ); } - private updateConnectorStatus = async (p: object) => { + private updateRealtimeConnectorStatus = async (p: object) => { const payload = p['data']['data']; this.realtimeConnectorStatus$.next(payload); }; diff --git a/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts b/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts index 3c542d0a..0d8cd662 100644 --- a/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts +++ b/dynamic-mapping-ui/src/shared/connector-configuration/connector-grid.component.ts @@ -217,7 +217,7 @@ export class ConnectorConfigurationComponent }); this.connectorConfigurationService - .getConnectorConfigurationsLive() + .getRealtimeConnectorConfigurations() .subscribe((confs) => this.configurations$.next(confs)); this.configurations$.subscribe((confs) => { @@ -227,7 +227,7 @@ export class ConnectorConfigurationComponent (conf) => (conf['checked'] = this.selected.includes(conf.ident)) ); }); - this.connectorConfigurationService.startConnectorConfigurations(); + this.connectorConfigurationService.startConnectorConfigurations(); } public onSelectToggle(id: string) { @@ -455,7 +455,6 @@ export class ConnectorConfigurationComponent }); } - findNameByIdent(ident: string): string { return this.configurations?.find((conf) => conf.ident == ident)?.name; } diff --git a/dynamic-mapping-ui/src/shared/connector-log/connector-status.component.ts b/dynamic-mapping-ui/src/shared/connector-log/connector-status.component.ts index 7b4e845c..15a79ddc 100644 --- a/dynamic-mapping-ui/src/shared/connector-log/connector-status.component.ts +++ b/dynamic-mapping-ui/src/shared/connector-log/connector-status.component.ts @@ -69,7 +69,7 @@ export class ConnectorStatusComponent implements OnInit, OnDestroy { await this.connectorStatusService.startConnectorStatusLogs(); this.connectorConfigurationService - .getConnectorConfigurationsLive() + .getRealtimeConnectorConfigurations() .subscribe((confs) => { this.configurations = confs; }); diff --git a/dynamic-mapping-ui/src/shared/connector-status.service.ts b/dynamic-mapping-ui/src/shared/connector-status.service.ts index a324a0a2..608b4462 100644 --- a/dynamic-mapping-ui/src/shared/connector-status.service.ts +++ b/dynamic-mapping-ui/src/shared/connector-status.service.ts @@ -60,12 +60,12 @@ export class ConnectorStatusService { async startConnectorStatusLogs() { // console.log('Calling: startConnectorStatusLogs'); - if (!this.initialized) { - this.startConnectorStatusSubscriptions(); - await this.initConnectorLogsRealtime(); - this.initialized = true; - } - this.triggerLogs$.next([{ type: 'reset' }]); + if (!this.initialized) { + this.startConnectorStatusSubscriptions(); + await this.initConnectorLogsRealtime(); + this.initialized = true; + } + this.triggerLogs$.next([{ type: 'reset' }]); } updateStatusLogs(filter: any) { @@ -81,11 +81,11 @@ export class ConnectorStatusService { if (!this._agentId) { this._agentId = await this.sharedService.getDynamicMappingServiceAgent(); } - console.log( - 'Calling: BrokerConfigurationService.initConnectorLogsRealtime()', - this._agentId - ); - const sourceList$ = this.triggerLogs$.pipe( + // console.log( + // 'Calling: BrokerConfigurationService.initConnectorLogsRealtime()', + // this._agentId + // ); + const filteredConnectorStatus$ = this.triggerLogs$.pipe( // tap((x) => console.log('TriggerLogs In', x)), switchMap(() => { const filter = { @@ -114,11 +114,11 @@ export class ConnectorStatusService { : event.connectorIdent == this.filterStatusLog.connectorIdent; }) ), - share() - // tap((x) => console.log('TriggerLogs Out', x)) + share(), + tap((x) => console.log('TriggerLogs Out', x)) ); - const sourceRealtime$ = this.realtimeConnectorStatus$.pipe( + const realtimeConnectorStatusRealtime$ = this.realtimeConnectorStatus$.pipe( // tap((x) => console.log('IncomingRealtime In', x)), filter((event) => { return ( @@ -138,25 +138,29 @@ export class ConnectorStatusService { }) ); - const o: Observable = merge( - sourceList$, - sourceRealtime$, + // const refreshedConnectorStatus$: Observable = + merge( + filteredConnectorStatus$, + realtimeConnectorStatusRealtime$, this.triggerLogs$ - ).pipe( - // tap((i) => console.log('Items', i)), - scan((acc, val) => { - let sortedAcc; - if (val[0]?.type == 'reset') { - // console.log('Reset loaded logs!'); - sortedAcc = []; - } else { - sortedAcc = val.concat(acc); - } - sortedAcc = sortedAcc.slice(0, 9); - return sortedAcc; - }, []) - ); - o.subscribe((logs) => this.statusLogs$.next(logs)); + ) + .pipe( + // tap((i) => console.log('Items', i)), + scan((acc, val) => { + let sortedAcc; + if (val[0]?.type == 'reset') { + // console.log('Reset loaded logs!'); + sortedAcc = []; + } else { + sortedAcc = val.concat(acc); + } + sortedAcc = sortedAcc.slice(0, 9); + return sortedAcc; + }, []), + tap((logs) => this.statusLogs$.next(logs)) + ) + .subscribe(); + // refreshedConnectorStatus$.subscribe((logs) => this.statusLogs$.next(logs)); } async startConnectorStatusSubscriptions(): Promise { @@ -168,11 +172,11 @@ export class ConnectorStatusService { // subscribe to event stream this.subscriptionEvents = this.realtime.subscribe( `/events/${this._agentId}`, - this.updateConnectorStatus + this.updateRealtimeConnectorStatus ); } - private updateConnectorStatus = async (p: object) => { + private updateRealtimeConnectorStatus = async (p: object) => { const payload = p['data']['data']; this.realtimeConnectorStatus$.next(payload); };