Skip to content

Commit

Permalink
feat(consul): integrate into app, better debug output, ping support (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronkvanmeerten authored Dec 10, 2024
1 parent 6981366 commit 5a68b29
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 19 deletions.
31 changes: 16 additions & 15 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import SanityLoop from './sanity_loop';
import MetricsLoop from './metrics_loop';
import ScalingManager from './scaling_options_manager';
import RedisStore from './redis';
import ConsulStore from './consul';
import PrometheusClient from './prometheus';
import MetricsStore from './metrics_store';
import InstanceStore from './instance_store';
Expand Down Expand Up @@ -92,12 +93,13 @@ switch (config.MetricsStoreProvider) {
let instanceStore: InstanceStore;

switch (config.InstanceStoreProvider) {
// case 'consul':
// instanceStore = new ConsulClient({
// logger,
// endpoint: config.ConsulURL,
// });
// break;
case 'consul':
instanceStore = new ConsulStore({
host: config.ConsulHost,
port: config.ConsulPort,
secure: config.ConsulSecure,
});
break;
default:
// redis
instanceStore = new RedisStore({
Expand All @@ -113,17 +115,16 @@ switch (config.InstanceStoreProvider) {
break;
}

mapp.get('/health', (req: express.Request, res: express.Response) => {
mapp.get('/health', async (req: express.Request, res: express.Response) => {
logger.debug('Health check');
if (req.query['deep']) {
redisClient.ping((err, reply) => {
if (err) {
res.status(500).send('unhealthy');
} else {
logger.debug('Redis ping reply', { reply });
res.send('deeply healthy');
}
});
const reply = await instanceStore.ping(req.context);
if (!reply) {
res.status(500).send('unhealthy');
} else {
logger.debug('instance store ping reply', { reply });
res.send('deeply healthy');
}
} else {
res.send('healthy!');
}
Expand Down
6 changes: 6 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ if (result.error) {
const env = cleanEnv(process.env, {
PORT: num({ default: 3000 }),
LOG_LEVEL: str({ default: 'info' }),
CONSUL_HOST: str({ default: 'localhost' }),
CONSUL_PORT: num({ default: 8500 }),
CONSUL_SECURE: bool({ default: false }),
REDIS_HOST: str({ default: '127.0.0.1' }),
REDIS_PORT: num({ default: 6379 }),
REDIS_PASSWORD: str({ default: '' }),
Expand Down Expand Up @@ -108,6 +111,9 @@ groupList.forEach((group) => {
export default {
HTTPServerPort: env.PORT,
LogLevel: env.LOG_LEVEL,
ConsulHost: env.CONSUL_HOST,
ConsulPort: env.CONSUL_PORT,
ConsulSecure: env.CONSUL_SECURE,
RedisHost: env.REDIS_HOST,
RedisPort: env.REDIS_PORT,
RedisPassword: env.REDIS_PASSWORD,
Expand Down
20 changes: 16 additions & 4 deletions src/consul.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,13 @@ export default class ConsulStore implements InstanceStore {

async getAllInstanceGroups(ctx: Context): Promise<InstanceGroup[]> {
ctx.logger.debug('fetching consul k/v keys');
const res = await this.client.kv.get({ key: this.groupsPrefix, recurse: true });
ctx.logger.debug('received consul k/v keys', { res });
const key = this.groupsPrefix;
const res = await this.client.kv.get({ key, recurse: true });
if (!res) {
ctx.logger.debug('received consul k/v results', { key });
return [];
}
ctx.logger.debug('received consul k/v results', { key, res });
return Object.entries(res).map(([_k, v]) => <InstanceGroup>JSON.parse(v.Value));
}

Expand Down Expand Up @@ -336,7 +338,7 @@ export default class ConsulStore implements InstanceStore {
async fetch(ctx: Context, key: string): Promise<GetItem | undefined> {
ctx.logger.debug(`reading consul k/v key`, { key });
const v = await this.client.kv.get(key);
ctx.logger.debug(`received consul k/v item`, { v });
ctx.logger.debug(`received consul k/v item`, { key, v });
return v;
}

Expand Down Expand Up @@ -365,7 +367,7 @@ export default class ConsulStore implements InstanceStore {
// the value is considered expired if the timestamp is in the past
async checkValue(ctx: Context, key: string): Promise<boolean> {
try {
const res = this.fetchTTLValue(ctx, key);
const res = this.fetchTTLValue(ctx, this.valuesPrefix + key);
if (!res) {
return false;
}
Expand Down Expand Up @@ -395,4 +397,14 @@ export default class ConsulStore implements InstanceStore {
await this.client.kv.del(key);
return true;
}

async ping(ctx: Context): Promise<boolean | string> {
try {
await this.client.status.leader();
return true;
} catch (err) {
ctx.logger.error(`Failed to ping consul: ${err}`, { err });
return err;
}
}
}
3 changes: 3 additions & 0 deletions src/instance_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ export interface InstanceStore {

// sanity related
saveCloudInstances: { (ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> };

// health
ping: { (ctx: Context): Promise<boolean | string> };
}

export default InstanceStore;
13 changes: 13 additions & 0 deletions src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,17 @@ export default class RedisStore implements MetricsStore, InstanceStore {
);
return true;
}

async ping(ctx: Context): Promise<boolean | string> {
return await new Promise((resolve) => {
this.redisClient.ping((err, reply) => {
if (err) {
ctx.logger.error('Redis ping error', { err });
resolve(false);
} else {
resolve(reply);
}
});
});
}
}

0 comments on commit 5a68b29

Please sign in to comment.