From 5a68b293fab1079322d8b99585686f12d483538b Mon Sep 17 00:00:00 2001 From: Aaron van Meerten Date: Mon, 9 Dec 2024 18:09:38 -0600 Subject: [PATCH] feat(consul): integrate into app, better debug output, ping support (#164) --- src/app.ts | 31 ++++++++++++++++--------------- src/config.ts | 6 ++++++ src/consul.ts | 20 ++++++++++++++++---- src/instance_store.ts | 3 +++ src/redis.ts | 13 +++++++++++++ 5 files changed, 54 insertions(+), 19 deletions(-) diff --git a/src/app.ts b/src/app.ts index 559bef2..9769ee3 100644 --- a/src/app.ts +++ b/src/app.ts @@ -26,6 +26,7 @@ import SanityLoop from './sanity_loop'; import MetricsLoop from './metrics_loop'; import ScalingManager from './scaling_options_manager'; import RedisStore from './redis'; +import ConsulStore from './consul'; import PrometheusClient from './prometheus'; import MetricsStore from './metrics_store'; import InstanceStore from './instance_store'; @@ -92,12 +93,13 @@ switch (config.MetricsStoreProvider) { let instanceStore: InstanceStore; switch (config.InstanceStoreProvider) { - // case 'consul': - // instanceStore = new ConsulClient({ - // logger, - // endpoint: config.ConsulURL, - // }); - // break; + case 'consul': + instanceStore = new ConsulStore({ + host: config.ConsulHost, + port: config.ConsulPort, + secure: config.ConsulSecure, + }); + break; default: // redis instanceStore = new RedisStore({ @@ -113,17 +115,16 @@ switch (config.InstanceStoreProvider) { break; } -mapp.get('/health', (req: express.Request, res: express.Response) => { +mapp.get('/health', async (req: express.Request, res: express.Response) => { logger.debug('Health check'); if (req.query['deep']) { - redisClient.ping((err, reply) => { - if (err) { - res.status(500).send('unhealthy'); - } else { - logger.debug('Redis ping reply', { reply }); - res.send('deeply healthy'); - } - }); + const reply = await instanceStore.ping(req.context); + if (!reply) { + res.status(500).send('unhealthy'); + } else { + logger.debug('instance store ping reply', { reply }); + res.send('deeply healthy'); + } } else { res.send('healthy!'); } diff --git a/src/config.ts b/src/config.ts index b09ac10..d2de6a4 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,6 +21,9 @@ if (result.error) { const env = cleanEnv(process.env, { PORT: num({ default: 3000 }), LOG_LEVEL: str({ default: 'info' }), + CONSUL_HOST: str({ default: 'localhost' }), + CONSUL_PORT: num({ default: 8500 }), + CONSUL_SECURE: bool({ default: false }), REDIS_HOST: str({ default: '127.0.0.1' }), REDIS_PORT: num({ default: 6379 }), REDIS_PASSWORD: str({ default: '' }), @@ -108,6 +111,9 @@ groupList.forEach((group) => { export default { HTTPServerPort: env.PORT, LogLevel: env.LOG_LEVEL, + ConsulHost: env.CONSUL_HOST, + ConsulPort: env.CONSUL_PORT, + ConsulSecure: env.CONSUL_SECURE, RedisHost: env.REDIS_HOST, RedisPort: env.REDIS_PORT, RedisPassword: env.REDIS_PASSWORD, diff --git a/src/consul.ts b/src/consul.ts index c9f783e..b9a386f 100644 --- a/src/consul.ts +++ b/src/consul.ts @@ -225,11 +225,13 @@ export default class ConsulStore implements InstanceStore { async getAllInstanceGroups(ctx: Context): Promise { ctx.logger.debug('fetching consul k/v keys'); - const res = await this.client.kv.get({ key: this.groupsPrefix, recurse: true }); - ctx.logger.debug('received consul k/v keys', { res }); + const key = this.groupsPrefix; + const res = await this.client.kv.get({ key, recurse: true }); if (!res) { + ctx.logger.debug('received consul k/v results', { key }); return []; } + ctx.logger.debug('received consul k/v results', { key, res }); return Object.entries(res).map(([_k, v]) => JSON.parse(v.Value)); } @@ -336,7 +338,7 @@ export default class ConsulStore implements InstanceStore { async fetch(ctx: Context, key: string): Promise { ctx.logger.debug(`reading consul k/v key`, { key }); const v = await this.client.kv.get(key); - ctx.logger.debug(`received consul k/v item`, { v }); + ctx.logger.debug(`received consul k/v item`, { key, v }); return v; } @@ -365,7 +367,7 @@ export default class ConsulStore implements InstanceStore { // the value is considered expired if the timestamp is in the past async checkValue(ctx: Context, key: string): Promise { try { - const res = this.fetchTTLValue(ctx, key); + const res = this.fetchTTLValue(ctx, this.valuesPrefix + key); if (!res) { return false; } @@ -395,4 +397,14 @@ export default class ConsulStore implements InstanceStore { await this.client.kv.del(key); return true; } + + async ping(ctx: Context): Promise { + try { + await this.client.status.leader(); + return true; + } catch (err) { + ctx.logger.error(`Failed to ping consul: ${err}`, { err }); + return err; + } + } } diff --git a/src/instance_store.ts b/src/instance_store.ts index 4f75ef0..ef43f68 100644 --- a/src/instance_store.ts +++ b/src/instance_store.ts @@ -177,6 +177,9 @@ export interface InstanceStore { // sanity related saveCloudInstances: { (ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise }; + + // health + ping: { (ctx: Context): Promise }; } export default InstanceStore; diff --git a/src/redis.ts b/src/redis.ts index 80c41b1..15a0a5f 100644 --- a/src/redis.ts +++ b/src/redis.ts @@ -536,4 +536,17 @@ export default class RedisStore implements MetricsStore, InstanceStore { ); return true; } + + async ping(ctx: Context): Promise { + return await new Promise((resolve) => { + this.redisClient.ping((err, reply) => { + if (err) { + ctx.logger.error('Redis ping error', { err }); + resolve(false); + } else { + resolve(reply); + } + }); + }); + } }