Skip to content

Commit

Permalink
change initializing realtime updates: ConnectorConfigurationService, …
Browse files Browse the repository at this point in the history
…ConnectorStatusService
  • Loading branch information
ck-c8y committed Sep 9, 2024
1 parent 5a14de3 commit 29a38db
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 96 deletions.
47 changes: 2 additions & 45 deletions dynamic-mapping-ui/src/monitoring/chart/chart.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export class MonitoringChartComponent implements OnInit, OnDestroy {
mappingStatus$: Subject<MappingStatus[]> = new Subject<MappingStatus[]>();
echartOptions: EChartsOption;
echartUpdateOptions: EChartsOption;
subscription: object;
echartsInstance: any;
subscription: object;
textColor: string;
fontFamily: string;
fontWeight: number;
Expand Down Expand Up @@ -124,53 +124,10 @@ export class MonitoringChartComponent implements OnInit, OnDestroy {
}
]
};
this.echartsInstance?.setOption(this.echartUpdateOptions);
// this.echartsInstance?.setOption(this.echartUpdateOptions);
}
});

// const config = {
// type: 'bar' as any,
// data: data,
// options: {
// responsive: true,
// maintainAspectRatio: false,
// layout: {
// padding: {
// left: 0,
// right: 0,
// top: 0,
// bottom: 0
// }
// },
// plugins: {
// legend: {
// display: true,
// position: 'left' as any,
// font: {
// family: this.fontFamily,
// weight: 'normal' as any
// }
// }
// },
// indexAxis: 'y' as any,
// color: this.textColor as any,
// scales: {
// y: {
// ticks: {
// color: this.textColor as any
// }
// },
// x: {
// ticks: {
// color: this.textColor as any,
// stepSize: 0
// }
// }
// }
// }
// };
// this.statusMappingChart = new Chart('monitoringChart', config);

const yAxisData = [
'Errors',
'Messages received',
Expand Down
52 changes: 27 additions & 25 deletions dynamic-mapping-ui/src/shared/connector-configuration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
forkJoin,
from,
Observable,
ReplaySubject,
Subject
} from 'rxjs';
import { map, shareReplay, switchMap, tap } from 'rxjs/operators';
Expand All @@ -50,8 +51,7 @@ export class ConnectorConfigurationService {
private client: FetchClient,
private sharedService: SharedService
) {
this.initConnectorConfigurations();
this.startConnectorStatusSubscriptions();
this.startConnectorConfigurations();
this.realtime = new Realtime(this.client);
}

Expand All @@ -60,9 +60,8 @@ export class ConnectorConfigurationService {

private triggerConfigurations$: Subject<string> = new Subject();
private realtimeConnectorStatus$: Subject<IEvent> = new Subject();
private realtimeConnectorConfigurations$: Observable<
ConnectorConfiguration[]
>;
private realtimeConnectorConfigurations$: Subject<ConnectorConfiguration[]> =
new ReplaySubject(1);
private enrichedConnectorConfiguration$: Observable<ConnectorConfiguration[]>;

private _agentId: string;
Expand Down Expand Up @@ -125,30 +124,33 @@ export class ConnectorConfigurationService {
}
});
return configurations;
}),
shareReplay(1)
})
// shareReplay(1)
);
this.realtimeConnectorConfigurations$ = combineLatest([
combineLatest([
this.enrichedConnectorConfiguration$,
this.realtimeConnectorStatus$
]).pipe(
map((vars) => {
const [configurations, payload] = vars;
if (payload?.type == StatusEventTypes.STATUS_CONNECTOR_EVENT_TYPE) {
const statusLog: ConnectorStatusEvent = payload[CONNECTOR_FRAGMENT];
configurations.forEach((cc) => {
if (statusLog['connectorIdent'] == cc.ident) {
if (!cc['status$']) {
cc['status$'] = new BehaviorSubject<string>(statusLog.status);
} else {
cc['status$'].next(statusLog.status);
])
.pipe(
map((vars) => {
const [configurations, payload] = vars;
if (payload?.type == StatusEventTypes.STATUS_CONNECTOR_EVENT_TYPE) {
const statusLog: ConnectorStatusEvent = payload[CONNECTOR_FRAGMENT];
configurations.forEach((cc) => {
if (statusLog['connectorIdent'] == cc.ident) {
if (!cc['status$']) {
cc['status$'] = new BehaviorSubject<string>(statusLog.status);
} else {
cc['status$'].next(statusLog.status);
}
}
}
});
}
return configurations;
})
);
});
}
return configurations;
}),
tap((confs) => this.realtimeConnectorConfigurations$.next(confs))
)
.subscribe();
}

async getConnectorSpecifications(): Promise<ConnectorSpecification[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ import {
Pagination
} from '@c8y/ngx-components';
import { BsModalRef, BsModalService } from 'ngx-bootstrap/modal';
import { BehaviorSubject, from, Observable, Subject } from 'rxjs';
import {
BehaviorSubject,
from,
Observable,
ReplaySubject,
Subject
} from 'rxjs';

import * as _ from 'lodash';
import { ConfirmationModalComponent } from '../confirmation/confirmation-modal.component';
Expand All @@ -66,9 +72,7 @@ import { CheckedRendererComponent } from './checked-renderer.component';
styleUrls: ['./connector-grid.component.style.css'],
templateUrl: 'connector-grid.component.html'
})
export class ConnectorConfigurationComponent
implements OnInit, OnDestroy, AfterViewInit
{
export class ConnectorConfigurationComponent implements OnInit, AfterViewInit {
@Input() selectable = true;
@Input() readOnly = false;
@Input() deploy: string[];
Expand All @@ -88,7 +92,7 @@ export class ConnectorConfigurationComponent
monitoring$: Observable<ConnectorStatus>;
specifications: ConnectorSpecification[] = [];
configurations: ConnectorConfiguration[];
configurations$: Subject<ConnectorConfiguration[]> = new Subject();
configurations$: Subject<ConnectorConfiguration[]> = new ReplaySubject(1);
StatusEventTypes = StatusEventTypes;
pagination: Pagination = {
pageSize: 30,
Expand Down Expand Up @@ -218,7 +222,9 @@ export class ConnectorConfigurationComponent

this.connectorConfigurationService
.getRealtimeConnectorConfigurations()
.subscribe((confs) => this.configurations$.next(confs));
.subscribe((confs) => {
this.configurations$.next(confs);
});

this.configurations$.subscribe((confs) => {
this.configurations = confs;
Expand All @@ -227,7 +233,6 @@ export class ConnectorConfigurationComponent
(conf) => (conf['checked'] = this.selected.includes(conf.ident))
);
});
this.connectorConfigurationService.startConnectorConfigurations();
}

public onSelectToggle(id: string) {
Expand Down Expand Up @@ -458,8 +463,4 @@ export class ConnectorConfigurationComponent
findNameByIdent(ident: string): string {
return this.configurations?.find((conf) => conf.ident == ident)?.name;
}

ngOnDestroy(): void {
this.connectorConfigurationService.stopConnectorConfigurations();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { ConnectorConfigurationService } from '../connector-configuration.servic
styleUrls: ['./connector-status.component.style.css'],
templateUrl: 'connector-status.component.html'
})
export class ConnectorStatusComponent implements OnInit, OnDestroy {
export class ConnectorStatusComponent implements OnInit {
version: string = packageJson.version;
monitorings$: Observable<ConnectorStatus>;
feature: Feature;
Expand All @@ -66,21 +66,15 @@ export class ConnectorStatusComponent implements OnInit, OnDestroy {
// console.log('Running version', this.version);
this.feature = await this.sharedService.getFeatures();
this.statusLogs$ = this.connectorStatusService.getStatusLogs();
await this.connectorStatusService.startConnectorStatusLogs();

this.connectorConfigurationService
.getRealtimeConnectorConfigurations()
.subscribe((confs) => {
this.configurations = confs;
});
await this.connectorConfigurationService.startConnectorConfigurations();
}

updateStatusLogs() {
this.connectorStatusService.updateStatusLogs(this.filterStatusLog);
}

ngOnDestroy(): void {
this.connectorStatusService.stopConnectorStatusLogs();
}
}
24 changes: 16 additions & 8 deletions dynamic-mapping-ui/src/shared/connector-status.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ import {
SharedService
} from '../shared';

import { merge, Observable, Subject } from 'rxjs';
import { filter, map, scan, share, switchMap, tap } from 'rxjs/operators';
import { merge, Observable, ReplaySubject, Subject } from 'rxjs';
import {
filter,
map,
scan,
share,
shareReplay,
switchMap,
tap
} from 'rxjs/operators';

@Injectable({ providedIn: 'root' })
export class ConnectorStatusService {
Expand All @@ -39,6 +47,7 @@ export class ConnectorStatusService {
private sharedService: SharedService
) {
this.realtime = new Realtime(this.client);
this.startConnectorStatusLogs();
}

private _agentId: string;
Expand All @@ -52,14 +61,14 @@ export class ConnectorStatusService {
};
private triggerLogs$: Subject<any> = new Subject();
private realtimeConnectorStatus$: Subject<IEvent> = new Subject();
private statusLogs$: Subject<any[]> = new Subject();
private statusLogs$: Subject<any[]> = new ReplaySubject(1);

getStatusLogs(): Observable<any[]> {
return this.statusLogs$;
}

async startConnectorStatusLogs() {
// console.log('Calling: startConnectorStatusLogs');
console.log('Calling: startConnectorStatusLogs', this.initialized);
if (!this.initialized) {
this.startConnectorStatusSubscriptions();
await this.initConnectorLogsRealtime();
Expand Down Expand Up @@ -113,9 +122,8 @@ export class ConnectorStatusService {
? true
: event.connectorIdent == this.filterStatusLog.connectorIdent;
})
),
share(),
tap((x) => console.log('TriggerLogs Out', x))
)
// tap((x) => console.log('TriggerLogs Out', x))
);

const realtimeConnectorStatusRealtime$ = this.realtimeConnectorStatus$.pipe(
Expand Down Expand Up @@ -158,9 +166,9 @@ export class ConnectorStatusService {
return sortedAcc;
}, []),
tap((logs) => this.statusLogs$.next(logs))
// shareReplay(1)
)
.subscribe();
// refreshedConnectorStatus$.subscribe((logs) => this.statusLogs$.next(logs));
}

async startConnectorStatusSubscriptions(): Promise<void> {
Expand Down

0 comments on commit 29a38db

Please sign in to comment.