Skip to content

Commit

Permalink
JobServer with oplog watcher to communicate with main countly process
Browse files Browse the repository at this point in the history
  • Loading branch information
kanwarujjaval committed Jan 14, 2025
1 parent 967fa24 commit 7b4dc94
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
1 change: 1 addition & 0 deletions api/config.sample.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var countlyConfig = {
db: "countly",
port: 27017,
max_pool_size: 500,
replicaName: "rs0",
//username: test,
//password: test,
//mongos: false,
Expand Down
64 changes: 64 additions & 0 deletions jobServer/JobServer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const JobManager = require('./JobManager');
const JobScanner = require('./JobScanner');

const JOBS_CONFIG_COLLECTION = 'jobConfigs';
/**
* Class representing a job process.
*/
Expand Down Expand Up @@ -53,6 +54,20 @@ class JobServer {
*/
#db = null;

/**
* Collection for job configurations
* @private
* @type {import('mongodb').Collection}
*/
#jobConfigsCollection;

/**
* Flag indicating whether the job process is shutting down
* avoids multiple shutdown race conditions
* @private
*/
#isShuttingDown = false;

/**
* Creates a new JobProcess instance.
* @param {Object} common Countly common
Expand Down Expand Up @@ -90,7 +105,13 @@ class JobServer {
this.#jobManager = new JobManager(this.#db, this.Logger);
this.#jobScanner = new JobScanner(this.#db, this.Logger, this.#pluginManager);

this.#jobConfigsCollection = this.#db.collection(JOBS_CONFIG_COLLECTION);
// await this.#jobConfigsCollection.createIndex({ jobName: 1 }, /*{ unique: true }*/);

this.#setupSignalHandlers();
// Watch for changes in job configurations
this.#watchJobConfigs();

this.#log.i('Job process init successfully');
}
catch (error) {
Expand Down Expand Up @@ -155,12 +176,55 @@ class JobServer {
});
}

/**
* Watch for changes in job configurations
* @private
*/
async #watchJobConfigs() {
const changeStream = this.#jobConfigsCollection.watch();

changeStream.on('change', async(change) => {
try {
if (change.operationType === 'update' || change.operationType === 'insert') {
const jobName = change.fullDocument.jobName;
const enabled = change.fullDocument.enabled;

if (enabled) {
await this.#jobManager.enableJob(jobName);
}
else {
await this.#jobManager.disableJob(jobName);
}

this.#log.i(`Job ${jobName} ${enabled ? 'enabled' : 'disabled'}`);
}
}
catch (error) {
this.#log.e('Error processing job config change:', error);
}
});

changeStream.on('error', (error) => {
this.#log.e('Error in job configs change stream:', error);
// Implement reconnection logic here
});
}

/**
* Shuts down the job process.
* @param {number} [exitCode=0] - The exit code to use when shutting down the process.
* @returns {Promise<void>} A promise that resolves once the job process is shut down.
*/
async #shutdown(exitCode = 0) {
if (this.#isShuttingDown) {
return;
}
this.#isShuttingDown = true;

if (this.#db && typeof this.#db.close === 'function') {
await this.#db.close();
}

if (!this.#isRunning) {
this.#log.w('Shutdown called but process is not running');
process.exit(exitCode);
Expand Down

0 comments on commit 7b4dc94

Please sign in to comment.