From 41f1149192b88de58fa9bc51df9df80e63819ae1 Mon Sep 17 00:00:00 2001
From: Imamuzzaki Abu Salam
Date: Wed, 2 Oct 2024 12:50:02 +0700
Subject: [PATCH 1/6] Add experimental monitor mode to BasicCrawler

Fixes #2680

Add a new `Monitor` class to track and display time estimation and concurrency status in the CLI output at regular intervals.

* **Monitor Class**:
  - Add the `Monitor` class in `packages/core/src/monitor.ts`.
  - Include the logic that writes the output and gathers and calculates the monitor data.

* **BasicCrawler Integration**:
  - Import the `Monitor` class in `packages/basic-crawler/src/internals/basic-crawler.ts`.
  - Initialize and start the `Monitor` in the `run` function.
  - Ensure the monitor output and `log` output are written on separate lines.
  - Add a `monitor` option to the `BasicCrawlerOptions` interface.

---

For more details, open the [Copilot Workspace session](https://copilot-workspace.githubnext.com/apify/crawlee/issues/2680?shareId=XXXX-XXXX-XXXX-XXXX).
---
 .../src/internals/basic-crawler.ts | 12 +++++
 packages/core/src/monitor.ts       | 51 +++++++++++++++++++
 2 files changed, 63 insertions(+)
 create mode 100644 packages/core/src/monitor.ts

diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts
index 4355321581d6..7d53f10dbadc 100644
--- a/packages/basic-crawler/src/internals/basic-crawler.ts
+++ b/packages/basic-crawler/src/internals/basic-crawler.ts
@@ -60,6 +60,7 @@ import type { OptionsInit, Method } from 'got-scraping';
 import ow, { ArgumentError } from 'ow';
 import { getDomain } from 'tldts';
 import type { SetRequired } from 'type-fest';
+import { Monitor } from '@crawlee/core/src/monitor';
 
 export interface BasicCrawlingContext extends CrawlingContext {
@@ -367,6 +368,13 @@ export interface CrawlerExperiments {
      * - set `requestLocking` to `false` in the `experiments` option of the crawler
      */
     requestLocking?: boolean;
+    /**
+     * Experimental CLI output monitor mode.
+     * If you encounter issues due to this change, please:
+     * - report it to us: https://github.com/apify/crawlee
+     * - set `monitor` to `false` in the `experiments` option of the crawler
+     */
+    monitor?: boolean;
 }
 
 /**
@@ -904,11 +912,15 @@ export class BasicCrawler {
+            this.display();
+        }, interval);
+    }
+
+    stop() {
+        if (this.intervalId) {
+            clearInterval(this.intervalId);
+            this.intervalId = null;
+        }
+    }
+
+    private display() {
+        const stats = this.statistics.calculate();
+        const now = new Date();
+        const startTime = this.statistics.state.crawlerStartedAt;
+        const elapsedTime = now.getTime() - new Date(startTime!).getTime();
+        const cpuLoad = os.loadavg()[0];
+        const memLoad = (os.totalmem() - os.freemem()) / os.totalmem();
+
+        this.log.info(`
+Start: ${startTime}
+Now: ${now} (running for ${elapsedTime / 1000}s)
+Progress: ${this.statistics.state.requestsFinished} / ${stats.requestsTotal} (${(this.statistics.state.requestsFinished / stats.requestsTotal) * 100}%), failed: ${this.statistics.state.requestsFailed} (${(this.statistics.state.requestsFailed / stats.requestsTotal) * 100}%)
+Remaining: ${this.estimateRemainingTime(stats)} (${stats.requestsFinishedPerMinute} req/min)
+Sys.
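For reviewers who want to try the change, here is a minimal usage sketch (not part of the patch). It assumes the flag is consumed through the `experiments` option exactly as the `CrawlerExperiments` hunk above declares it; later commits in this series also thread a separate top-level `monitor` option through `BasicCrawlerOptions`, so the final opt-in surface may differ.

```ts
import { BasicCrawler } from '@crawlee/basic-crawler';

// Opt in to the experimental CLI monitor proposed by this patch series.
const crawler = new BasicCrawler({
    experiments: { monitor: true },
    async requestHandler({ request, log }) {
        log.info(`Processing ${request.url}`);
    },
});

await crawler.run(['https://crawlee.dev']);
```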
load: ${cpuLoad.toFixed(2)} / ${(memLoad * 100).toFixed(2)}%
+Concurrencies: ${this.statistics.state.requestsRetries}
+`);
+    }
+
+    private estimateRemainingTime(stats: ReturnType) {
+        const remainingRequests = stats.requestsTotal - this.statistics.state.requestsFinished;
+        const avgDuration = stats.requestAvgFinishedDurationMillis;
+        return (remainingRequests * avgDuration) / 1000;
+    }
+}

From c7c6a8b32042b6e1d4b8cc853480d4c67114ee07 Mon Sep 17 00:00:00 2001
From: Imamuzzaki Abu Salam
Date: Wed, 2 Oct 2024 08:17:40 +0000
Subject: [PATCH 2/6] refactor: Refactor Monitor class to improve code readability and maintainability

---
 packages/core/src/monitor.ts | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/packages/core/src/monitor.ts b/packages/core/src/monitor.ts
index 1fc5b8035f7a..101e042771b5 100644
--- a/packages/core/src/monitor.ts
+++ b/packages/core/src/monitor.ts
@@ -1,7 +1,9 @@
-import { log as defaultLog, Log } from './log';
-import { Statistics } from './crawlers/statistics';
 import os from 'os';
 
+import type { Statistics } from './crawlers/statistics';
+import type { Log } from './log';
+import { log as defaultLog } from './log';
+
 export class Monitor {
     private log: Log;
     private statistics: Statistics;
@@ -36,7 +38,11 @@ export class Monitor {
         this.log.info(`
 Start: ${startTime}
 Now: ${now} (running for ${elapsedTime / 1000}s)
-Progress: ${this.statistics.state.requestsFinished} / ${stats.requestsTotal} (${(this.statistics.state.requestsFinished / stats.requestsTotal) * 100}%), failed: ${this.statistics.state.requestsFailed} (${(this.statistics.state.requestsFailed / stats.requestsTotal) * 100}%)
+Progress: ${this.statistics.state.requestsFinished} / ${stats.requestsTotal} (${
+        (this.statistics.state.requestsFinished / stats.requestsTotal) * 100
+    }%), failed: ${this.statistics.state.requestsFailed} (${
+        (this.statistics.state.requestsFailed / stats.requestsTotal) * 100
+    }%)
 Remaining: ${this.estimateRemainingTime(stats)} (${stats.requestsFinishedPerMinute} req/min)
 Sys.
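The `Remaining:` figure above is straightforward arithmetic over the statistics snapshot. A self-contained sketch of the same estimate, with a guard for the start-up case (no finished requests yet) that the patch itself does not add:

```ts
// Sketch of Monitor.estimateRemainingTime(): project the average duration of
// finished requests (in milliseconds) onto the requests that are still pending.
function estimateRemainingSeconds(
    requestsTotal: number,
    requestsFinished: number,
    requestAvgFinishedDurationMillis: number,
): number {
    const remainingRequests = Math.max(requestsTotal - requestsFinished, 0);
    // Before anything finishes, the average is NaN and no projection is possible.
    if (!Number.isFinite(requestAvgFinishedDurationMillis)) return Infinity;
    return (remainingRequests * requestAvgFinishedDurationMillis) / 1000;
}

// Example: 200 of 1000 requests done at a 750 ms average => 600 seconds remain.
console.log(estimateRemainingSeconds(1000, 200, 750)); // 600
```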
load: ${cpuLoad.toFixed(2)} / ${(memLoad * 100).toFixed(2)}% Concurrencies: ${this.statistics.state.requestsRetries} From 9f669e4d570bfaf694bc93bbfc9f1526ae9d68ac Mon Sep 17 00:00:00 2001 From: Imamuzzaki Abu Salam Date: Wed, 2 Oct 2024 15:43:12 +0700 Subject: [PATCH 3/6] Update packages/basic-crawler/src/internals/basic-crawler.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Martin Adámek --- packages/basic-crawler/src/internals/basic-crawler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts index 7d53f10dbadc..e625b628e218 100644 --- a/packages/basic-crawler/src/internals/basic-crawler.ts +++ b/packages/basic-crawler/src/internals/basic-crawler.ts @@ -60,7 +60,7 @@ import type { OptionsInit, Method } from 'got-scraping'; import ow, { ArgumentError } from 'ow'; import { getDomain } from 'tldts'; import type { SetRequired } from 'type-fest'; -import { Monitor } from '@crawlee/core/src/monitor'; +import { Monitor } from '@crawlee/core'; export interface BasicCrawlingContext extends CrawlingContext { From 8d48c6abe2ba34e4ec47d535372b882a1218c434 Mon Sep 17 00:00:00 2001 From: Imamuzzaki Abu Salam Date: Wed, 2 Oct 2024 10:40:01 +0000 Subject: [PATCH 4/6] refactor: Refactor Monitor class to improve code readability and maintainability --- .../src/internals/basic-crawler.ts | 555 ++++++++++++------ 1 file changed, 390 insertions(+), 165 deletions(-) diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts index 7d53f10dbadc..a9bcaeff812b 100644 --- a/packages/basic-crawler/src/internals/basic-crawler.ts +++ b/packages/basic-crawler/src/internals/basic-crawler.ts @@ -9,24 +9,24 @@ import type { AddRequestsBatchedResult, AutoscaledPoolOptions, CrawlingContext, + DatasetExportOptions, EnqueueLinksOptions, EventManager, - DatasetExportOptions, FinalStatistics, GetUserDataFromRequest, IRequestList, + LoadedContext, ProxyInfo, Request, RequestOptions, + RestrictedCrawlingContext, RouterHandler, RouterRoutes, Session, SessionPoolOptions, Source, - StatisticState, StatisticsOptions, - LoadedContext, - RestrictedCrawlingContext, + StatisticState, } from '@crawlee/core'; import { AutoscaledPool, @@ -41,8 +41,8 @@ import { NonRetryableError, purgeDefaultStorages, RequestProvider, - RequestQueueV1, RequestQueue, + RequestQueueV1, RequestState, RetryRequestError, Router, @@ -51,16 +51,21 @@ import { Statistics, validators, } from '@crawlee/core'; -import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types'; -import { ROTATE_PROXY_ERRORS, gotScraping } from '@crawlee/utils'; +import { Monitor } from '@crawlee/core/src/monitor'; +import type { + Awaitable, + BatchAddRequestsResult, + Dictionary, + SetStatusMessageOptions, +} from '@crawlee/types'; +import { gotScraping, ROTATE_PROXY_ERRORS } from '@crawlee/utils'; import { stringify } from 'csv-stringify/sync'; import { ensureDir, writeFile, writeJSON } from 'fs-extra'; // @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types, so its alllll gooooood -import type { OptionsInit, Method } from 'got-scraping'; +import type { Method, OptionsInit } from 'got-scraping'; import ow, { ArgumentError } from 'ow'; import { getDomain } from 'tldts'; import type { SetRequired } from 'type-fest'; -import { Monitor } from 
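The one-line import fix in PATCH 3 only resolves if `Monitor` is re-exported from the package root; deep `src/` paths are not part of the published `@crawlee/core` build. PATCH 6's diffstat later shows a one-line addition to `packages/core/src/index.ts` whose body is not visible in this excerpt; the conventional re-export would be:

```ts
// packages/core/src/index.ts (assumed): re-export the new module so that
// `import { Monitor } from '@crawlee/core'` resolves for downstream packages.
export * from './monitor';
```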
'@crawlee/core/src/monitor'; export interface BasicCrawlingContext extends CrawlingContext { @@ -87,7 +92,9 @@ export interface BasicCrawlingContext * @param [options] All `enqueueLinks()` parameters are passed via an options object. * @returns Promise that resolves to {@apilink BatchAddRequestsResult} object. */ - enqueueLinks(options?: SetRequired): Promise; + enqueueLinks( + options?: SetRequired + ): Promise; } /** @@ -102,11 +109,15 @@ export interface BasicCrawlingContext const SAFE_MIGRATION_WAIT_MILLIS = 20000; export type RequestHandler< - Context extends CrawlingContext = LoadedContext, + Context extends CrawlingContext = LoadedContext< + BasicCrawlingContext & RestrictedCrawlingContext + >, > = (inputs: LoadedContext) => Awaitable; export type ErrorHandler< - Context extends CrawlingContext = LoadedContext, + Context extends CrawlingContext = LoadedContext< + BasicCrawlingContext & RestrictedCrawlingContext + >, > = (inputs: LoadedContext, error: Error) => Awaitable; export interface StatusMessageCallbackParams< @@ -124,7 +135,9 @@ export type StatusMessageCallback< Crawler extends BasicCrawler = BasicCrawler, > = (params: StatusMessageCallbackParams) => Awaitable; -export interface BasicCrawlerOptions { +export interface BasicCrawlerOptions< + Context extends CrawlingContext = BasicCrawlingContext, +> { /** * User-provided function that performs the logic of the crawler. It is called for each URL to crawl. * @@ -352,6 +365,12 @@ export interface BasicCrawlerOptions { +export class BasicCrawler< + Context extends CrawlingContext = BasicCrawlingContext, +> { protected static readonly CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'; /** @@ -481,7 +495,8 @@ export class BasicCrawler> = Router.create>(); + readonly router: RouterHandler> = + Router.create>(); running = false; hasFinishedBefore = false; @@ -507,8 +522,11 @@ export class BasicCrawler> = {}; + private _experimentWarnings: Partial< + Record + > = {}; protected static optionsShape = { requestList: ow.optional.object.validate(validators.requestList), @@ -542,7 +560,8 @@ export class BasicCrawler = {}, readonly config = Configuration.getGlobalConfig(), ) { - ow(options, 'BasicCrawlerOptions', ow.object.exactShape(BasicCrawler.optionsShape)); + ow( + options, + 'BasicCrawlerOptions', + ow.object.exactShape(BasicCrawler.optionsShape), + ); const { requestList, @@ -606,9 +629,11 @@ export class BasicCrawler (val == null ? null : +val); // allow at least 5min for internal timeouts this.internalTimeoutMillis = - tryEnv(process.env.CRAWLEE_INTERNAL_TIMEOUT) ?? Math.max(this.requestHandlerTimeoutMillis * 2, 300e3); + tryEnv(process.env.CRAWLEE_INTERNAL_TIMEOUT) ?? 
+ Math.max(this.requestHandlerTimeoutMillis * 2, 300e3); // override the default internal timeout of request queue to respect `requestHandlerTimeoutMillis` if (this.requestQueue) { - this.requestQueue.internalTimeoutMillis = this.internalTimeoutMillis; + this.requestQueue.internalTimeoutMillis = + this.internalTimeoutMillis; // for request queue v2, we want to lock requests by the timeout that would also account for internals (plus 5 seconds padding), but // with a minimum of a minute - this.requestQueue.requestLockSecs = Math.max(this.internalTimeoutMillis / 1000 + 5, 60); + this.requestQueue.requestLockSecs = Math.max( + this.internalTimeoutMillis / 1000 + 5, + 60, + ); } this.maxRequestRetries = maxRequestRetries; @@ -685,7 +717,8 @@ export class BasicCrawler maxRequestsPerCrawl && maxRequestsPerCrawl <= this.handledRequestsCount; + const isMaxPagesExceeded = () => + maxRequestsPerCrawl && + maxRequestsPerCrawl <= this.handledRequestsCount; let { isFinishedFunction } = autoscaledPoolOptions; @@ -717,52 +755,60 @@ export class BasicCrawler false; } - const basicCrawlerAutoscaledPoolConfiguration: Partial = { - minConcurrency: minConcurrency ?? autoscaledPoolOptions?.minConcurrency, - maxConcurrency: maxConcurrency ?? autoscaledPoolOptions?.maxConcurrency, - maxTasksPerMinute: maxRequestsPerMinute ?? autoscaledPoolOptions?.maxTasksPerMinute, - runTaskFunction: this._runTaskFunction.bind(this), - isTaskReadyFunction: async () => { - if (isMaxPagesExceeded()) { - if (shouldLogMaxPagesExceeded) { + const basicCrawlerAutoscaledPoolConfiguration: Partial = + { + minConcurrency: + minConcurrency ?? autoscaledPoolOptions?.minConcurrency, + maxConcurrency: + maxConcurrency ?? autoscaledPoolOptions?.maxConcurrency, + maxTasksPerMinute: + maxRequestsPerMinute ?? + autoscaledPoolOptions?.maxTasksPerMinute, + runTaskFunction: this._runTaskFunction.bind(this), + isTaskReadyFunction: async () => { + if (isMaxPagesExceeded()) { + if (shouldLogMaxPagesExceeded) { + log.info( + 'Crawler reached the maxRequestsPerCrawl limit of ' + + `${maxRequestsPerCrawl} requests and will shut down soon. Requests that are in progress will be allowed to finish.`, + ); + shouldLogMaxPagesExceeded = false; + } + return false; + } + + return this._isTaskReadyFunction(); + }, + isFinishedFunction: async () => { + if (isMaxPagesExceeded()) { log.info( - 'Crawler reached the maxRequestsPerCrawl limit of ' + - `${maxRequestsPerCrawl} requests and will shut down soon. Requests that are in progress will be allowed to finish.`, + `Earlier, the crawler reached the maxRequestsPerCrawl limit of ${maxRequestsPerCrawl} requests ` + + 'and all requests that were in progress at that time have now finished. ' + + `In total, the crawler processed ${this.handledRequestsCount} requests and will shut down.`, ); - shouldLogMaxPagesExceeded = false; + return true; } - return false; - } - return this._isTaskReadyFunction(); - }, - isFinishedFunction: async () => { - if (isMaxPagesExceeded()) { - log.info( - `Earlier, the crawler reached the maxRequestsPerCrawl limit of ${maxRequestsPerCrawl} requests ` + - 'and all requests that were in progress at that time have now finished. ' + - `In total, the crawler processed ${this.handledRequestsCount} requests and will shut down.`, - ); - return true; - } + const isFinished = isFinishedFunction + ? await isFinishedFunction() + : await this._defaultIsFinishedFunction(); - const isFinished = isFinishedFunction - ? 
await isFinishedFunction() - : await this._defaultIsFinishedFunction(); + if (isFinished) { + const reason = isFinishedFunction + ? "Crawler's custom isFinishedFunction() returned true, the crawler will shut down." + : 'All requests from the queue have been processed, the crawler will shut down.'; + log.info(reason); + } - if (isFinished) { - const reason = isFinishedFunction - ? "Crawler's custom isFinishedFunction() returned true, the crawler will shut down." - : 'All requests from the queue have been processed, the crawler will shut down.'; - log.info(reason); - } + return isFinished; + }, + log, + }; - return isFinished; - }, - log, + this.autoscaledPoolOptions = { + ...autoscaledPoolOptions, + ...basicCrawlerAutoscaledPoolConfiguration, }; - - this.autoscaledPoolOptions = { ...autoscaledPoolOptions, ...basicCrawlerAutoscaledPoolConfiguration }; } /** @@ -772,7 +818,9 @@ export class BasicCrawler (this._getMessageFromError(error) as any)?.includes(x)); + return ROTATE_PROXY_ERRORS.some((x: string) => + (this._getMessageFromError(error) as any)?.includes(x), + ); } /** @@ -780,17 +828,30 @@ export class BasicCrawler { - throw new Error('the "isRequestBlocked" method is not implemented in this crawler.'); + protected async isRequestBlocked( + _crawlingContext: Context, + ): Promise { + throw new Error( + 'the "isRequestBlocked" method is not implemented in this crawler.', + ); } /** * This method is periodically called by the crawler, every `statusMessageLoggingInterval` seconds. */ - async setStatusMessage(message: string, options: SetStatusMessageOptions = {}) { + async setStatusMessage( + message: string, + options: SetStatusMessageOptions = {}, + ) { const data = - options.isStatusMessageTerminal != null ? { terminal: options.isStatusMessageTerminal } : undefined; - this.log.internal(LogLevel[(options.level as 'DEBUG') ?? 'DEBUG'], message, data); + options.isStatusMessageTerminal != null + ? { terminal: options.isStatusMessageTerminal } + : undefined; + this.log.internal( + LogLevel[(options.level as 'DEBUG') ?? 
'DEBUG'], + message, + data, + ); const client = this.config.getStorageClient(); @@ -828,13 +889,23 @@ export class BasicCrawler clearInterval(interval) }; } @@ -861,7 +935,10 @@ export class BasicCrawler { + async run( + requests?: (string | Request | RequestOptions)[], + options?: CrawlerRunOptions, + ): Promise { if (this.running) { throw new Error( 'This crawler instance is already running, you can add more requests to it via `crawler.addRequests()`.', @@ -912,7 +989,7 @@ export class BasicCrawler `${count}x: ${info.at(-1)!.trim()} (${info[0]})`; + const prettify = ([count, info]: [number, string[]]) => + `${count}x: ${info.at(-1)!.trim()} (${info[0]})`; this.log.info(`Error analysis:`, { totalErrors: this.stats.errorTracker.total, uniqueErrors: this.stats.errorTracker.getUniqueErrorCount(), - mostCommonErrors: this.stats.errorTracker.getMostPopularErrors(3).map(prettify), + mostCommonErrors: this.stats.errorTracker + .getMostPopularErrors(3) + .map(prettify), }); } @@ -952,7 +1032,9 @@ export class BasicCrawler { if (!finished) { - this.log.info('Waiting for the storage to write its state to file system.'); + this.log.info( + 'Waiting for the storage to write its state to file system.', + ); } }, 1000); await client.teardown(); @@ -961,9 +1043,12 @@ export class BasicCrawler(defaultValue = {} as State): Promise { + async useState( + defaultValue = {} as State, + ): Promise { const kvs = await KeyValueStore.open(null, { config: this.config }); - return kvs.getAutoSavedValue(BasicCrawler.CRAWLEE_STATE_KEY, defaultValue); + return kvs.getAutoSavedValue( + BasicCrawler.CRAWLEE_STATE_KEY, + defaultValue, + ); } /** @@ -1011,7 +1101,10 @@ export class BasicCrawler[0], datasetIdOrName?: string): Promise { + async pushData( + data: Parameters[0], + datasetIdOrName?: string, + ): Promise { const dataset = await this.getDataset(datasetIdOrName); return dataset.pushData(data); } @@ -1026,7 +1119,9 @@ export class BasicCrawler): ReturnType { + async getData( + ...args: Parameters + ): ReturnType { const dataset = await this.getDataset(); return dataset.getData(...args); } @@ -1035,28 +1130,43 @@ export class BasicCrawler(path: string, format?: 'json' | 'csv', options?: DatasetExportOptions): Promise { + async exportData( + path: string, + format?: 'json' | 'csv', + options?: DatasetExportOptions, + ): Promise { const supportedFormats = ['json', 'csv']; if (!format && path.match(/\.(json|csv)$/i)) { - format = path.toLowerCase().match(/\.(json|csv)$/)![1] as 'json' | 'csv'; + format = path.toLowerCase().match(/\.(json|csv)$/)![1] as + | 'json' + | 'csv'; } if (!format) { throw new Error( - `Failed to infer format from the path: '${path}'. Supported formats: ${supportedFormats.join(', ')}`, + `Failed to infer format from the path: '${path}'. Supported formats: ${supportedFormats.join( + ', ', + )}`, ); } if (!supportedFormats.includes(format)) { - throw new Error(`Unsupported format: '${format}'. Use one of ${supportedFormats.join(', ')}`); + throw new Error( + `Unsupported format: '${format}'. 
Use one of ${supportedFormats.join( + ', ', + )}`, + ); } const dataset = await this.getDataset(); const items = await dataset.export(options); if (format === 'csv') { - const value = stringify([Object.keys(items[0]), ...items.map((item) => Object.values(item))]); + const value = stringify([ + Object.keys(items[0]), + ...items.map((item) => Object.values(item)), + ]); await ensureDir(dirname(path)); await writeFile(path, value); this.log.info(`Export to ${path} finished!`); @@ -1080,10 +1190,16 @@ export class BasicCrawler { + protected async _runRequestHandler( + crawlingContext: Context, + ): Promise { await this.requestHandler(crawlingContext as LoadedContext); } @@ -1102,23 +1220,27 @@ export class BasicCrawler { - if (err.message.includes('running tasks did not finish')) { - this.log.error( - 'The crawler was paused due to migration to another host, ' + - "but some requests did not finish in time. Those requests' results may be duplicated.", - ); - } else { - throw err; - } - }); + await this.autoscaledPool + .pause(SAFE_MIGRATION_WAIT_MILLIS) + .catch((err) => { + if (err.message.includes('running tasks did not finish')) { + this.log.error( + 'The crawler was paused due to migration to another host, ' + + "but some requests did not finish in time. Those requests' results may be duplicated.", + ); + } else { + throw err; + } + }); } const requestListPersistPromise = (async () => { @@ -1142,7 +1264,10 @@ export class BasicCrawler= this.sameDomainDelayMillis) { + if ( + !lastAccessTime || + now - lastAccessTime >= this.sameDomainDelayMillis + ) { this.domainAccessedTime.set(domain, now); return false; } @@ -1207,14 +1338,18 @@ export class BasicCrawler { - this.log.debug(`Adding request ${request.url} (${request.id}) back to the queue`); + this.log.debug( + `Adding request ${request.url} (${request.id}) back to the queue`, + ); if (source instanceof RequestQueueV1) { // eslint-disable-next-line dot-notation source['inProgress'].add(request.id!); } - await source.reclaimRequest(request, { forefront: request.userData?.__crawlee?.forefront }); + await source.reclaimRequest(request, { + forefront: request.userData?.__crawlee?.forefront, + }); }, delay); return true; @@ -1225,7 +1360,10 @@ export class BasicCrawler) => { + enqueueLinks: async ( + options: SetRequired, + ) => { return enqueueLinks({ // specify the RQ first to allow overriding it requestQueue: await this.getRequestQueue(), @@ -1285,8 +1429,10 @@ export class BasicCrawler { const cookieJar = session ? 
{ - getCookieString: async (url: string) => session!.getCookieString(url), - setCookie: async (rawCookie: string, url: string) => session!.setCookie(rawCookie, url), + getCookieString: async (url: string) => + session!.getCookieString(url), + setCookie: async (rawCookie: string, url: string) => + session!.setCookie(rawCookie, url), ...overrideOptions?.cookieJar, } : overrideOptions?.cookieJar; @@ -1307,7 +1453,8 @@ export class BasicCrawler KeyValueStore.open(idOrName, { config: this.config }), + getKeyValueStore: async (idOrName?: string) => + KeyValueStore.open(idOrName, { config: this.config }), }; this.crawlingContexts.set(crawlingContext.id, crawlingContext); @@ -1317,13 +1464,17 @@ export class BasicCrawler this._runRequestHandler(crawlingContext), this.requestHandlerTimeoutMillis, - `requestHandler timed out after ${this.requestHandlerTimeoutMillis / 1000} seconds (${request.id}).`, + `requestHandler timed out after ${ + this.requestHandlerTimeoutMillis / 1000 + } seconds (${request.id}).`, ); await this._timeoutAndRetry( async () => source.markRequestHandled(request!), this.internalTimeoutMillis, - `Marking request ${request.url} (${request.id}) as handled timed out after ${ + `Marking request ${request.url} (${ + request.id + }) as handled timed out after ${ this.internalTimeoutMillis / 1e3 } seconds.`, ); @@ -1338,9 +1489,16 @@ export class BasicCrawler this._requestFunctionErrorHandler(err as Error, crawlingContext, source), + async () => + this._requestFunctionErrorHandler( + err as Error, + crawlingContext, + source, + ), this.internalTimeoutMillis, - `Handling request failure of ${request.url} (${request.id}) timed out after ${ + `Handling request failure of ${request.url} (${ + request.id + }) timed out after ${ this.internalTimeoutMillis / 1e3 } seconds.`, ); @@ -1397,8 +1555,18 @@ export class BasicCrawler - this.errorHandler?.(this._augmentContextWithDeprecatedError(crawlingContext, error), error), + this.errorHandler?.( + this._augmentContextWithDeprecatedError( + crawlingContext, + error, + ), + error, + ), ); if (error instanceof SessionError) { @@ -1472,13 +1649,18 @@ export class BasicCrawler { + protected async _handleFailedRequestHandler( + crawlingContext: Context, + error: Error, + ): Promise { // Always log the last error regardless if the user provided a failedRequestHandler const { id, url, method, uniqueKey } = crawlingContext.request; const message = this._getMessageFromError(error, true); - this.log.error(`Request failed and reached maximum retries. ${message}`, { id, url, method, uniqueKey }); + this.log.error( + `Request failed and reached maximum retries. ${message}`, + { id, url, method, uniqueKey }, + ); if (this.failedRequestHandler) { await this._tagUserHandlerError(() => - this.failedRequestHandler?.(this._augmentContextWithDeprecatedError(crawlingContext, error), error), + this.failedRequestHandler?.( + this._augmentContextWithDeprecatedError( + crawlingContext, + error, + ), + error, + ), ); } } @@ -1532,21 +1728,31 @@ export class BasicCrawler error instanceof type)) { + if ( + [TypeError, SyntaxError, ReferenceError].some( + (type) => error instanceof type, + ) + ) { forceStack = true; } - const stackLines = error?.stack ? error.stack.split('\n') : new Error().stack!.split('\n').slice(2); + const stackLines = error?.stack + ? 
error.stack.split('\n') + : new Error().stack!.split('\n').slice(2); const baseDir = process.cwd(); - const userLine = stackLines.find((line) => line.includes(baseDir) && !line.includes('node_modules')); + const userLine = stackLines.find( + (line) => line.includes(baseDir) && !line.includes('node_modules'), + ); if (error instanceof TimeoutError) { - return process.env.CRAWLEE_VERBOSE_LOG ? error.stack : error.message || error; // stack in timeout errors does not really help + return process.env.CRAWLEE_VERBOSE_LOG + ? error.stack + : error.message || error; // stack in timeout errors does not really help } return process.env.CRAWLEE_VERBOSE_LOG || forceStack - ? (error.stack ?? [error.message || error, ...stackLines].join('\n')) + ? error.stack ?? [error.message || error, ...stackLines].join('\n') : [error.message || error, userLine].join('\n'); } @@ -1555,7 +1761,8 @@ export class BasicCrawler { this.log.deprecated( @@ -1601,10 +1811,9 @@ export class BasicCrawler Awaitable>( - hooks: HookLike[], - ...args: Parameters - ) { + protected async _executeHooks< + HookLike extends (...args: any[]) => Awaitable, + >(hooks: HookLike[], ...args: Parameters) { if (Array.isArray(hooks) && hooks.length) { for (const hook of hooks) { await hook(...args); @@ -1663,7 +1872,10 @@ export class BasicCrawler Date: Wed, 2 Oct 2024 10:44:28 +0000 Subject: [PATCH 5/6] refactor: Refactor Monitor class to improve code readability and maintainability --- .../src/internals/basic-crawler.ts | 589 ++++++------------ 1 file changed, 188 insertions(+), 401 deletions(-) diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts index 723d8e9b3b05..50f475ca4602 100644 --- a/packages/basic-crawler/src/internals/basic-crawler.ts +++ b/packages/basic-crawler/src/internals/basic-crawler.ts @@ -5,59 +5,54 @@ import defaultLog, { LogLevel } from '@apify/log'; import { addTimeoutToPromise, TimeoutError, tryCancel } from '@apify/timeout'; import { cryptoRandomObjectId } from '@apify/utilities'; import type { - AddRequestsBatchedOptions, - AddRequestsBatchedResult, - AutoscaledPoolOptions, - CrawlingContext, - DatasetExportOptions, - EnqueueLinksOptions, - EventManager, - FinalStatistics, - GetUserDataFromRequest, - IRequestList, - LoadedContext, - ProxyInfo, - Request, - RequestOptions, - RestrictedCrawlingContext, - RouterHandler, - RouterRoutes, - Session, - SessionPoolOptions, - Source, - StatisticsOptions, - StatisticState, + AddRequestsBatchedOptions, + AddRequestsBatchedResult, + AutoscaledPoolOptions, + CrawlingContext, + DatasetExportOptions, + EnqueueLinksOptions, + EventManager, + FinalStatistics, + GetUserDataFromRequest, + IRequestList, + LoadedContext, + ProxyInfo, + Request, + RequestOptions, + RestrictedCrawlingContext, + RouterHandler, + RouterRoutes, + Session, + SessionPoolOptions, + Source, + StatisticsOptions, + StatisticState, } from '@crawlee/core'; import { - AutoscaledPool, - Configuration, - CriticalError, - Dataset, - enqueueLinks, - EnqueueStrategy, - EventType, - KeyValueStore, - mergeCookies, - Monitor, - NonRetryableError, - purgeDefaultStorages, - RequestProvider, - RequestQueue, - RequestQueueV1, - RequestState, - RetryRequestError, - Router, - SessionError, - SessionPool, - Statistics, - validators + AutoscaledPool, + Configuration, + CriticalError, + Dataset, + enqueueLinks, + EnqueueStrategy, + EventType, + KeyValueStore, + mergeCookies, + Monitor, + NonRetryableError, + purgeDefaultStorages, + RequestProvider, + RequestQueue, + 
RequestQueueV1, + RequestState, + RetryRequestError, + Router, + SessionError, + SessionPool, + Statistics, + validators, } from '@crawlee/core'; -import type { - Awaitable, - BatchAddRequestsResult, - Dictionary, - SetStatusMessageOptions, -} from '@crawlee/types'; +import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types'; import { gotScraping, ROTATE_PROXY_ERRORS } from '@crawlee/utils'; import { stringify } from 'csv-stringify/sync'; import { ensureDir, writeFile, writeJSON } from 'fs-extra'; @@ -92,9 +87,7 @@ export interface BasicCrawlingContext * @param [options] All `enqueueLinks()` parameters are passed via an options object. * @returns Promise that resolves to {@apilink BatchAddRequestsResult} object. */ - enqueueLinks( - options?: SetRequired - ): Promise; + enqueueLinks(options?: SetRequired): Promise; } /** @@ -109,15 +102,11 @@ export interface BasicCrawlingContext const SAFE_MIGRATION_WAIT_MILLIS = 20000; export type RequestHandler< - Context extends CrawlingContext = LoadedContext< - BasicCrawlingContext & RestrictedCrawlingContext - >, + Context extends CrawlingContext = LoadedContext, > = (inputs: LoadedContext) => Awaitable; export type ErrorHandler< - Context extends CrawlingContext = LoadedContext< - BasicCrawlingContext & RestrictedCrawlingContext - >, + Context extends CrawlingContext = LoadedContext, > = (inputs: LoadedContext, error: Error) => Awaitable; export interface StatusMessageCallbackParams< @@ -135,9 +124,7 @@ export type StatusMessageCallback< Crawler extends BasicCrawler = BasicCrawler, > = (params: StatusMessageCallbackParams) => Awaitable; -export interface BasicCrawlerOptions< - Context extends CrawlingContext = BasicCrawlingContext, -> { +export interface BasicCrawlerOptions { /** * User-provided function that performs the logic of the crawler. It is called for each URL to crawl. * @@ -453,9 +440,7 @@ export interface CrawlerExperiments { * ``` * @category Crawlers */ -export class BasicCrawler< - Context extends CrawlingContext = BasicCrawlingContext, -> { +export class BasicCrawler { protected static readonly CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'; /** @@ -495,8 +480,7 @@ export class BasicCrawler< * Default {@apilink Router} instance that will be used if we don't specify any {@apilink BasicCrawlerOptions.requestHandler|`requestHandler`}. * See {@apilink Router.addHandler|`router.addHandler()`} and {@apilink Router.addDefaultHandler|`router.addDefaultHandler()`}. 
*/ - readonly router: RouterHandler> = - Router.create>(); + readonly router: RouterHandler> = Router.create>(); running = false; hasFinishedBefore = false; @@ -524,9 +508,7 @@ export class BasicCrawler< private monitor?: boolean; private experiments: CrawlerExperiments; - private _experimentWarnings: Partial< - Record - > = {}; + private _experimentWarnings: Partial> = {}; protected static optionsShape = { requestList: ow.optional.object.validate(validators.requestList), @@ -560,8 +542,7 @@ export class BasicCrawler< // AutoscaledPool shorthands minConcurrency: ow.optional.number, maxConcurrency: ow.optional.number, - maxRequestsPerMinute: - ow.optional.number.integerOrInfinite.positive.greaterThanOrEqual(1), + maxRequestsPerMinute: ow.optional.number.integerOrInfinite.positive.greaterThanOrEqual(1), keepAlive: ow.optional.boolean, // internal @@ -578,11 +559,7 @@ export class BasicCrawler< options: BasicCrawlerOptions = {}, readonly config = Configuration.getGlobalConfig(), ) { - ow( - options, - 'BasicCrawlerOptions', - ow.object.exactShape(BasicCrawler.optionsShape), - ); + ow(options, 'BasicCrawlerOptions', ow.object.exactShape(BasicCrawler.optionsShape)); const { requestList, @@ -629,8 +606,7 @@ export class BasicCrawler< this.requestQueue = requestQueue; this.log = log; this.statusMessageLoggingInterval = statusMessageLoggingInterval; - this.statusMessageCallback = - statusMessageCallback as StatusMessageCallback; + this.statusMessageCallback = statusMessageCallback as StatusMessageCallback; this.events = config.getEventManager(); this.domainAccessedTime = new Map(); this.monitor = monitor; @@ -679,27 +655,20 @@ export class BasicCrawler< oldName: 'handleRequestTimeoutSecs', propertyKey: 'requestHandlerTimeoutMillis', newProperty: newRequestHandlerTimeout, - oldProperty: handleRequestTimeoutSecs - ? handleRequestTimeoutSecs * 1000 - : undefined, + oldProperty: handleRequestTimeoutSecs ? handleRequestTimeoutSecs * 1000 : undefined, }); const tryEnv = (val?: string) => (val == null ? null : +val); // allow at least 5min for internal timeouts this.internalTimeoutMillis = - tryEnv(process.env.CRAWLEE_INTERNAL_TIMEOUT) ?? - Math.max(this.requestHandlerTimeoutMillis * 2, 300e3); + tryEnv(process.env.CRAWLEE_INTERNAL_TIMEOUT) ?? Math.max(this.requestHandlerTimeoutMillis * 2, 300e3); // override the default internal timeout of request queue to respect `requestHandlerTimeoutMillis` if (this.requestQueue) { - this.requestQueue.internalTimeoutMillis = - this.internalTimeoutMillis; + this.requestQueue.internalTimeoutMillis = this.internalTimeoutMillis; // for request queue v2, we want to lock requests by the timeout that would also account for internals (plus 5 seconds padding), but // with a minimum of a minute - this.requestQueue.requestLockSecs = Math.max( - this.internalTimeoutMillis / 1000 + 5, - 60, - ); + this.requestQueue.requestLockSecs = Math.max(this.internalTimeoutMillis / 1000 + 5, 60); } this.maxRequestRetries = maxRequestRetries; @@ -717,8 +686,7 @@ export class BasicCrawler< log, }; if (this.retryOnBlocked) { - this.sessionPoolOptions.blockedStatusCodes = - sessionPoolOptions.blockedStatusCodes ?? []; + this.sessionPoolOptions.blockedStatusCodes = sessionPoolOptions.blockedStatusCodes ?? []; if (this.sessionPoolOptions.blockedStatusCodes.length !== 0) { log.warning( `Both 'blockedStatusCodes' and 'retryOnBlocked' are set. 
Please note that the 'retryOnBlocked' feature might not work as expected.`, @@ -738,15 +706,10 @@ export class BasicCrawler< this.requestHandlerTimeoutMillis = maxSignedInteger; } - this.internalTimeoutMillis = Math.min( - this.internalTimeoutMillis, - maxSignedInteger, - ); + this.internalTimeoutMillis = Math.min(this.internalTimeoutMillis, maxSignedInteger); let shouldLogMaxPagesExceeded = true; - const isMaxPagesExceeded = () => - maxRequestsPerCrawl && - maxRequestsPerCrawl <= this.handledRequestsCount; + const isMaxPagesExceeded = () => maxRequestsPerCrawl && maxRequestsPerCrawl <= this.handledRequestsCount; let { isFinishedFunction } = autoscaledPoolOptions; @@ -755,55 +718,50 @@ export class BasicCrawler< isFinishedFunction = async () => false; } - const basicCrawlerAutoscaledPoolConfiguration: Partial = - { - minConcurrency: - minConcurrency ?? autoscaledPoolOptions?.minConcurrency, - maxConcurrency: - maxConcurrency ?? autoscaledPoolOptions?.maxConcurrency, - maxTasksPerMinute: - maxRequestsPerMinute ?? - autoscaledPoolOptions?.maxTasksPerMinute, - runTaskFunction: this._runTaskFunction.bind(this), - isTaskReadyFunction: async () => { - if (isMaxPagesExceeded()) { - if (shouldLogMaxPagesExceeded) { - log.info( - 'Crawler reached the maxRequestsPerCrawl limit of ' + - `${maxRequestsPerCrawl} requests and will shut down soon. Requests that are in progress will be allowed to finish.`, - ); - shouldLogMaxPagesExceeded = false; - } - return false; - } - - return this._isTaskReadyFunction(); - }, - isFinishedFunction: async () => { - if (isMaxPagesExceeded()) { + const basicCrawlerAutoscaledPoolConfiguration: Partial = { + minConcurrency: minConcurrency ?? autoscaledPoolOptions?.minConcurrency, + maxConcurrency: maxConcurrency ?? autoscaledPoolOptions?.maxConcurrency, + maxTasksPerMinute: maxRequestsPerMinute ?? autoscaledPoolOptions?.maxTasksPerMinute, + runTaskFunction: this._runTaskFunction.bind(this), + isTaskReadyFunction: async () => { + if (isMaxPagesExceeded()) { + if (shouldLogMaxPagesExceeded) { log.info( - `Earlier, the crawler reached the maxRequestsPerCrawl limit of ${maxRequestsPerCrawl} requests ` + - 'and all requests that were in progress at that time have now finished. ' + - `In total, the crawler processed ${this.handledRequestsCount} requests and will shut down.`, + 'Crawler reached the maxRequestsPerCrawl limit of ' + + `${maxRequestsPerCrawl} requests and will shut down soon. Requests that are in progress will be allowed to finish.`, ); - return true; + shouldLogMaxPagesExceeded = false; } + return false; + } - const isFinished = isFinishedFunction - ? await isFinishedFunction() - : await this._defaultIsFinishedFunction(); + return this._isTaskReadyFunction(); + }, + isFinishedFunction: async () => { + if (isMaxPagesExceeded()) { + log.info( + `Earlier, the crawler reached the maxRequestsPerCrawl limit of ${maxRequestsPerCrawl} requests ` + + 'and all requests that were in progress at that time have now finished. ' + + `In total, the crawler processed ${this.handledRequestsCount} requests and will shut down.`, + ); + return true; + } - if (isFinished) { - const reason = isFinishedFunction - ? "Crawler's custom isFinishedFunction() returned true, the crawler will shut down." - : 'All requests from the queue have been processed, the crawler will shut down.'; - log.info(reason); - } + const isFinished = isFinishedFunction + ? 
await isFinishedFunction() + : await this._defaultIsFinishedFunction(); - return isFinished; - }, - log, - }; + if (isFinished) { + const reason = isFinishedFunction + ? "Crawler's custom isFinishedFunction() returned true, the crawler will shut down." + : 'All requests from the queue have been processed, the crawler will shut down.'; + log.info(reason); + } + + return isFinished; + }, + log, + }; this.autoscaledPoolOptions = { ...autoscaledPoolOptions, @@ -818,9 +776,7 @@ export class BasicCrawler< * @param error The error to check. */ protected isProxyError(error: Error): boolean { - return ROTATE_PROXY_ERRORS.some((x: string) => - (this._getMessageFromError(error) as any)?.includes(x), - ); + return ROTATE_PROXY_ERRORS.some((x: string) => (this._getMessageFromError(error) as any)?.includes(x)); } /** @@ -828,30 +784,17 @@ export class BasicCrawler< * Returns `false` if the request is not blocked, otherwise returns a string with a description of the block reason. * @param _crawlingContext The crawling context to check. */ - protected async isRequestBlocked( - _crawlingContext: Context, - ): Promise { - throw new Error( - 'the "isRequestBlocked" method is not implemented in this crawler.', - ); + protected async isRequestBlocked(_crawlingContext: Context): Promise { + throw new Error('the "isRequestBlocked" method is not implemented in this crawler.'); } /** * This method is periodically called by the crawler, every `statusMessageLoggingInterval` seconds. */ - async setStatusMessage( - message: string, - options: SetStatusMessageOptions = {}, - ) { + async setStatusMessage(message: string, options: SetStatusMessageOptions = {}) { const data = - options.isStatusMessageTerminal != null - ? { terminal: options.isStatusMessageTerminal } - : undefined; - this.log.internal( - LogLevel[(options.level as 'DEBUG') ?? 'DEBUG'], - message, - data, - ); + options.isStatusMessageTerminal != null ? { terminal: options.isStatusMessageTerminal } : undefined; + this.log.internal(LogLevel[(options.level as 'DEBUG') ?? 'DEBUG'], message, data); const client = this.config.getStorageClient(); @@ -889,23 +832,13 @@ export class BasicCrawler< if (operationMode === 'ERROR') { message = `Experiencing problems, ${ - this.stats.state.requestsFailed - - previousState.requestsFailed || - this.stats.state.requestsFailed - } failed requests in the past ${ - this.statusMessageLoggingInterval - } seconds.`; + this.stats.state.requestsFailed - previousState.requestsFailed || this.stats.state.requestsFailed + } failed requests in the past ${this.statusMessageLoggingInterval} seconds.`; } else { - const total = - this.requestQueue?.getTotalCount() || - this.requestList?.length(); - message = `Crawled ${this.stats.state.requestsFinished}${ - total ? `/${total}` : '' - } pages, ${ + const total = this.requestQueue?.getTotalCount() || this.requestList?.length(); + message = `Crawled ${this.stats.state.requestsFinished}${total ? `/${total}` : ''} pages, ${ this.stats.state.requestsFailed - } failed requests, desired concurrency ${ - this.autoscaledPool?.desiredConcurrency ?? 0 - }.`; + } failed requests, desired concurrency ${this.autoscaledPool?.desiredConcurrency ?? 
0}.`; } if (this.statusMessageCallback) { @@ -920,10 +853,7 @@ export class BasicCrawler< await this.setStatusMessage(message); }; - const interval = setInterval( - log, - this.statusMessageLoggingInterval * 1e3, - ); + const interval = setInterval(log, this.statusMessageLoggingInterval * 1e3); return { log, stop: () => clearInterval(interval) }; } @@ -935,10 +865,7 @@ export class BasicCrawler< * @param [requests] The requests to add * @param [options] Options for the request queue */ - async run( - requests?: (string | Request | RequestOptions)[], - options?: CrawlerRunOptions, - ): Promise { + async run(requests?: (string | Request | RequestOptions)[], options?: CrawlerRunOptions): Promise { if (this.running) { throw new Error( 'This crawler instance is already running, you can add more requests to it via `crawler.addRequests()`.', @@ -1014,15 +941,12 @@ export class BasicCrawler< this.log.info('Final request statistics:', stats); if (this.stats.errorTracker.total !== 0) { - const prettify = ([count, info]: [number, string[]]) => - `${count}x: ${info.at(-1)!.trim()} (${info[0]})`; + const prettify = ([count, info]: [number, string[]]) => `${count}x: ${info.at(-1)!.trim()} (${info[0]})`; this.log.info(`Error analysis:`, { totalErrors: this.stats.errorTracker.total, uniqueErrors: this.stats.errorTracker.getUniqueErrorCount(), - mostCommonErrors: this.stats.errorTracker - .getMostPopularErrors(3) - .map(prettify), + mostCommonErrors: this.stats.errorTracker.getMostPopularErrors(3).map(prettify), }); } @@ -1032,9 +956,7 @@ export class BasicCrawler< let finished = false; setTimeout(() => { if (!finished) { - this.log.info( - 'Waiting for the storage to write its state to file system.', - ); + this.log.info('Waiting for the storage to write its state to file system.'); } }, 1000); await client.teardown(); @@ -1044,11 +966,8 @@ export class BasicCrawler< periodicLogger.stop(); await this.setStatusMessage( `Finished! Total ${ - this.stats.state.requestsFinished + - this.stats.state.requestsFailed - } requests: ${this.stats.state.requestsFinished} succeeded, ${ - this.stats.state.requestsFailed - } failed.`, + this.stats.state.requestsFinished + this.stats.state.requestsFailed + } requests: ${this.stats.state.requestsFinished} succeeded, ${this.stats.state.requestsFailed} failed.`, { isStatusMessageTerminal: true, level: 'INFO' }, ); this.running = false; @@ -1069,14 +988,9 @@ export class BasicCrawler< return this.requestQueue!; } - async useState( - defaultValue = {} as State, - ): Promise { + async useState(defaultValue = {} as State): Promise { const kvs = await KeyValueStore.open(null, { config: this.config }); - return kvs.getAutoSavedValue( - BasicCrawler.CRAWLEE_STATE_KEY, - defaultValue, - ); + return kvs.getAutoSavedValue(BasicCrawler.CRAWLEE_STATE_KEY, defaultValue); } /** @@ -1101,10 +1015,7 @@ export class BasicCrawler< /** * Pushes data to the specified {@apilink Dataset}, or the default crawler {@apilink Dataset} by calling {@apilink Dataset.pushData}. */ - async pushData( - data: Parameters[0], - datasetIdOrName?: string, - ): Promise { + async pushData(data: Parameters[0], datasetIdOrName?: string): Promise { const dataset = await this.getDataset(datasetIdOrName); return dataset.pushData(data); } @@ -1119,9 +1030,7 @@ export class BasicCrawler< /** * Retrieves data from the default crawler {@apilink Dataset} by calling {@apilink Dataset.getData}. 
*/ - async getData( - ...args: Parameters - ): ReturnType { + async getData(...args: Parameters): ReturnType { const dataset = await this.getDataset(); return dataset.getData(...args); } @@ -1130,43 +1039,28 @@ export class BasicCrawler< * Retrieves all the data from the default crawler {@apilink Dataset} and exports them to the specified format. * Supported formats are currently 'json' and 'csv', and will be inferred from the `path` automatically. */ - async exportData( - path: string, - format?: 'json' | 'csv', - options?: DatasetExportOptions, - ): Promise { + async exportData(path: string, format?: 'json' | 'csv', options?: DatasetExportOptions): Promise { const supportedFormats = ['json', 'csv']; if (!format && path.match(/\.(json|csv)$/i)) { - format = path.toLowerCase().match(/\.(json|csv)$/)![1] as - | 'json' - | 'csv'; + format = path.toLowerCase().match(/\.(json|csv)$/)![1] as 'json' | 'csv'; } if (!format) { throw new Error( - `Failed to infer format from the path: '${path}'. Supported formats: ${supportedFormats.join( - ', ', - )}`, + `Failed to infer format from the path: '${path}'. Supported formats: ${supportedFormats.join(', ')}`, ); } if (!supportedFormats.includes(format)) { - throw new Error( - `Unsupported format: '${format}'. Use one of ${supportedFormats.join( - ', ', - )}`, - ); + throw new Error(`Unsupported format: '${format}'. Use one of ${supportedFormats.join(', ')}`); } const dataset = await this.getDataset(); const items = await dataset.export(options); if (format === 'csv') { - const value = stringify([ - Object.keys(items[0]), - ...items.map((item) => Object.values(item)), - ]); + const value = stringify([Object.keys(items[0]), ...items.map((item) => Object.values(item))]); await ensureDir(dirname(path)); await writeFile(path, value); this.log.info(`Export to ${path} finished!`); @@ -1190,16 +1084,10 @@ export class BasicCrawler< // Initialize AutoscaledPool before awaiting _loadHandledRequestCount(), // so that the caller can get a reference to it before awaiting the promise returned from run() // (otherwise there would be no way) - this.autoscaledPool = new AutoscaledPool( - this.autoscaledPoolOptions, - this.config, - ); + this.autoscaledPool = new AutoscaledPool(this.autoscaledPoolOptions, this.config); if (this.useSessionPool) { - this.sessionPool = await SessionPool.open( - this.sessionPoolOptions, - this.config, - ); + this.sessionPool = await SessionPool.open(this.sessionPoolOptions, this.config); // Assuming there are not more than 20 browsers running at once; this.sessionPool.setMaxListeners(20); } @@ -1207,9 +1095,7 @@ export class BasicCrawler< await this._loadHandledRequestCount(); } - protected async _runRequestHandler( - crawlingContext: Context, - ): Promise { + protected async _runRequestHandler(crawlingContext: Context): Promise { await this.requestHandler(crawlingContext as LoadedContext); } @@ -1220,27 +1106,23 @@ export class BasicCrawler< const isBlocked = session.retireOnBlockedStatusCodes(statusCode); if (isBlocked) { - throw new Error( - `Request blocked - received ${statusCode} status code.`, - ); + throw new Error(`Request blocked - received ${statusCode} status code.`); } } protected async _pauseOnMigration() { if (this.autoscaledPool) { // if run wasn't called, this is going to crash - await this.autoscaledPool - .pause(SAFE_MIGRATION_WAIT_MILLIS) - .catch((err) => { - if (err.message.includes('running tasks did not finish')) { - this.log.error( - 'The crawler was paused due to migration to another host, ' + - "but some requests 
did not finish in time. Those requests' results may be duplicated.", - ); - } else { - throw err; - } - }); + await this.autoscaledPool.pause(SAFE_MIGRATION_WAIT_MILLIS).catch((err) => { + if (err.message.includes('running tasks did not finish')) { + this.log.error( + 'The crawler was paused due to migration to another host, ' + + "but some requests did not finish in time. Those requests' results may be duplicated.", + ); + } else { + throw err; + } + }); } const requestListPersistPromise = (async () => { @@ -1264,10 +1146,7 @@ export class BasicCrawler< } })(); - await Promise.all([ - requestListPersistPromise, - this.stats.persistState(), - ]); + await Promise.all([requestListPersistPromise, this.stats.persistState()]); } /** @@ -1307,10 +1186,7 @@ export class BasicCrawler< * adding it back to the queue after the timeout passes. Returns `true` if the request * should be ignored and will be reclaimed to the queue once ready. */ - protected delayRequest( - request: Request, - source: IRequestList | RequestProvider, - ) { + protected delayRequest(request: Request, source: IRequestList | RequestProvider) { const domain = getDomain(request.url); if (!domain || !request) { @@ -1320,10 +1196,7 @@ export class BasicCrawler< const now = Date.now(); const lastAccessTime = this.domainAccessedTime.get(domain); - if ( - !lastAccessTime || - now - lastAccessTime >= this.sameDomainDelayMillis - ) { + if (!lastAccessTime || now - lastAccessTime >= this.sameDomainDelayMillis) { this.domainAccessedTime.set(domain, now); return false; } @@ -1338,9 +1211,7 @@ export class BasicCrawler< `Request ${request.url} (${request.id}) will be reclaimed after ${delay} milliseconds due to same domain delay`, ); setTimeout(async () => { - this.log.debug( - `Adding request ${request.url} (${request.id}) back to the queue`, - ); + this.log.debug(`Adding request ${request.url} (${request.id}) back to the queue`); if (source instanceof RequestQueueV1) { // eslint-disable-next-line dot-notation @@ -1360,10 +1231,7 @@ export class BasicCrawler< * then retries them in a case of an error, etc. */ protected async _runTaskFunction() { - const source = - this.requestQueue || - this.requestList || - (await this.getRequestQueue()); + const source = this.requestQueue || this.requestList || (await this.getRequestQueue()); let request: Request | null | undefined; let session: Session | undefined; @@ -1373,9 +1241,7 @@ export class BasicCrawler< request = await this._fetchNextRequest(); }, this.internalTimeoutMillis, - `Fetching next request timed out after ${ - this.internalTimeoutMillis / 1e3 - } seconds.`, + `Fetching next request timed out after ${this.internalTimeoutMillis / 1e3} seconds.`, ); tryCancel(); @@ -1386,9 +1252,7 @@ export class BasicCrawler< session = await this.sessionPool!.getSession(); }, this.internalTimeoutMillis, - `Fetching session timed out after ${ - this.internalTimeoutMillis / 1e3 - } seconds.`, + `Fetching session timed out after ${this.internalTimeoutMillis / 1e3} seconds.`, ); } @@ -1414,9 +1278,7 @@ export class BasicCrawler< log: this.log, request, session, - enqueueLinks: async ( - options: SetRequired, - ) => { + enqueueLinks: async (options: SetRequired) => { return enqueueLinks({ // specify the RQ first to allow overriding it requestQueue: await this.getRequestQueue(), @@ -1429,10 +1291,8 @@ export class BasicCrawler< sendRequest: async (overrideOptions?: OptionsInit) => { const cookieJar = session ? 
{ - getCookieString: async (url: string) => - session!.getCookieString(url), - setCookie: async (rawCookie: string, url: string) => - session!.setCookie(rawCookie, url), + getCookieString: async (url: string) => session!.getCookieString(url), + setCookie: async (rawCookie: string, url: string) => session!.setCookie(rawCookie, url), ...overrideOptions?.cookieJar, } : overrideOptions?.cookieJar; @@ -1453,8 +1313,7 @@ export class BasicCrawler< cookieJar, }); }, - getKeyValueStore: async (idOrName?: string) => - KeyValueStore.open(idOrName, { config: this.config }), + getKeyValueStore: async (idOrName?: string) => KeyValueStore.open(idOrName, { config: this.config }), }; this.crawlingContexts.set(crawlingContext.id, crawlingContext); @@ -1464,17 +1323,13 @@ export class BasicCrawler< await addTimeoutToPromise( async () => this._runRequestHandler(crawlingContext), this.requestHandlerTimeoutMillis, - `requestHandler timed out after ${ - this.requestHandlerTimeoutMillis / 1000 - } seconds (${request.id}).`, + `requestHandler timed out after ${this.requestHandlerTimeoutMillis / 1000} seconds (${request.id}).`, ); await this._timeoutAndRetry( async () => source.markRequestHandled(request!), this.internalTimeoutMillis, - `Marking request ${request.url} (${ - request.id - }) as handled timed out after ${ + `Marking request ${request.url} (${request.id}) as handled timed out after ${ this.internalTimeoutMillis / 1e3 } seconds.`, ); @@ -1489,16 +1344,9 @@ export class BasicCrawler< try { request.state = RequestState.ERROR_HANDLER; await addTimeoutToPromise( - async () => - this._requestFunctionErrorHandler( - err as Error, - crawlingContext, - source, - ), + async () => this._requestFunctionErrorHandler(err as Error, crawlingContext, source), this.internalTimeoutMillis, - `Handling request failure of ${request.url} (${ - request.id - }) timed out after ${ + `Handling request failure of ${request.url} (${request.id}) timed out after ${ this.internalTimeoutMillis / 1e3 } seconds.`, ); @@ -1555,18 +1403,8 @@ export class BasicCrawler< } catch (e) { if (retried <= maxRetries) { // we retry on any error, not just timeout - this.log.warning( - `${ - (e as Error).message - } (retrying ${retried}/${maxRetries})`, - ); - return this._timeoutAndRetry( - handler, - timeout, - error, - maxRetries, - retried + 1, - ); + this.log.warning(`${(e as Error).message} (retrying ${retried}/${maxRetries})`); + return this._timeoutAndRetry(handler, timeout, error, maxRetries, retried + 1); } throw e; @@ -1578,9 +1416,7 @@ export class BasicCrawler< */ protected async _isTaskReadyFunction() { // First check RequestList, since it's only in memory. - const isRequestListEmpty = this.requestList - ? await this.requestList.isEmpty() - : true; + const isRequestListEmpty = this.requestList ? await this.requestList.isEmpty() : true; // If RequestList is not empty, task is ready, no reason to check RequestQueue. if (!isRequestListEmpty) return true; // If RequestQueue is not empty, task is ready, return true, otherwise false. @@ -1591,11 +1427,10 @@ export class BasicCrawler< * Returns true if both RequestList and RequestQueue have all requests finished. */ protected async _defaultIsFinishedFunction() { - const [isRequestListFinished, isRequestQueueFinished] = - await Promise.all([ - this.requestList ? this.requestList.isFinished() : true, - this.requestQueue ? this.requestQueue.isFinished() : true, - ]); + const [isRequestListFinished, isRequestQueueFinished] = await Promise.all([ + this.requestList ? 
this.requestList.isFinished() : true, + this.requestQueue ? this.requestQueue.isFinished() : true, + ]); // If both are finished, return true, otherwise return false. return isRequestListFinished && isRequestQueueFinished; } @@ -1628,13 +1463,7 @@ export class BasicCrawler< if (shouldRetryRequest) { await this.stats.errorTrackerRetry.addAsync(error, crawlingContext); await this._tagUserHandlerError(() => - this.errorHandler?.( - this._augmentContextWithDeprecatedError( - crawlingContext, - error, - ), - error, - ), + this.errorHandler?.(this._augmentContextWithDeprecatedError(crawlingContext, error), error), ); if (error instanceof SessionError) { @@ -1649,14 +1478,11 @@ export class BasicCrawler< // We don't want to see the stack trace in the logs by default, when we are going to retry the request. // Thus, we print the full stack trace only when CRAWLEE_VERBOSE_LOG environment variable is set to true. const message = this._getMessageFromError(error); - this.log.warning( - `Reclaiming failed request back to the list or queue. ${message}`, - { - id, - url, - retryCount, - }, - ); + this.log.warning(`Reclaiming failed request back to the list or queue. ${message}`, { + id, + url, + retryCount, + }); await source.reclaimRequest(request, { forefront: request.userData?.__crawlee?.forefront, @@ -1696,28 +1522,16 @@ export class BasicCrawler< } } - protected async _handleFailedRequestHandler( - crawlingContext: Context, - error: Error, - ): Promise { + protected async _handleFailedRequestHandler(crawlingContext: Context, error: Error): Promise { // Always log the last error regardless if the user provided a failedRequestHandler const { id, url, method, uniqueKey } = crawlingContext.request; const message = this._getMessageFromError(error, true); - this.log.error( - `Request failed and reached maximum retries. ${message}`, - { id, url, method, uniqueKey }, - ); + this.log.error(`Request failed and reached maximum retries. ${message}`, { id, url, method, uniqueKey }); if (this.failedRequestHandler) { await this._tagUserHandlerError(() => - this.failedRequestHandler?.( - this._augmentContextWithDeprecatedError( - crawlingContext, - error, - ), - error, - ), + this.failedRequestHandler?.(this._augmentContextWithDeprecatedError(crawlingContext, error), error), ); } } @@ -1728,31 +1542,21 @@ export class BasicCrawler< * @returns The message to be logged */ protected _getMessageFromError(error: Error, forceStack = false) { - if ( - [TypeError, SyntaxError, ReferenceError].some( - (type) => error instanceof type, - ) - ) { + if ([TypeError, SyntaxError, ReferenceError].some((type) => error instanceof type)) { forceStack = true; } - const stackLines = error?.stack - ? error.stack.split('\n') - : new Error().stack!.split('\n').slice(2); + const stackLines = error?.stack ? error.stack.split('\n') : new Error().stack!.split('\n').slice(2); const baseDir = process.cwd(); - const userLine = stackLines.find( - (line) => line.includes(baseDir) && !line.includes('node_modules'), - ); + const userLine = stackLines.find((line) => line.includes(baseDir) && !line.includes('node_modules')); if (error instanceof TimeoutError) { - return process.env.CRAWLEE_VERBOSE_LOG - ? error.stack - : error.message || error; // stack in timeout errors does not really help + return process.env.CRAWLEE_VERBOSE_LOG ? error.stack : error.message || error; // stack in timeout errors does not really help } return process.env.CRAWLEE_VERBOSE_LOG || forceStack - ? error.stack ?? [error.message || error, ...stackLines].join('\n') + ? 
             : [error.message || error, userLine].join('\n');
     }
 
@@ -1761,8 +1565,7 @@ export class BasicCrawler<
         if (
             request.noRetry ||
             error instanceof NonRetryableError ||
-            (error instanceof SessionError &&
-                this.maxSessionRotations <= (request.sessionRotationCount ?? 0))
+            (error instanceof SessionError && this.maxSessionRotations <= (request.sessionRotationCount ?? 0))
         ) {
             return false;
         }
@@ -1777,10 +1580,7 @@ export class BasicCrawler<
         return request.retryCount < maxRequestRetries;
     }
 
-    protected _augmentContextWithDeprecatedError(
-        context: Context,
-        error: Error,
-    ) {
+    protected _augmentContextWithDeprecatedError(context: Context, error: Error) {
         Object.defineProperty(context, 'error', {
             get: () => {
                 this.log.deprecated(
@@ -1811,9 +1611,10 @@ export class BasicCrawler<
         }
     }
 
-    protected async _executeHooks<
-        HookLike extends (...args: any[]) => Awaitable<void>,
-    >(hooks: HookLike[], ...args: Parameters<HookLike>) {
+    protected async _executeHooks<HookLike extends (...args: any[]) => Awaitable<void>>(
+        hooks: HookLike[],
+        ...args: Parameters<HookLike>
+    ) {
         if (Array.isArray(hooks) && hooks.length) {
             for (const hook of hooks) {
                 await hook(...args);
@@ -1872,10 +1673,7 @@ export class BasicCrawler<
             // @ts-expect-error Assigning to possibly readonly properties
             this[propertyKey] = newProperty;
         } else if (!allowUndefined) {
-            throw new ArgumentError(
-                `"${newName}" must be provided in the crawler options`,
-                this.constructor,
-            );
+            throw new ArgumentError(`"${newName}" must be provided in the crawler options`, this.constructor);
         }
     }
 
@@ -1884,10 +1682,7 @@ export class BasicCrawler<
             this.log.warning(
                 `Encountered mixed casing for the cookie headers for request ${request.url} (${request.id}). Their values will be merged.`,
            );
-            return mergeCookies(request.url, [
-                request.headers.cookie,
-                request.headers.Cookie,
-            ]);
+            return mergeCookies(request.url, [request.headers.cookie, request.headers.Cookie]);
         }
 
         return request.headers?.Cookie || request.headers?.cookie || '';
@@ -1897,9 +1692,7 @@ export class BasicCrawler<
         // Check if it's explicitly disabled
         if (this.experiments.requestLocking === false) {
             if (!this._experimentWarnings.requestLocking) {
-                this.log.info(
-                    'Using the old RequestQueue implementation without request locking.',
-                );
+                this.log.info('Using the old RequestQueue implementation without request locking.');
                 this._experimentWarnings.requestLocking = true;
             }
 
@@ -1938,10 +1731,7 @@ export class BasicCrawler<
                     });
 
                     if (baseUrlHostname) {
-                        const loadedBaseUrlHostname = getDomain(
-                            loadedBaseUrl.hostname,
-                            { mixedInputs: false },
-                        );
+                        const loadedBaseUrlHostname = getDomain(loadedBaseUrl.hostname, { mixedInputs: false });
                         return baseUrlHostname === loadedBaseUrlHostname;
                     }
 
@@ -1955,10 +1745,7 @@ export class BasicCrawler<
             }
             case EnqueueStrategy.All:
             default: {
-                return (
-                    baseUrl.protocol === 'http:' ||
-                    baseUrl.protocol === 'https:'
-                );
+                return baseUrl.protocol === 'http:' || baseUrl.protocol === 'https:';
             }
         }
     }
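A note for anyone trying the series out: the monitor is opt-in, gated behind the experimental `monitor` flag that patch 1 adds to the `experiments` option of `BasicCrawlerOptions`. A minimal sketch of how it is meant to be switched on; the crawler setup around the flag is ordinary Crawlee usage, not something this series introduces:

    import { BasicCrawler } from 'crawlee';

    const crawler = new BasicCrawler({
        // experimental flag added by this patch series
        experiments: { monitor: true },
        async requestHandler({ request, sendRequest, log }) {
            const response = await sendRequest();
            log.info(`Fetched ${request.url} (status ${response.statusCode})`);
        },
    });

    await crawler.run(['https://crawlee.dev']);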
From d4ae373e8e9910d31b2b11ddaa49afd0854a61f5 Mon Sep 17 00:00:00 2001
From: Imamuzzaki Abu Salam
Date: Wed, 2 Oct 2024 14:51:17 +0000
Subject: [PATCH 6/6] refactor: Refactor Monitor class to improve code readability and maintainability

---
 .../src/internals/basic-crawler.ts |  16 ++-
 packages/core/src/index.ts         |   1 +
 packages/core/src/monitor.ts       | 133 ++++++++++++++----
 3 files changed, 119 insertions(+), 31 deletions(-)

diff --git a/packages/basic-crawler/src/internals/basic-crawler.ts b/packages/basic-crawler/src/internals/basic-crawler.ts
index 50f475ca4602..d024949bdea4 100644
--- a/packages/basic-crawler/src/internals/basic-crawler.ts
+++ b/packages/basic-crawler/src/internals/basic-crawler.ts
@@ -2,7 +2,7 @@ import { dirname } from 'node:path';
 
 import type { Log } from '@apify/log';
 import defaultLog, { LogLevel } from '@apify/log';
-import { addTimeoutToPromise, TimeoutError, tryCancel } from '@apify/timeout';
+import { TimeoutError, addTimeoutToPromise, tryCancel } from '@apify/timeout';
 import { cryptoRandomObjectId } from '@apify/utilities';
 import type {
     AddRequestsBatchedOptions,
@@ -25,22 +25,19 @@ import type {
     Session,
     SessionPoolOptions,
     Source,
-    StatisticsOptions,
     StatisticState,
+    StatisticsOptions,
 } from '@crawlee/core';
 import {
     AutoscaledPool,
     Configuration,
     CriticalError,
     Dataset,
-    enqueueLinks,
     EnqueueStrategy,
     EventType,
     KeyValueStore,
-    mergeCookies,
     Monitor,
     NonRetryableError,
-    purgeDefaultStorages,
     RequestProvider,
     RequestQueue,
     RequestQueueV1,
@@ -50,10 +47,13 @@ import {
     SessionError,
     SessionPool,
     Statistics,
+    enqueueLinks,
+    mergeCookies,
+    purgeDefaultStorages,
     validators,
 } from '@crawlee/core';
 import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types';
-import { gotScraping, ROTATE_PROXY_ERRORS } from '@crawlee/utils';
+import { ROTATE_PROXY_ERRORS, gotScraping } from '@crawlee/utils';
 import { stringify } from 'csv-stringify/sync';
 import { ensureDir, writeFile, writeJSON } from 'fs-extra';
 // @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types, so its alllll gooooood
 import type { OptionsInit, Method } from 'got-scraping';
@@ -550,6 +550,7 @@ export class BasicCrawler<

diff --git a/packages/core/src/monitor.ts b/packages/core/src/monitor.ts
--- a/packages/core/src/monitor.ts
+++ b/packages/core/src/monitor.ts
-    start(interval = 500) {
-        this.intervalId = setInterval(() => {
-            this.display();
+    start(interval: number = 500) {
+        if (!this.monitorDisplay) {
+            this.monitorDisplay = new MonitorDisplay();
+        }
+
+        this.intervalId = setInterval(async () => {
+            await this.display();
         }, interval);
     }
 
@@ -27,31 +37,106 @@ export class Monitor {
         }
     }
 
-    private display() {
+    private async display() {
         const stats = this.statistics.calculate();
         const now = new Date();
         const startTime = this.statistics.state.crawlerStartedAt;
         const elapsedTime = now.getTime() - new Date(startTime!).getTime();
         const cpuLoad = os.loadavg()[0];
         const memLoad = (os.totalmem() - os.freemem()) / os.totalmem();
+        const { requestsFinished } = this.statistics.state;
+        const assumedTotalCount = this.requestQueue?.assumedTotalCount ?? 0;
+
+        if (!this.monitorDisplay) {
+            throw new Error('Start the monitor first');
+        }
+
+        this.monitorDisplay.log(`Start: ${startTime ? formatDateTime(new Date(startTime)) : undefined}`);
+        this.monitorDisplay.log(`Now: ${formatDateTime(now)} (running for ${elapsedTime / 1000}s)`);
+        this.monitorDisplay.log(
+            `Progress: ${requestsFinished} / ${assumedTotalCount} (${((requestsFinished / assumedTotalCount) * 100).toFixed(2)}%), failed: ${this.statistics.state.requestsFailed} (${((this.statistics.state.requestsFailed / assumedTotalCount) * 100).toFixed(2)}%)`,
+        );
+        this.monitorDisplay.log(
+            `Remaining: ${this.estimateRemainingTime(stats)} seconds (${(stats.requestsFinishedPerMinute / 60).toFixed(2)} pages/second)`,
+        );
+        this.monitorDisplay.log(`Sys. load: ${cpuLoad.toFixed(2)}% CPU / ${(memLoad * 100).toFixed(2)}% Memory`);
+        this.monitorDisplay.log(
+            `Concurrencies: Current ${this.autoscaledPool?.currentConcurrency}, Desired ${this.autoscaledPool?.desiredConcurrency}`,
+        );
+
+        // TODO: Add list of URLs that are currently being processed
 
-        this.log.info(`
-Start: ${startTime}
-Now: ${now} (running for ${elapsedTime / 1000}s)
-Progress: ${this.statistics.state.requestsFinished} / ${stats.requestsTotal} (${
-            (this.statistics.state.requestsFinished / stats.requestsTotal) * 100
-        }%), failed: ${this.statistics.state.requestsFailed} (${
-            (this.statistics.state.requestsFailed / stats.requestsTotal) * 100
-        }%)
-Remaining: ${this.estimateRemainingTime(stats)} (${stats.requestsFinishedPerMinute} req/min)
-Sys. load: ${cpuLoad.toFixed(2)} / ${(memLoad * 100).toFixed(2)}%
-Concurrencies: ${this.statistics.state.requestsRetries}
-`);
+        this.monitorDisplay.resetCursor();
     }
 
     private estimateRemainingTime(stats: ReturnType<Statistics['calculate']>) {
-        const remainingRequests = stats.requestsTotal - this.statistics.state.requestsFinished;
+        const na = 'N/A';
+        if (!this.requestQueue) {
+            return na;
+        }
+
+        const remainingRequests = this.requestQueue.assumedTotalCount - this.statistics.state.requestsFinished;
         const avgDuration = stats.requestAvgFinishedDurationMillis;
-        return (remainingRequests * avgDuration) / 1000;
+        const remainingTime = (remainingRequests * avgDuration) / 1000;
+        const safeRemainingTime = Number.isFinite(remainingTime) ? remainingTime.toFixed(2) : na;
+        return safeRemainingTime;
+    }
+}
+
+const CLEAR_LINE = '\x1B[K';
+
+class MonitorDisplay {
+    private lastLinesCount: number = 0;
+    private linesCount: number = 0;
+
+    public log(str: string): void {
+        // We create an empty line at the start so that any console.log calls
+        // from within the script are above our output.
+        if (this.linesCount === 0) {
+            // eslint-disable-next-line no-console
+            console.log(CLEAR_LINE); // erases the current line
+            this.linesCount += 1;
+        }
+
+        // Strip lines that are too long
+        // const strToLog = str.substring(0, 78);
+        const strToLog = str;
+        // eslint-disable-next-line no-console
+        console.log(`${CLEAR_LINE}${strToLog}`);
+        this.linesCount += 1;
+    }
+
+    public resetCursor(): void {
+        // move cursor up to draw over our output
+        process.stdout.write(`\x1B[${this.linesCount}A`);
+        this.lastLinesCount = this.linesCount;
+        this.linesCount = 0;
+    }
+
+    public close(): void {
+        // move cursor down so that console output stays
+        process.stdout.write(`\x1B[${this.lastLinesCount}B`);
+    }
+}
+
+function formatDateTime(datetime: Date | number): string {
+    const date = typeof datetime === 'number' ? new Date(datetime) : datetime;
+
+    const dateStr = `${date.getFullYear()}-${padDate(date.getMonth() + 1, 2)}-${padDate(date.getDate(), 2)}`;
+    const timeStr =
+        `${padDate(date.getHours(), 2)}` +
+        `:${padDate(date.getMinutes(), 2)}` +
+        `:${padDate(date.getSeconds(), 2)}` +
+        `.${padDate(date.getMilliseconds(), 3)}`;
+
+    return `${dateStr} ${timeStr}`;
+}
+
+function padDate(value: number | string, num: number): string {
+    const str = value.toString();
+    if (str.length >= num) {
+        return str;
+    }
+    const zeroesToAdd = num - str.length;
+    return '0'.repeat(zeroesToAdd) + str;
+}
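A note on the remaining-time figure: `estimateRemainingTime()` is a straight linear extrapolation, the number of requests still expected in the queue multiplied by the average duration of a finished request. A worked example with made-up numbers for the values the method reads from `Statistics` and the request queue:

    // assumedTotalCount = 1000, requestsFinished = 400
    const remainingRequests = 1000 - 400; // 600 requests left
    // requestAvgFinishedDurationMillis = 1200
    const remainingTime = (600 * 1200) / 1000; // 720, reported as '720.00' seconds

Note that the extrapolation does not divide by the current concurrency, so with several requests in flight the actual wall-clock time to finish should come out lower than the reported figure.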
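The in-place redraw in `MonitorDisplay` rests on two ANSI escape sequences: `\x1B[K` erases from the cursor to the end of the line before each status line is printed, and `\x1B[nA` / `\x1B[nB` move the cursor n rows up or down, so every tick overwrites the previous status block instead of scrolling the terminal. A self-contained sketch of the same technique, independent of the classes in this series (all names here are illustrative):

    const CLEAR_LINE = '\x1B[K';

    // Print a block of status lines, then move the cursor back up so the
    // next call overwrites the same rows instead of appending to the log.
    function redraw(lines: string[]): number {
        for (const line of lines) {
            process.stdout.write(`${CLEAR_LINE}${line}\n`);
        }
        process.stdout.write(`\x1B[${lines.length}A`); // cursor up
        return lines.length;
    }

    let tick = 0;
    let height = 0;
    const timer = setInterval(() => {
        height = redraw([`tick: ${tick}`, `time: ${new Date().toISOString()}`]);
        tick += 1;
    }, 500);

    // Stop after five seconds and move the cursor past the status block,
    // mirroring what MonitorDisplay.close() does.
    setTimeout(() => {
        clearInterval(timer);
        process.stdout.write(`\x1B[${height}B`);
    }, 5000);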