-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathworker.js
executable file
·87 lines (72 loc) · 2.83 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* Copyright: The PastVu contributors.
* GNU Affero General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/agpl.txt)
*/
import ms from 'ms';
import moment from 'moment';
import log4js from 'log4js';
import config from './config';
import connectDb, { waitDb, syncAllIndexes } from './controllers/connection';
import { checkPendingMigrations } from './controllers/migration';
import { archiveExpiredSessions, calcUserStats } from './controllers/_session';
import { convertPhotosAll } from './controllers/converter';
import { clusterPhotosAll } from './controllers/cluster';
import { calcRegionStats } from './controllers/region';
import { createQueue } from './controllers/queue';
import './controllers/systemjs';
const logger = log4js.getLogger('worker');
export async function configure(startStamp) {
logger.info('Application Hash: ' + config.hash);
// Perform migration.
await checkPendingMigrations(true);
await connectDb({
redis: config.redis,
mongo: { uri: config.mongo.connection, poolSize: config.mongo.pool },
logger,
});
moment.locale(config.lang); // Set global language for momentjs
logger.info(`Worker started up in ${(Date.now() - startStamp) / 1000}s`);
waitDb.then(() => {
logger.info('Syncing indexes defined in model schemas to MongoDB.');
return syncAllIndexes();
}).then(() => {
setupSessionQueue();
setupUserJobsQueue();
});
}
/**
* Setup queue for session jobs.
*/
function setupSessionQueue() {
createQueue('session').then(sessionQueue => {
// session.archiveExpiredSessions
sessionQueue.process('archiveExpiredSessions', job => archiveExpiredSessions(job.data));
// session.calcUserStats
sessionQueue.process('calcUserStats', job => calcUserStats(job.data));
// Add archiveExpiredSessions periodic job.
sessionQueue.add('archiveExpiredSessions', {}, {
removeOnComplete: 2, // Needed to be able to retrieve it on global event listener (in different runner).
removeOnFail: true,
repeat: { every: ms('5m') },
});
// Add calcUserStatsJob periodic job.
sessionQueue.add('calcUserStats', {}, {
removeOnComplete: 2,
removeOnFail: true,
repeat: { every: ms('1d') },
});
});
}
/**
* Setup queue for user jobs (non-regular).
*/
function setupUserJobsQueue() {
createQueue('userjobs').then(userJobsQueue => {
// converter.convertPhotosAll
userJobsQueue.process('convertPhotosAll', job => convertPhotosAll(job.data));
// cluster.clusterPhotosAll
userJobsQueue.process('clusterPhotosAll', job => clusterPhotosAll(job.data));
// region.calcRegionStats
userJobsQueue.process('calcRegionStats', job => calcRegionStats(job.data));
});
}