-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathheroUnion.mjs
637 lines (528 loc) · 22.4 KB
/
heroUnion.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
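/**
* Hero management and scheduling
* -- Workflow --
* 1. A machete_hero crawler started locally connects to this union on its own and joins the crawler queue to wait for tasks (crawlers report their status to the union periodically);
* 2. When the union receives a new task, it stores it in the pending queue for online crawlers to fetch;
* 3. After a crawler fetches and finishes a task, it sends the result back to the union;
* 4. On receiving the crawler's result, the union fires the callback notification and sends the result data to the task submitter;
* 5. Task submitters can also query the union for a task's result by its task ID;
*
* -- Concurrency rules --
* The same task may be assigned to multiple crawlers
* The same task may accept data sent back by different crawlers and complete the callback
*
* -- Data caching rules --
* Task result data must not exceed 1 MB; larger results are treated as a failed task
* Task data is kept for at most 1 day
*
* -- Error handling rules --
* When a task times out, its status is changed from running back to waiting so another crawler can process it
* A task that exceeds the maximum number of attempts is marked as failed
*/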
import fs from 'node:fs';
import { readdir, readFile } from 'node:fs/promises';
import path from 'node:path';
import cron from 'node-cron';
import axios from 'axios';
import common from './common.mjs';
import md5 from 'md5';
/**
 * In-memory registry of crawlers ("heros") and queue of crawl tasks.
 *
 * Tasks are created by submitters, handed to crawlers via getWaitingTask(),
 * completed via saveTaskById(), and then pushed to the submitter's
 * notify_url by a periodic job. All state lives in arrays on the instance;
 * nothing is persisted except a line appended to the task log on creation.
 */
class HeroUnion {
    /**
     * Set up default settings and empty queues.
     *
     * @param {string} [configFilename] Path of the JSON config file; falls
     *     back to 'config.json' when omitted or empty.
     */
    constructor(configFilename) {
        this.config = null;
        this.configFile = configFilename || 'config.json';

        // Defaults. A config key with the same name as one of the instance
        // properties below replaces it when the config is loaded (see getConfig).
        this.systemLogDir = 'log/';         // directory for system logs
        this.task_cache_time = 86400;       // how long finished tasks are kept, in seconds
        this.task_data_max_size = 1024;     // max size of a task result, in KB (1 MB by default)
        this.task_timeout = 600;            // how long a task may stay running, in seconds
        this.task_max_try = 5;              // max number of processing attempts per task
        this.notify_timeout = 8;            // timeout of a notify callback request, in seconds
        this.notify_max_try = 5;            // max number of notify attempts per task
        this.heroHeartTimeout = 600;        // crawler heartbeat timeout, in seconds
        this.max_list_hero_num = 1000;      // max number of crawlers reachable through getHeros()
        this.axios_proxy = false;           // passed as the `proxy` option of axios requests

        this.stats = {
            start_time: common.getTimestampInSeconds()
        };

        this.heros = [];    // crawler list
        this.tasks = [];    // task queue

        // Task counters
        this.taskStatus = {
            'total': 0,
            'waiting': 0,
            'running': 0,
            'done': 0,
            'failed': 0
        };

        // Notify-callback counters
        this.taskNotifyStatus = {
            'total': 0,
            'done': 0,
            'failed': 0
        };

        this.statusCode = {
            'waiting': '待处理',
            'running': '处理中',
            'done': '完成',
            'failed': '失败'
        };

        this.supportedPlatforms = {
            'douyin': true,
            'kuaishou': true,
            'xigua': true,
            'bilibili': true
        };

        // Crawler counters
        this.heroStatus = {
            'total': 0,
            'idle': 0,
            'busy': 0,
            'offline': 0
        };
    }

    /**
     * Check whether a task result exceeds task_data_max_size (KB).
     *
     * @param {*} data Result payload; it is measured as its JSON text.
     * @returns {boolean} True when the serialized payload is too large.
     */
    isDataTooLarge(data) {
        // JSON.stringify(undefined) is undefined; measure it as an empty string.
        const serialized = JSON.stringify(data) ?? '';
        return common.byteSize(serialized) > this.task_data_max_size * 1024;
    }

    /**
     * Load the config file (once, or again when forced) and apply overrides.
     *
     * Only keys that already exist as non-function instance properties are
     * copied onto the instance, so a config file cannot replace methods.
     * When a load does not yield an object the previous config is kept.
     *
     * @param {boolean} [forceReload] Reload even if a config is cached.
     * @returns {Promise<object|null>} The active config, or null if none was loaded.
     */
    async getConfig(forceReload) {
        if (!this.config || forceReload) {
            common.log("Load config from %s", this.configFile);
            const loaded = await common.getConfigFromJsonFile(this.configFile);
            if (loaded === null || typeof loaded !== 'object') {
                common.error('Load config from %s failed, keep using the previous config.', this.configFile);
                return this.config;
            }

            for (const key of Object.keys(loaded)) {
                const isOwnSetting = Object.prototype.hasOwnProperty.call(this, key)
                    && typeof this[key] !== 'function';
                if (isOwnSetting) {
                    this[key] = loaded[key];
                }
            }

            this.config = loaded;
        }

        return this.config;
    }

    // -- Task features --

    /**
     * Build a task ID from the submitter's uuid and the current timestamp.
     *
     * @param {string} uuid Submitter ID.
     * @returns {string} `${uuid}_${timestamp}`.
     */
    generateTaskId(uuid) {
        const timestamp = common.getTimestamp();
        return `${uuid}_${timestamp}`;
    }

    /**
     * Build a task token from the task ID and the current timestamp.
     *
     * @param {string} id Task ID.
     * @returns {string} md5 of `${id}_${timestamp}`.
     */
    generateTaskToken(id) {
        const timestamp = common.getTimestamp();
        return md5(`${id}_${timestamp}`);
    }

    /**
     * Tell whether a platform is enabled in supportedPlatforms.
     *
     * @param {string} platform Platform name.
     * @returns {boolean} True only for own, truthy entries (inherited keys
     *     such as 'constructor' are not platforms).
     */
    isSupportedPlatform(platform) {
        return Object.prototype.hasOwnProperty.call(this.supportedPlatforms, platform)
            && Boolean(this.supportedPlatforms[platform]);
    }

    /**
     * Create a task, queue it as waiting and append it to the task log.
     *
     * Task shape:
     * {
     *   id, status, uuid, country, lang, url,
     *   platform,      // platform the target url belongs to
     *   contract,      // data contract the crawler should follow
     *   data_mode,     // json, html
     *   notify_url,
     *   results,
     *   created,       // timestamp in seconds
     *   updated,       // timestamp in seconds
     *   error,
     *   notified,      // whether the notify callback succeeded
     *   notify_time,   // number of notify attempts
     *   try_time,      // number of processing attempts
     *   token          // set when a crawler picks the task up
     * }
     *
     * @param {string} uuid Submitter ID (required).
     * @param {string} url Target url (required).
     * @param {string} platform Target platform (required).
     * @param {string} contract Data contract (required).
     * @param {string} [data_mode] Defaults to 'json'.
     * @param {string} [notify_url] Defaults to ''.
     * @param {string} [country] Defaults to 'cn'.
     * @param {string} [lang] Defaults to 'zh'.
     * @returns {Promise<object>} The queued task.
     */
    async createTask(uuid, url, platform, contract, data_mode, notify_url, country, lang) {
        const timestamp = common.getTimestampInSeconds();

        const task = {
            id: this.generateTaskId(uuid),
            status: 'waiting',
            notified: false,
            notify_time: 0,
            try_time: 0,

            // required
            uuid: uuid,
            url: url,
            platform: platform,
            contract: contract,

            // optional, with defaults
            data_mode: data_mode || 'json',
            country: country || 'cn',
            lang: lang || 'zh',
            notify_url: notify_url || '',

            results: [],
            created: timestamp,
            updated: timestamp
        };

        this.tasks.push(task);
        this.taskStatus.total++;
        this.taskStatus.waiting++;

        // The task is already queued at this point, so a logging problem is
        // reported instead of rejecting the whole call.
        try {
            await this.getConfig();
            // this.systemLogDir holds the configured value when the config
            // defines it and the built-in default otherwise.
            const logFile = path.resolve(this.systemLogDir) + '/tasks.log';
            common.saveLog(logFile, JSON.stringify(task) + "\n");
        } catch (err) {
            common.error('Task %s created but saving task log failed: %s', task.id, err);
        }

        common.log('Task %s created, url %s, notify url %s.', task.id, url, notify_url);
        return task;
    }

    /**
     * Hand out one waiting task that matches the given filters and mark it
     * as running. Every filter is optional; a falsy filter matches anything.
     *
     * @param {string[]|string} [platforms] Accepted platforms (matched with indexOf).
     * @param {string[]|string} [contracts] Accepted contracts (matched with indexOf).
     * @param {string} [country]
     * @param {string} [lang]
     * @param {string} [data_mode]
     * @returns {object|null} The task, now running, or null if none matches.
     */
    getWaitingTask(platforms, contracts, country, lang, data_mode) {
        const task = this.tasks.find((item) => {
            if (item.status !== 'waiting') {
                return false;
            }
            if (platforms && platforms.indexOf(item.platform) === -1) {
                return false;
            }
            if (contracts && contracts.indexOf(item.contract) === -1) {
                return false;
            }
            // Loose equality is kept for caller-supplied values.
            if (country && item.country != country) {
                return false;
            }
            if (lang && item.lang != lang) {
                return false;
            }
            if (data_mode && item.data_mode != data_mode) {
                return false;
            }
            return true;
        });

        if (!task) {
            return null;
        }

        task.status = 'running';
        // Fresh token per attempt; the crawler signs its result with it.
        task.token = this.generateTaskToken(task.id);
        task.try_time++;
        task.updated = common.getTimestampInSeconds();

        this.taskStatus.waiting--;
        this.taskStatus.running++;

        return task;
    }

    /**
     * Store the result a crawler sends back for a running task.
     *
     * The task is only touched when the call is valid: `status` must be
     * 'done' (default) or 'failed' and `data` must be an object. An
     * oversized result marks the task as failed.
     *
     * @param {string} bot_name Name of the crawler; recorded as data.provider.
     * @param {string} id Task ID.
     * @param {object} data Result payload (its `provider` property is set).
     * @param {string} [status] 'done' or 'failed'; defaults to 'done'.
     * @returns {boolean} True when the result was stored.
     */
    saveTaskById(bot_name, id, data, status) {
        const task = this.tasks.find((item) => item.id == id && item.status === 'running');
        if (!task) {
            return false;
        }

        const finalStatus = status ?? 'done';
        if (finalStatus !== 'done' && finalStatus !== 'failed') {
            // Rejecting here keeps the running counter in sync with the task.
            common.error('Task %s save data failed by bot %s, unknown status %s.', id, bot_name, status);
            return false;
        }

        if (data === null || typeof data !== 'object') {
            common.error('Task %s save data failed by bot %s, data is not an object.', id, bot_name);
            return false;
        }

        if (this.isDataTooLarge(data)) {
            this.taskStatus.running--;
            this.taskStatus.failed++;
            task.status = 'failed';
            task.error = 'Result is too large to save.';
            common.error('Task %s save data failed by bot %s, data is too large.', id, bot_name);
            return false;
        }

        data.provider = bot_name;   // remember who produced the data
        task.results = data;
        task.updated = common.getTimestampInSeconds();

        this.taskStatus.running--;
        if (finalStatus === 'done') {
            this.taskStatus.done++;
            task.status = 'done';
        } else {
            this.taskStatus.failed++;
            task.status = 'failed';
            task.error = data.error ? data.error : 'HeroBot says it failed.';
            common.error('Task %s is failed, save by bot %s', id, bot_name);
        }

        common.log('Task %s save data done by bot %s.', id, bot_name);
        return true;
    }

    /**
     * Look up a task by ID.
     *
     * @param {string} id Task ID.
     * @returns {object|undefined} The task, or undefined when unknown.
     */
    getTaskById(id) {
        return this.tasks.find((item) => item.id == id);
    }

    /**
     * Get the signing token configured for a submitter.
     *
     * @param {string} uuid Submitter ID.
     * @returns {Promise<string>} config.tokens[uuid], or '' when not configured.
     */
    async getUserToken(uuid) {
        const config = await this.getConfig();
        const tokens = config?.tokens;
        if (tokens && Object.prototype.hasOwnProperty.call(tokens, uuid)) {
            return tokens[uuid] ?? '';
        }
        return '';
    }

    /**
     * Send the signed result of a task to its notify_url and record the outcome.
     *
     * Request errors are logged, not thrown. The attempt counter of the
     * stored task is incremented on every call.
     *
     * @param {object} task The finished task.
     * @returns {Promise<boolean>} True when the endpoint answered with status 200.
     */
    async handleTaskDone(task) {
        let notified = false;
        const notify_url = task.notify_url;

        try {
            common.log('[%s] Try to notify task %s via %s', task.notify_time, task.id, notify_url);

            const params = {
                "task_id": task.id,
                "task_result": task.results,
                "timestamp": common.getTimestamp(),
            };
            const token = await this.getUserToken(task.uuid);
            params.sign = common.sign(params, token);

            const response = await axios.post(notify_url, params, {
                timeout: this.notify_timeout * 1000,
                proxy: this.axios_proxy
            });

            if (response.status == 200) {
                notified = true;
                common.log('Task %s done by %s, notify to %s done, response data:', task.id, task.results.provider, notify_url, response.data);
            } else {
                common.error('[FAILED] Notify to %s failed, response status: %s, status text: %s, result: %s',
                    notify_url, response.status, response.statusText, response.data);
            }
        } catch (err) {
            common.error('[ERROR] Notify to %s failed: %s', notify_url, err);
        }

        // Look the task up again: it may have been removed while the request was pending.
        const stored = this.tasks.find((item) => item.id == task.id);
        if (stored) {
            stored.notified = notified;
            stored.notify_time++;

            if (notified) {
                this.taskNotifyStatus.done++;
                this.taskNotifyStatus.total = this.taskNotifyStatus.done + this.taskNotifyStatus.failed;
            } else if (stored.notify_time == this.notify_max_try) {
                // Counted as failed exactly once, when the last attempt is used up.
                this.taskNotifyStatus.failed++;
                this.taskNotifyStatus.total = this.taskNotifyStatus.done + this.taskNotifyStatus.failed;
                common.error('[FAILED] Finally failed after %s try, notify to %s', this.notify_max_try, notify_url);
            }
        }

        return notified;
    }

    // -- Crawler features --

    /**
     * Map a crawler status to the heroStatus counter it is counted under.
     *
     * @param {string} status Status reported by or assigned to a crawler.
     * @returns {string} 'idle', 'offline', or 'busy' for anything else.
     */
    heroCounterKey(status) {
        return status === 'idle' || status === 'offline' ? status : 'busy';
    }

    /**
     * Register a crawler or refresh the record of a known one.
     *
     * Bot properties: name, description, status (idle | busy), platforms[],
     * contracts[], timestamp, country, lang, contact.
     *
     * @param {object} bot Status report sent by the crawler.
     */
    heroOnboard(bot) {
        const index = this.heros.findIndex((item) => item.name == bot.name);

        if (index > -1) {
            const cached = this.heros[index];
            if (cached.status != bot.status) {
                common.log('Hero %s status change from %s to %s', cached.name, cached.status, bot.status);
                this.heroStatus[this.heroCounterKey(cached.status)]--;
                this.heroStatus[this.heroCounterKey(bot.status)]++;
            }

            this.heros[index] = bot;    // replace the cached record
            common.log('Hero %s is %s at %s', bot.name, bot.status, bot.timestamp);
            return;
        }

        this.heros.push(bot);
        this.heroStatus.total++;
        this.heroStatus[this.heroCounterKey(bot.status)]++;
        common.log('Hero %s is onboard at %s', bot.name, bot.timestamp);
    }

    /**
     * Read a cron frequency (in minutes) from the loaded config.
     *
     * @param {string} key Config key.
     * @param {number} fallback Used when the key is missing or falsy.
     * @returns {number} Frequency in minutes.
     */
    cronFrequency(key, fallback) {
        const configured = this.config ? this.config[key] : undefined;
        return configured ? configured : fallback;
    }

    /**
     * Create and start a cron job that runs every `frequence` minutes.
     *
     * @param {number} frequence Interval in minutes.
     * @param {Function} job Callback to run.
     */
    startCronJob(frequence, job) {
        const cronjob = cron.schedule(`*/${frequence} * * * *`, job, {
            scheduled: false
        });
        cronjob.start();
    }

    /**
     * Mark every crawler whose last heartbeat is older than heroHeartTimeout
     * as offline and update the counters.
     */
    checkHeroHeartbeats() {
        const now = common.getTimestampInSeconds();
        for (const hero of this.heros) {
            if (hero.status !== 'offline' && now - hero.timestamp > this.heroHeartTimeout) {
                this.heroStatus[this.heroCounterKey(hero.status)]--;
                hero.status = 'offline';
                this.heroStatus.offline++;
                common.log('Hero %s is offline, last heart beat at %s', hero.name, hero.timestamp);
            }
        }
    }

    /**
     * Start the periodic heartbeat check
     * (config key heroHeartCheckFrequence, default every 1 minute).
     */
    heroHeartCheck() {
        const frequence = this.cronFrequency('heroHeartCheckFrequence', 1);
        this.startCronJob(frequence, () => this.checkHeroHeartbeats());
        common.log('Cronjob of hero heart check started.');
    }

    /**
     * Start the periodic config reload
     * (config key reloadConfigFrequence, default every 5 minutes).
     */
    autoReloadConfigs() {
        const frequence = this.cronFrequency('reloadConfigFrequence', 5);
        this.startCronJob(frequence, () => {
            // A failed reload must not become an unhandled rejection.
            this.getConfig(true).catch((err) => {
                common.error('[ERROR] Reload config from %s failed: %s', this.configFile, err);
            });
        });
        common.log('Cronjob of config auto reload started.');
    }

    /**
     * Drop finished (done or failed) tasks older than task_cache_time and
     * take them out of the counters they were counted in.
     */
    cleanExpiredTasks() {
        const now = common.getTimestampInSeconds();
        const kept = [];

        for (const item of this.tasks) {
            const finished = item.status === 'done' || item.status === 'failed';
            if (!finished || now - item.created <= this.task_cache_time) {
                kept.push(item);
                continue;
            }

            if (this.taskStatus[item.status] >= 1) {
                this.taskStatus[item.status]--;
                if (this.taskStatus.total >= 1) {
                    this.taskStatus.total--;
                }
            }

            // Only tasks that were counted by handleTaskDone() are uncounted:
            // notified ones as done, ones that used up all attempts as failed.
            let notifyStatus = null;
            if (item.notified) {
                notifyStatus = 'done';
            } else if (item.notify_time >= this.notify_max_try) {
                notifyStatus = 'failed';
            }
            if (notifyStatus && this.taskNotifyStatus[notifyStatus] >= 1) {
                this.taskNotifyStatus[notifyStatus]--;
                this.taskNotifyStatus.total = this.taskNotifyStatus.done + this.taskNotifyStatus.failed;
            }

            common.log('Task %s is expired, which is created at %s', item.id, item.created);
        }

        this.tasks = kept;
    }

    /**
     * Start the periodic cleanup of expired tasks
     * (config key autoCleanTaskFrequence, default every 10 minutes).
     */
    autoCleanExpiredTasks() {
        const frequence = this.cronFrequency('autoCleanTaskFrequence', 10);
        this.startCronJob(frequence, () => this.cleanExpiredTasks());
        common.log('Cronjob of auto clean expired tasks started.');
    }

    /**
     * Handle running tasks that exceeded task_timeout: put them back to
     * waiting while attempts remain, otherwise mark them as failed.
     *
     * The timeout is checked in both cases so the last allowed attempt gets
     * its full processing time before the task is failed.
     */
    resetTimeoutTasks() {
        const now = common.getTimestampInSeconds();

        for (const item of this.tasks) {
            if (item.status !== 'running' || now - item.updated <= this.task_timeout) {
                continue;
            }

            this.taskStatus.running--;
            if (item.try_time < this.task_max_try) {
                this.taskStatus.waiting++;
                item.status = 'waiting';
                common.log('Task %s running timeout, and reset it to waiting list, url: %s', item.id, item.url);
            } else {
                this.taskStatus.failed++;
                item.status = 'failed';
                item.error = 'Task max try time got.';
                common.error('Task %s failed, got the max try time, url: %s.', item.id, item.url);
            }
        }
    }

    /**
     * Start the periodic reset of timed-out running tasks
     * (config key autoResetWaitingTaskFrequence, default every 6 minutes).
     */
    autoResetRunningTimeoutTasks() {
        const frequence = this.cronFrequency('autoResetWaitingTaskFrequence', 6);
        this.startCronJob(frequence, () => this.resetTimeoutTasks());
        common.log('Cronjob of auto reset expired running tasks started.');
    }

    /**
     * Notify the first done task that has a usable notify_url, is not yet
     * notified and still has notify attempts left.
     *
     * @returns {Promise<boolean>} Result of handleTaskDone(), or false when
     *     no task is pending.
     */
    async notifyNextTask() {
        const task = this.tasks.find((item) => common.isUrlOk(item.notify_url) &&
            item.status === 'done' &&
            item.notified == false &&
            item.notify_time < this.notify_max_try
        );
        if (!task) {
            return false;
        }
        return this.handleTaskDone(task);
    }

    /**
     * Start the periodic notify job; tasks without a notify_url are skipped
     * (config key autoNotifyTaskFrequence, default every 2 minutes).
     */
    autoNotifyTasks() {
        const frequence = this.cronFrequency('autoNotifyTaskFrequence', 2);
        this.startCronJob(frequence, () => {
            this.notifyNextTask().catch((err) => {
                common.error('[ERROR] Auto notify task failed: %s', err);
            });
        });
        common.log('Cronjob of auto notify done tasks started.');
    }

    /**
     * Get the union's counters and uptime.
     *
     * @returns {object} this.stats, refreshed with the current counters.
     */
    getStats() {
        this.stats.taskStatus = this.taskStatus;
        this.stats.taskNotifyStatus = this.taskNotifyStatus;
        this.stats.heroStatus = this.heroStatus;
        this.stats.run_seconds = common.getTimestampInSeconds() - this.stats.start_time;
        this.stats.cache_time = this.task_cache_time;

        return this.stats;
    }

    /**
     * Get one page of crawlers, newest heartbeat first.
     *
     * `page` and `limit` may be numbers or numeric strings. Invalid values
     * fall back to page 1 and limit 20; limit is capped at 100 and the
     * listing never reaches past max_list_hero_num entries.
     *
     * @param {number|string} [page] 1-based page number.
     * @param {number|string} [limit] Page size.
     * @returns {object[]} The crawlers on that page (possibly empty).
     */
    getHeros(page, limit) {
        let pageNo = Number.parseInt(page, 10);
        if (!Number.isInteger(pageNo) || pageNo < 1) {
            pageNo = 1;
        }

        let pageSize = Number.parseInt(limit, 10);
        if (!Number.isInteger(pageSize) || pageSize < 1) {
            pageSize = 20;
        }
        if (pageSize > 100) {
            pageSize = 100;
        }

        const start = (pageNo - 1) * pageSize;
        const end = Math.min(start + pageSize, this.heros.length, this.max_list_hero_num);
        if (start >= end) {
            return [];
        }

        // Sort a copy so listing crawlers does not reorder the registry.
        const newestFirst = [...this.heros].sort((itemA, itemB) => {
            if (itemA.timestamp > itemB.timestamp) {
                return -1;
            }
            if (itemA.timestamp < itemB.timestamp) {
                return 1;
            }
            return 0;
        });

        return newestFirst.slice(start, end);
    }

    /**
     * Look up a crawler by name.
     *
     * @param {string} bot_name Crawler name.
     * @returns {object|undefined} The crawler record, or undefined when unknown.
     */
    getHeroByName(bot_name) {
        return this.heros.find((item) => item.name == bot_name);
    }

    /**
     * Load the config and start all periodic jobs.
     *
     * @returns {Promise<void>}
     */
    async init() {
        await this.getConfig();
        this.autoReloadConfigs();
        this.heroHeartCheck();
        this.autoCleanExpiredTasks();
        this.autoNotifyTasks();
        this.autoResetRunningTimeoutTasks();
    }
}
export default HeroUnion;