Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prometheus): updates for better testing #162

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ let metricsStore: MetricsStore;
switch (config.MetricsStoreProvider) {
case 'prometheus':
metricsStore = new PrometheusClient({
logger,
endpoint: config.PrometheusURL,
});
break;
Expand Down
2 changes: 1 addition & 1 deletion src/instance_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ interface InstanceStore {
setValue: { (key: string, value: string, ttl: number): Promise<boolean> };

// sanity related
saveCloudInstances: { (groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> };
saveCloudInstances: { (ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> };
}

export default InstanceStore;
2 changes: 1 addition & 1 deletion src/metrics_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ interface MetricsStore {
(ctx: Context, group: string, item: InstanceMetric): Promise<boolean>;
};
cleanInstanceMetrics: { (ctx: Context, group: string): Promise<boolean> };
saveMetricUnTrackedCount: { (groupName: string, count: number): Promise<boolean> };
saveMetricUnTrackedCount: { (ctx: Context, groupName: string, count: number): Promise<boolean> };
}

export default MetricsStore;
100 changes: 49 additions & 51 deletions src/prometheus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as promClient from 'prom-client';
import { pushMetrics, Result, Options } from 'prometheus-remote-write';
import { PrometheusDriver, QueryResult } from 'prometheus-query';
import { Logger } from 'winston';
import MetricsStore, { InstanceMetric } from './metrics_store';
import { Context } from './context';

Expand All @@ -14,20 +13,17 @@ export interface PromMetrics {
}

export interface PrometheusOptions {
logger: Logger;
endpoint: string;
baseURL?: string;
promDriver?: PrometheusDriver;
promWriter?: PrometheusWriter;
}

/**
 * Shape applied to each entry of a range-query series' `values` array when
 * the query result is converted into instance metrics.
 */
interface PromQueryValue {
// NOTE(review): presumably the sample's timestamp — confirm the exact format against the query driver.
time: string;
// Numeric reading for this entry.
value: number;
}

interface PrometheusWriter {
pushMetrics: (metrics: PromMetrics, options: Options) => Promise<Result>;
}

// metrics for Prometheus queries
const promQueryErrors = new promClient.Counter({
name: 'autoscaler_prom_query_errors',
Expand Down Expand Up @@ -60,88 +56,95 @@ const promWriteSum = new promClient.Counter({
help: 'Sum of timings for high level prometheus remote write',
});

/**
 * Sends metrics to a single Prometheus remote-write URL by delegating to the
 * imported `pushMetrics` function.
 */
export class PrometheusWriter {
    /**
     * @param url - Remote-write URL handed to `pushMetrics` on every push.
     */
    constructor(private readonly url = 'localhost:9090/api/v1/write') {}

    /**
     * Pushes `metrics`, tagged with `labels`, to the configured URL.
     *
     * @param metrics - Metric values to send.
     * @param labels - Labels passed through as the `labels` push option.
     * @returns Whatever result `pushMetrics` resolves with.
     */
    async pushMetrics(metrics: PromMetrics, labels: PromLabels): Promise<Result> {
        const requestOptions = {
            url: this.url,
            labels,
            // verbose: true,
            headers: { 'Content-Type': 'application/x-protobuf' },
        } as Options;
        // Inside the method body the bare name resolves to the imported function, not this method.
        return pushMetrics(metrics, requestOptions);
    }
}

export default class PrometheusClient implements MetricsStore {
private logger: Logger;
private endpoint: string;
private baseURL = '/api/v1';
private writeURL = '/api/v1/write';

private promDriver: PrometheusDriver;
private promWriter: PrometheusWriter;

constructor(options: PrometheusOptions) {
this.logger = options.logger;
this.endpoint = options.endpoint;
if (options.baseURL) {
this.baseURL = options.baseURL;
}
if (options.promDriver) {
this.promDriver = options.promDriver;
} else {
this.promDriver = new PrometheusDriver({
endpoint: this.endpoint,
baseURL: this.baseURL,
});
}
if (options.promWriter) {
this.promWriter = options.promWriter;
} else {
this.promWriter = new PrometheusWriter(this.endpoint + this.writeURL);
}
}

prometheusDriver(): PrometheusDriver {
return new PrometheusDriver({
endpoint: this.endpoint,
baseURL: this.baseURL,
});
}

prometheusWriter(): PrometheusWriter {
return {
pushMetrics(metrics: PromMetrics, options: Options): Promise<Result> {
return pushMetrics(metrics, options);
},
};
}

public async prometheusRangeQuery(query: string, driver = <PrometheusDriver>{}): Promise<QueryResult> {
if (!driver) driver = this.prometheusDriver();
public async prometheusRangeQuery(ctx: Context, query: string): Promise<QueryResult> {
const start = new Date().getTime() - 1 * 60 * 60 * 1000;
const end = new Date();
const step = 60; // 1 point every minute
try {
const qStart = process.hrtime();
const res = await driver.rangeQuery(query, start, end, step);
const res = await this.promDriver.rangeQuery(query, start, end, step);
const qEnd = process.hrtime(qStart);
promQueryCount.inc();
promQuerySum.inc(qEnd[0] * 1000 + qEnd[1] / 1000000);

return res;
} catch (err) {
promQueryErrors.inc();
this.logger.error('Error querying Prometheus:', { query, err });
ctx.logger.error('Error querying Prometheus:', { query, err });
}
}

async pushMetric(metrics: PromMetrics, labels: PromLabels, writer: PrometheusWriter): Promise<boolean> {
if (!writer) writer = this.prometheusWriter();
const pushUrl = this.endpoint + this.writeURL;
async pushMetric(ctx: Context, metrics: PromMetrics, labels: PromLabels): Promise<boolean> {
try {
const options = {
url: pushUrl,
labels,
// verbose: true,
headers: { 'Content-Type': 'application/x-protobuf' },
};
const pushStart = process.hrtime();
const res = await writer.pushMetrics(metrics, options);
const res = await this.promWriter.pushMetrics(metrics, labels);
const pushEnd = process.hrtime(pushStart);

if (res.status !== 204) {
promWriteErrors.inc();
this.logger.error('Returned status != 204 while pushing metrics to Prometheus:', res);
ctx.logger.error('Returned status != 204 while pushing metrics to Prometheus:', res);
} else {
promWriteCount.inc();
promWriteSum.inc(pushEnd[0] * 1000 + pushEnd[1] / 1000000);
return true;
}
} catch (err) {
promWriteErrors.inc();
this.logger.error('Error pushing metrics to Prometheus:', err);
ctx.logger.error('Error pushing metrics to Prometheus:', err);
}
return false;
}

async fetchInstanceMetrics(ctx: Context, group: string, driver = <PrometheusDriver>{}): Promise<InstanceMetric[]> {
async fetchInstanceMetrics(ctx: Context, group: string): Promise<InstanceMetric[]> {
const query = `autoscaler_instance_stress_level{group="${group}"}`;
const metricItems: InstanceMetric[] = [];
try {
const res = await this.prometheusRangeQuery(query, driver);
const res = await this.prometheusRangeQuery(ctx, query);
res.result.forEach((promItem) => {
promItem.values.forEach((v: PromQueryValue) => {
metricItems.push(<InstanceMetric>{
Expand All @@ -152,26 +155,21 @@ export default class PrometheusClient implements MetricsStore {
});
});
} catch (err) {
this.logger.error('Error fetching instance metrics:', { group, err });
ctx.logger.error('Error fetching instance metrics:', { group, err });
}
return metricItems;
}

async writeInstanceMetric(
ctx: Context,
group: string,
item: InstanceMetric,
writer = <PrometheusWriter>{},
): Promise<boolean> {
async writeInstanceMetric(ctx: Context, group: string, item: InstanceMetric): Promise<boolean> {
const labels = { instance: item.instanceId, group };
const metrics = { autoscaler_instance_stress_level: item.value };
return this.pushMetric(metrics, labels, writer);
return this.pushMetric(ctx, metrics, labels);
}

saveMetricUnTrackedCount(groupName: string, count: number, writer = <PrometheusWriter>{}): Promise<boolean> {
saveMetricUnTrackedCount(ctx: Context, groupName: string, count: number): Promise<boolean> {
const metrics = { autoscaler_untracked_instance_count: count };
const labels = { group: groupName };
return this.pushMetric(metrics, labels, writer);
return this.pushMetric(ctx, metrics, labels);
}

async cleanInstanceMetrics(_ctx: Context, _group: string): Promise<boolean> {
Expand Down
5 changes: 3 additions & 2 deletions src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,17 @@ export default class RedisStore implements MetricsStore, InstanceStore {
return true;
}

async saveMetricUnTrackedCount(groupName: string, count: number): Promise<boolean> {
async saveMetricUnTrackedCount(ctx: Context, groupName: string, count: number): Promise<boolean> {
const key = `service-metrics:${groupName}:untracked-count`;
const result = await this.redisClient.set(key, JSON.stringify(count), 'EX', this.serviceLevelMetricsTTL);
if (result !== 'OK') {
ctx.logger.error('Error saving untracked count', { key, count });
throw new Error(`unable to set ${key}`);
}
return true;
}

async saveCloudInstances(groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> {
async saveCloudInstances(_ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> {
await this.redisClient.set(
`cloud-instances-list:${groupName}`,
JSON.stringify(cloudInstances),
Expand Down
12 changes: 6 additions & 6 deletions src/sanity_loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ export default class SanityLoop {
} ms`,
);
ctx.logger.debug(`Retrieved ${group.cloud} instance details for ${groupName}`, { cloudInstances });
await this.saveCloudInstances(group.name, cloudInstances);
await this.saveCloudInstances(ctx, group.name, cloudInstances);

const groupReportStart = process.hrtime();
const groupReport = await this.groupReportGenerator.generateReport(ctx, group, cloudInstances);
const groupReportEnd = process.hrtime(groupReportStart);
ctx.logger.info(`Retrieved group report in ${groupReportEnd[0] * 1000 + groupReportEnd[1] / 1000000} ms`);

await this.saveMetricUnTrackedCount(groupName, groupReport.unTrackedCount);
await this.saveMetricUnTrackedCount(ctx, groupName, groupReport.unTrackedCount);
ctx.logger.info(
`Successfully saved cloud instances and untracked count ${groupReport.unTrackedCount} for ${groupName}`,
);
Expand All @@ -65,11 +65,11 @@ export default class SanityLoop {
}
}

async saveMetricUnTrackedCount(groupName: string, count: number): Promise<boolean> {
return this.metricsStore.saveMetricUnTrackedCount(groupName, count);
async saveMetricUnTrackedCount(ctx: Context, groupName: string, count: number): Promise<boolean> {
return this.metricsStore.saveMetricUnTrackedCount(ctx, groupName, count);
}

private async saveCloudInstances(groupName: string, cloudInstances: CloudInstance[]) {
return this.instanceStore.saveCloudInstances(groupName, cloudInstances);
private async saveCloudInstances(ctx: Context, groupName: string, cloudInstances: CloudInstance[]) {
return this.instanceStore.saveCloudInstances(ctx, groupName, cloudInstances);
}
}
41 changes: 11 additions & 30 deletions src/test/instance_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import assert from 'node:assert';
import test, { afterEach, describe, mock } from 'node:test';

import { InstanceTracker } from '../instance_tracker';
import RedisStore from '../redis';
import { mockStore } from './mock_store';

describe('InstanceTracker', () => {
let context = {
Expand All @@ -17,17 +17,6 @@ describe('InstanceTracker', () => {
},
};

const redisClient = {
expire: mock.fn(),
zremrangebyscore: mock.fn(() => 0),
hgetall: mock.fn(),
hset: mock.fn(),
hdel: mock.fn(),
del: mock.fn(),
scan: mock.fn(),
zrange: mock.fn(),
};

const shutdownManager = {
shutdown: mock.fn(),
};
Expand Down Expand Up @@ -62,19 +51,11 @@ describe('InstanceTracker', () => {
},
};

const redisStore = new RedisStore({ redisClient });

const instanceTracker = new InstanceTracker({
instanceStore: redisStore,
metricsStore: redisStore,
redisScanCount: 100,
instanceStore: mockStore,
metricsStore: mockStore,
shutdownManager,
audit,
idleTTL: 300,
metricTTL: 3600,
provisioningTTL: 900,
shutdownStatusTTL: 86400,
groupRelatedDataTTL: 172800,
});

afterEach(() => {
Expand Down Expand Up @@ -112,7 +93,7 @@ describe('InstanceTracker', () => {
hfGroupDetails.scalingOptions.scaleDownPeriodsCount,
hfGroupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);

const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
Expand All @@ -138,7 +119,7 @@ describe('InstanceTracker', () => {
hfGroupDetails.scalingOptions.scaleDownPeriodsCount,
hfGroupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);

const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
Expand All @@ -162,7 +143,7 @@ describe('InstanceTracker', () => {
hfGroupDetails.scalingOptions.scaleDownPeriodsCount,
hfGroupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);

const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
Expand All @@ -179,10 +160,10 @@ describe('InstanceTracker', () => {
});

assert.deepEqual(
context.logger.info.mock.calls[1].arguments[0],
context.logger.info.mock.calls[0].arguments[0],
`Filling in for missing metric from previous period`,
);
assert.deepEqual(context.logger.info.mock.calls[1].arguments[1], {
assert.deepEqual(context.logger.info.mock.calls[0].arguments[1], {
group: groupName,
instanceId: 'i-0a1b2c3d4e5f6g7h8',
periodIdx: 0,
Expand All @@ -206,7 +187,7 @@ describe('InstanceTracker', () => {
groupDetails.scalingOptions.scaleDownPeriodsCount,
groupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);
const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
groupDetails,
Expand Down Expand Up @@ -236,7 +217,7 @@ describe('InstanceTracker', () => {
groupDetails.scalingOptions.scaleDownPeriodsCount,
groupDetails.scalingOptions.scaleUpPeriodsCount,
);
redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);
const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
groupDetails,
Expand Down Expand Up @@ -266,7 +247,7 @@ describe('InstanceTracker', () => {
groupDetails.scalingOptions.scaleUpPeriodsCount,
);

redisClient.zrange.mock.mockImplementationOnce(() => metricInventory.map(JSON.stringify));
mockStore.fetchInstanceMetrics.mock.mockImplementationOnce(() => metricInventory);
const metricInventoryPerPeriod = await instanceTracker.getMetricInventoryPerPeriod(
context,
groupDetails,
Expand Down
Loading
Loading