Skip to content

Commit

Permalink
harmonized names of observable in ConnectorConfigurationService
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Sep 8, 2024
1 parent c1826ae commit eb117e3
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 48 deletions.
23 changes: 13 additions & 10 deletions dynamic-mapping-ui/src/shared/connector-configuration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@ export class ConnectorConfigurationService {

private triggerConfigurations$: Subject<string> = new Subject();
private realtimeConnectorStatus$: Subject<IEvent> = new Subject();
private connectorConfigurations$: Observable<ConnectorConfiguration[]>;
private realtimeConnectorConfigurations$: Observable<
ConnectorConfiguration[]
>;
private enrichedConnectorConfiguration$: Observable<ConnectorConfiguration[]>;

private _agentId: string;
private initialized: boolean = false;
private realtime: Realtime;
private subscriptionEvents: any;

getConnectorConfigurationsLive(): Observable<ConnectorConfiguration[]> {
return this.connectorConfigurations$;
getRealtimeConnectorConfigurations(): Observable<ConnectorConfiguration[]> {
return this.realtimeConnectorConfigurations$;
}

resetCache() {
Expand All @@ -84,8 +87,8 @@ export class ConnectorConfigurationService {
this.startConnectorStatusSubscriptions();
this.initialized = true;
}
this.realtimeConnectorStatus$.next({} as any);
this.triggerConfigurations$.next('start' + '/' + n);
this.realtimeConnectorStatus$.next({} as any);
this.triggerConfigurations$.next('start' + '/' + n);
}

updateConnectorConfigurations() {
Expand All @@ -98,7 +101,7 @@ export class ConnectorConfigurationService {
}

initConnectorConfigurations() {
const connectorConfig$ = this.triggerConfigurations$.pipe(
this.enrichedConnectorConfiguration$ = this.triggerConfigurations$.pipe(
// tap((state) =>
// console.log('New triggerConfigurations:', state + '/' + Date.now())
// ),
Expand All @@ -125,8 +128,8 @@ export class ConnectorConfigurationService {
}),
shareReplay(1)
);
this.connectorConfigurations$ = combineLatest([
connectorConfig$,
this.realtimeConnectorConfigurations$ = combineLatest([
this.enrichedConnectorConfiguration$,
this.realtimeConnectorStatus$
]).pipe(
map((vars) => {
Expand Down Expand Up @@ -242,11 +245,11 @@ export class ConnectorConfigurationService {
// subscribe to event stream
this.subscriptionEvents = this.realtime.subscribe(
`/events/${this._agentId}`,
this.updateConnectorStatus
this.updateRealtimeConnectorStatus
);
}

private updateConnectorStatus = async (p: object) => {
private updateRealtimeConnectorStatus = async (p: object) => {
const payload = p['data']['data'];
this.realtimeConnectorStatus$.next(payload);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export class ConnectorConfigurationComponent
});

this.connectorConfigurationService
.getConnectorConfigurationsLive()
.getRealtimeConnectorConfigurations()
.subscribe((confs) => this.configurations$.next(confs));

this.configurations$.subscribe((confs) => {
Expand All @@ -227,7 +227,7 @@ export class ConnectorConfigurationComponent
(conf) => (conf['checked'] = this.selected.includes(conf.ident))
);
});
this.connectorConfigurationService.startConnectorConfigurations();
this.connectorConfigurationService.startConnectorConfigurations();
}

public onSelectToggle(id: string) {
Expand Down Expand Up @@ -455,7 +455,6 @@ export class ConnectorConfigurationComponent
});
}


findNameByIdent(ident: string): string {
return this.configurations?.find((conf) => conf.ident == ident)?.name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class ConnectorStatusComponent implements OnInit, OnDestroy {
await this.connectorStatusService.startConnectorStatusLogs();

this.connectorConfigurationService
.getConnectorConfigurationsLive()
.getRealtimeConnectorConfigurations()
.subscribe((confs) => {
this.configurations = confs;
});
Expand Down
72 changes: 38 additions & 34 deletions dynamic-mapping-ui/src/shared/connector-status.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ export class ConnectorStatusService {

async startConnectorStatusLogs() {
// console.log('Calling: startConnectorStatusLogs');
if (!this.initialized) {
this.startConnectorStatusSubscriptions();
await this.initConnectorLogsRealtime();
this.initialized = true;
}
this.triggerLogs$.next([{ type: 'reset' }]);
if (!this.initialized) {
this.startConnectorStatusSubscriptions();
await this.initConnectorLogsRealtime();
this.initialized = true;
}
this.triggerLogs$.next([{ type: 'reset' }]);
}

updateStatusLogs(filter: any) {
Expand All @@ -81,11 +81,11 @@ export class ConnectorStatusService {
if (!this._agentId) {
this._agentId = await this.sharedService.getDynamicMappingServiceAgent();
}
console.log(
'Calling: BrokerConfigurationService.initConnectorLogsRealtime()',
this._agentId
);
const sourceList$ = this.triggerLogs$.pipe(
// console.log(
// 'Calling: BrokerConfigurationService.initConnectorLogsRealtime()',
// this._agentId
// );
const filteredConnectorStatus$ = this.triggerLogs$.pipe(
// tap((x) => console.log('TriggerLogs In', x)),
switchMap(() => {
const filter = {
Expand Down Expand Up @@ -114,11 +114,11 @@ export class ConnectorStatusService {
: event.connectorIdent == this.filterStatusLog.connectorIdent;
})
),
share()
// tap((x) => console.log('TriggerLogs Out', x))
share(),
tap((x) => console.log('TriggerLogs Out', x))
);

const sourceRealtime$ = this.realtimeConnectorStatus$.pipe(
const realtimeConnectorStatusRealtime$ = this.realtimeConnectorStatus$.pipe(
// tap((x) => console.log('IncomingRealtime In', x)),
filter((event) => {
return (
Expand All @@ -138,25 +138,29 @@ export class ConnectorStatusService {
})
);

const o: Observable<any> = merge(
sourceList$,
sourceRealtime$,
// const refreshedConnectorStatus$: Observable<any> =
merge(
filteredConnectorStatus$,
realtimeConnectorStatusRealtime$,
this.triggerLogs$
).pipe(
// tap((i) => console.log('Items', i)),
scan((acc, val) => {
let sortedAcc;
if (val[0]?.type == 'reset') {
// console.log('Reset loaded logs!');
sortedAcc = [];
} else {
sortedAcc = val.concat(acc);
}
sortedAcc = sortedAcc.slice(0, 9);
return sortedAcc;
}, [])
);
o.subscribe((logs) => this.statusLogs$.next(logs));
)
.pipe(
// tap((i) => console.log('Items', i)),
scan((acc, val) => {
let sortedAcc;
if (val[0]?.type == 'reset') {
// console.log('Reset loaded logs!');
sortedAcc = [];
} else {
sortedAcc = val.concat(acc);
}
sortedAcc = sortedAcc.slice(0, 9);
return sortedAcc;
}, []),
tap((logs) => this.statusLogs$.next(logs))
)
.subscribe();
// refreshedConnectorStatus$.subscribe((logs) => this.statusLogs$.next(logs));
}

async startConnectorStatusSubscriptions(): Promise<void> {
Expand All @@ -168,11 +172,11 @@ export class ConnectorStatusService {
// subscribe to event stream
this.subscriptionEvents = this.realtime.subscribe(
`/events/${this._agentId}`,
this.updateConnectorStatus
this.updateRealtimeConnectorStatus
);
}

private updateConnectorStatus = async (p: object) => {
private updateRealtimeConnectorStatus = async (p: object) => {
const payload = p['data']['data'];
this.realtimeConnectorStatus$.next(payload);
};
Expand Down

0 comments on commit eb117e3

Please sign in to comment.