Add experimental monitor mode to BasicCrawler #2692

Open · wants to merge 7 commits into base: master
60 changes: 43 additions & 17 deletions packages/basic-crawler/src/internals/basic-crawler.ts
@@ -2,61 +2,62 @@ import { dirname } from 'node:path';

import type { Log } from '@apify/log';
import defaultLog, { LogLevel } from '@apify/log';
import { addTimeoutToPromise, TimeoutError, tryCancel } from '@apify/timeout';
import { TimeoutError, addTimeoutToPromise, tryCancel } from '@apify/timeout';
import { cryptoRandomObjectId } from '@apify/utilities';
import type {
AddRequestsBatchedOptions,
AddRequestsBatchedResult,
AutoscaledPoolOptions,
CrawlingContext,
DatasetExportOptions,
EnqueueLinksOptions,
EventManager,
DatasetExportOptions,
FinalStatistics,
GetUserDataFromRequest,
IRequestList,
LoadedContext,
ProxyInfo,
Request,
RequestOptions,
RestrictedCrawlingContext,
RouterHandler,
RouterRoutes,
Session,
SessionPoolOptions,
Source,
StatisticState,
StatisticsOptions,
LoadedContext,
RestrictedCrawlingContext,
} from '@crawlee/core';
import {
AutoscaledPool,
Configuration,
CriticalError,
Dataset,
enqueueLinks,
EnqueueStrategy,
EventType,
KeyValueStore,
mergeCookies,
Monitor,
NonRetryableError,
purgeDefaultStorages,
RequestProvider,
RequestQueueV1,
RequestQueue,
RequestQueueV1,
RequestState,
RetryRequestError,
Router,
SessionError,
SessionPool,
Statistics,
enqueueLinks,
mergeCookies,
purgeDefaultStorages,
validators,
} from '@crawlee/core';
import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types';
import { ROTATE_PROXY_ERRORS, gotScraping } from '@crawlee/utils';
import { stringify } from 'csv-stringify/sync';
import { ensureDir, writeFile, writeJSON } from 'fs-extra';
// @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types, so its alllll gooooood
import type { OptionsInit, Method } from 'got-scraping';
import type { Method, OptionsInit } from 'got-scraping';
import ow, { ArgumentError } from 'ow';
import { getDomain } from 'tldts';
import type { SetRequired } from 'type-fest';
@@ -351,6 +352,12 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCrawlingContext
* whether to output them to the Key-Value store.
*/
statisticsOptions?: StatisticsOptions;

/**
* Track and display time estimation and concurrency status in the CLI output at regular intervals.
* @default false
*/
monitor?: boolean;
}

/**
@@ -499,6 +506,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
protected retryOnBlocked: boolean;
private _closeEvents?: boolean;

private monitor?: boolean;
private experiments: CrawlerExperiments;
private _experimentWarnings: Partial<Record<keyof CrawlerExperiments, boolean>> = {};

@@ -542,6 +550,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
experiments: ow.optional.object,

statisticsOptions: ow.optional.object,
monitor: ow.optional.boolean,
};

/**
@@ -592,6 +601,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
statusMessageCallback,

statisticsOptions,
monitor,
} = options;

this.requestList = requestList;
@@ -601,6 +611,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.statusMessageCallback = statusMessageCallback as StatusMessageCallback;
this.events = config.getEventManager();
this.domainAccessedTime = new Map();
this.monitor = monitor;
this.experiments = experiments;

this._handlePropertyNameChange({
@@ -754,7 +765,10 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
log,
};

this.autoscaledPoolOptions = { ...autoscaledPoolOptions, ...basicCrawlerAutoscaledPoolConfiguration };
this.autoscaledPoolOptions = {
...autoscaledPoolOptions,
...basicCrawlerAutoscaledPoolConfiguration,
};
}

/**
@@ -904,11 +918,15 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.events.on(EventType.MIGRATING, boundPauseOnMigration);
this.events.on(EventType.ABORTING, boundPauseOnMigration);

const monitor = this.monitor ? new Monitor(this.stats, this.autoscaledPool, this.requestQueue) : null;
monitor?.start();

try {
await this.autoscaledPool!.run();
} finally {
await this.teardown();
await this.stats.stopCapturing();
monitor?.stop();

process.off('SIGINT', sigintHandler);
this.events.off(EventType.MIGRATING, boundPauseOnMigration);
@@ -949,9 +967,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

periodicLogger.stop();
await this.setStatusMessage(
`Finished! Total ${this.stats.state.requestsFinished + this.stats.state.requestsFailed} requests: ${
this.stats.state.requestsFinished
} succeeded, ${this.stats.state.requestsFailed} failed.`,
`Finished! Total ${
this.stats.state.requestsFinished + this.stats.state.requestsFailed
} requests: ${this.stats.state.requestsFinished} succeeded, ${this.stats.state.requestsFailed} failed.`,
{ isStatusMessageTerminal: true, level: 'INFO' },
);
this.running = false;
@@ -1202,7 +1220,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
source['inProgress'].add(request.id!);
}

await source.reclaimRequest(request, { forefront: request.userData?.__crawlee?.forefront });
await source.reclaimRequest(request, {
forefront: request.userData?.__crawlee?.forefront,
});
}, delay);

return true;
@@ -1466,7 +1486,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
retryCount,
});

await source.reclaimRequest(request, { forefront: request.userData?.__crawlee?.forefront });
await source.reclaimRequest(request, {
forefront: request.userData?.__crawlee?.forefront,
});
return;
}
}
@@ -1495,7 +1517,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
try {
return (await cb()) as T;
} catch (e: any) {
Object.defineProperty(e, 'triggeredFromUserHandler', { value: true });
Object.defineProperty(e, 'triggeredFromUserHandler', {
value: true,
});
throw e;
}
}
@@ -1704,7 +1728,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
return baseUrl.hostname === loadedBaseUrl.hostname;
}
case EnqueueStrategy.SameDomain: {
const baseUrlHostname = getDomain(baseUrl.hostname, { mixedInputs: false });
const baseUrlHostname = getDomain(baseUrl.hostname, {
mixedInputs: false,
});

if (baseUrlHostname) {
const loadedBaseUrlHostname = getDomain(loadedBaseUrl.hostname, { mixedInputs: false });
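Taken together, the crawler-side changes boil down to one opt-in flag plus the start/stop wiring around autoscaledPool.run(): when the flag is set, run() creates a Monitor from the crawler's Statistics, AutoscaledPool and request queue, starts it before the pool runs, and stops it in the finally block. A minimal sketch of how the experimental option might be enabled from user code, assuming the usual BasicCrawler setup (the handler body and seed URL below are illustrative, not part of this PR):

```ts
import { BasicCrawler } from '@crawlee/basic-crawler';

const crawler = new BasicCrawler({
    // Experimental: periodically redraws progress, remaining-time estimate
    // and concurrency in the terminal while the crawl runs.
    monitor: true,
    async requestHandler({ request, log }) {
        log.info(`Processing ${request.url}...`);
    },
});

await crawler.run(['https://crawlee.dev']);
```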
1 change: 1 addition & 0 deletions packages/core/src/index.ts
@@ -13,5 +13,6 @@ export * from './session_pool';
export * from './storages';
export * from './validators';
export * from './cookie_utils';
export * from './monitor';
export { PseudoUrl } from '@apify/pseudo_url';
export { Dictionary, Awaitable, Constructor, StorageClient, Cookie, QueueOperationInfo } from '@crawlee/types';
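Re-exporting the module from packages/core/src/index.ts makes the class importable from the package root. A hedged sketch of driving it directly, mirroring the arguments BasicCrawler passes in (stats, pool and queue are placeholders for instances the crawler normally owns and wires up itself):

```ts
import type { AutoscaledPool, RequestProvider, Statistics } from '@crawlee/core';
import { Monitor } from '@crawlee/core';

// Placeholders: in practice these come from a running crawler.
declare const stats: Statistics;
declare const pool: AutoscaledPool;
declare const queue: RequestProvider;

// Redraw the status block every 500 ms (the default interval).
const monitor = new Monitor(stats, pool, queue);
monitor.start(500);
// ...later, once the crawl has finished:
monitor.stop();
```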
142 changes: 142 additions & 0 deletions packages/core/src/monitor.ts
@@ -0,0 +1,142 @@
import os from 'os';

import type { AutoscaledPool, RequestProvider, Statistics } from '.';

export class Monitor {
private statistics: Statistics;
private autoscaledPool: AutoscaledPool | undefined;
private requestQueue: RequestProvider | undefined;

private intervalId: NodeJS.Timeout | null = null;
private monitorDisplay: MonitorDisplay | null = null;

constructor(
statistics: Statistics,
autoscaledPool: AutoscaledPool | undefined,
requestQueue: RequestProvider | undefined,
) {
this.statistics = statistics;
this.autoscaledPool = autoscaledPool;
this.requestQueue = requestQueue;
}

start(interval: number = 500) {
if (!this.monitorDisplay) {
this.monitorDisplay = new MonitorDisplay();
}

this.intervalId = setInterval(async () => {
await this.display();
}, interval);
}

stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}

private async display() {
const stats = this.statistics.calculate();
const now = new Date();
const startTime = this.statistics.state.crawlerStartedAt;
const elapsedTime = now.getTime() - new Date(startTime!).getTime();
const cpuLoad = os.loadavg()[0];
const memLoad = (os.totalmem() - os.freemem()) / os.totalmem();
const { requestsFinished } = this.statistics.state;
const assumedTotalCount = this.requestQueue?.assumedTotalCount ?? 0;

if (!this.monitorDisplay) {
throw new Error('Start the monitor first');
}

this.monitorDisplay.log(`Start: ${startTime ? formatDateTime(new Date(startTime)) : undefined}`);
this.monitorDisplay.log(`Now: ${formatDateTime(now)} (running for ${elapsedTime / 1000}s)`);
this.monitorDisplay.log(
`Progress: ${requestsFinished} / ${assumedTotalCount} (${((requestsFinished / assumedTotalCount) * 100).toFixed(2)}%), failed: ${this.statistics.state.requestsFailed} (${((this.statistics.state.requestsFailed / assumedTotalCount) * 100).toFixed(2)}%)`,
);
this.monitorDisplay.log(
`Remaining: ${this.estimateRemainingTime(stats)} seconds (${(stats.requestsFinishedPerMinute / 60).toFixed(2)} pages/second)`,
);
this.monitorDisplay.log(`Sys. load: ${cpuLoad.toFixed(2)} CPU (1 min load avg) / ${(memLoad * 100).toFixed(2)}% Memory`);
this.monitorDisplay.log(
`Concurrencies: Current ${this.autoscaledPool?.currentConcurrency}, Desired ${this.autoscaledPool?.desiredConcurrency}`,
);

// TODO: Add list of URLs that are currently being processed

this.monitorDisplay.resetCursor();
}

private estimateRemainingTime(stats: ReturnType<Statistics['calculate']>) {
const na = 'N/A';
if (!this.requestQueue) {
return na;
}

const remainingRequests = this.requestQueue.assumedTotalCount - this.statistics.state.requestsFinished;
const avgDuration = stats.requestAvgFinishedDurationMillis;
const remainingTime = (remainingRequests * avgDuration) / 1000;
const safeRemainingTime = Number.isFinite(remainingTime) ? remainingTime.toFixed(2) : na;
return safeRemainingTime;
}
}

const CLEAR_LINE = '\x1B[K';

class MonitorDisplay {
private lastLinesCount: number = 0;
private linesCount: number = 0;

public log(str: string): void {
// We create an empty line at the start so that any console.log calls
// from within the script are above our output.
if (this.linesCount === 0) {
// eslint-disable-next-line no-console
console.log(CLEAR_LINE); // erases the current line
this.linesCount += 1;
}

// Strip lines that are too long
// const strToLog = str.substring(0, 78);
const strToLog = str;
// eslint-disable-next-line no-console
console.log(`${CLEAR_LINE}${strToLog}`);
this.linesCount += 1;
}

public resetCursor(): void {
// move cursor up to draw over our output
process.stdout.write(`\x1B[${this.linesCount}A`);
this.lastLinesCount = this.linesCount;
this.linesCount = 0;
}

public close(): void {
// move cursor down so that console output stays
process.stdout.write(`\x1B[${this.lastLinesCount}B`);
}
}

function formatDateTime(datetime: Date | number): string {
const date = typeof datetime === 'number' ? new Date(datetime) : datetime;

const dateStr = `${date.getFullYear()}-${padDate(date.getMonth() + 1, 2)}-${padDate(date.getDate(), 2)}`;
const timeStr =
`${padDate(date.getHours(), 2)}` +
`:${padDate(date.getMinutes(), 2)}` +
`:${padDate(date.getSeconds(), 2)}` +
`.${padDate(date.getMilliseconds(), 3)}`;

return `${dateStr} ${timeStr}`;
}

function padDate(value: number | string, num: number): string {
const str = value.toString();
if (str.length >= num) {
return str;
}
const zeroesToAdd = num - str.length;
return '0'.repeat(zeroesToAdd) + str;
}
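MonitorDisplay does its in-place redraw with two ANSI escape sequences: \x1B[K erases the current line before each write, and \x1B[nA moves the cursor up n lines afterwards, so the next tick overwrites the same block instead of scrolling the terminal. A self-contained sketch of that technique, detached from the crawler (the frame contents are invented for illustration):

```ts
const CLEAR_LINE = '\x1B[K';

function drawFrame(lines: string[]): void {
    for (const line of lines) {
        // Clear leftovers from a previous, longer frame before printing.
        console.log(`${CLEAR_LINE}${line}`);
    }
    // Jump back to the first line of the frame so the next call overwrites it.
    process.stdout.write(`\x1B[${lines.length}A`);
}

let seconds = 0;
const timer = setInterval(() => {
    seconds += 1;
    drawFrame([`Elapsed: ${seconds}s`, `Progress: ${Math.min(seconds * 10, 100)}%`]);
}, 1000);

setTimeout(() => {
    clearInterval(timer);
    // Move the cursor below the two-line frame so later output keeps it intact.
    process.stdout.write('\x1B[2B\n');
}, 10_000);
```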