diff --git a/src/__tests__/__snapshots__/config-snapshot.test.ts.snap b/src/__tests__/__snapshots__/config-snapshot.test.ts.snap index 67bf3f700..968f8c813 100644 --- a/src/__tests__/__snapshots__/config-snapshot.test.ts.snap +++ b/src/__tests__/__snapshots__/config-snapshot.test.ts.snap @@ -467,6 +467,15 @@ exports[`config snapshot for PostHogConfig 1`] = ` \\"undefined\\", \\"{ cache?: RequestCache | undefined; next_options?: NextOptions | undefined; }\\" ], + \\"request_queue_config\\": [ + \\"undefined\\", + { + \\"flush_interval_ms\\": [ + \\"undefined\\", + \\"number\\" + ] + } + ], \\"__add_tracing_headers\\": [ \\"undefined\\", \\"false\\", diff --git a/src/__tests__/request-queue.test.ts b/src/__tests__/request-queue.test.ts index 95b3cd818..0f060cd64 100644 --- a/src/__tests__/request-queue.test.ts +++ b/src/__tests__/request-queue.test.ts @@ -1,131 +1,171 @@ -import { RequestQueue } from '../request-queue' -import { QueuedRequestOptions } from '../types' +import { DEFAULT_FLUSH_INTERVAL_MS, RequestQueue } from '../request-queue' +import { QueuedRequestWithOptions } from '../types' +import { createPosthogInstance } from './helpers/posthog-instance' const EPOCH = 1_600_000_000 describe('RequestQueue', () => { - let sendRequest: (options: QueuedRequestOptions) => void - let queue: RequestQueue - - beforeEach(() => { - sendRequest = jest.fn() - queue = new RequestQueue(sendRequest) - jest.useFakeTimers() - jest.setSystemTime(EPOCH - 3000) // Running the timers will add 3 seconds - jest.spyOn(console, 'warn').mockImplementation(() => {}) - }) + describe('setting flush timeout', () => { + it('can override the flush timeout', () => { + const queue = new RequestQueue(jest.fn(), { flush_interval_ms: 1000 }) + expect(queue['flushTimeoutMs']).toEqual(1000) + }) - it('handles poll after enqueueing requests', () => { - queue.enqueue({ - data: { event: 'foo', timestamp: EPOCH - 3000 }, - transport: 'XHR', - url: '/e', + it('defaults to 3000 when not configured', () => { + const queue = new RequestQueue(jest.fn(), {}) + expect(queue['flushTimeoutMs']).toEqual(DEFAULT_FLUSH_INTERVAL_MS) }) - queue.enqueue({ - data: { event: '$identify', timestamp: EPOCH - 2000 }, - url: '/identify', + + it('defaults to 3000 when no config', () => { + const queue = new RequestQueue(jest.fn()) + expect(queue['flushTimeoutMs']).toEqual(DEFAULT_FLUSH_INTERVAL_MS) }) - queue.enqueue({ - data: { event: 'bar', timestamp: EPOCH - 1000 }, - url: '/e', + + it('cannot set below 250', () => { + const queue = new RequestQueue(jest.fn(), { flush_interval_ms: 249 }) + expect(queue['flushTimeoutMs']).toEqual(250) }) - queue.enqueue({ - data: { event: 'zeta', timestamp: EPOCH }, - url: '/e', - batchKey: 'sessionRecording', + + it('cannot set above 5000', () => { + const queue = new RequestQueue(jest.fn(), { flush_interval_ms: 5001 }) + expect(queue['flushTimeoutMs']).toEqual(5000) }) - queue.enable() - - expect(sendRequest).toHaveBeenCalledTimes(0) - - jest.runOnlyPendingTimers() - - expect(sendRequest).toHaveBeenCalledTimes(3) - expect(jest.mocked(sendRequest).mock.calls).toEqual([ - [ - { - url: '/e', - data: [ - { event: 'foo', offset: 3000 }, - { event: 'bar', offset: 1000 }, - ], - transport: 'XHR', - }, - ], - [ - { - url: '/identify', - data: [{ event: '$identify', offset: 2000 }], - }, - ], - [ - { - url: '/e', - data: [{ event: 'zeta', offset: 0 }], - batchKey: 'sessionRecording', - }, - ], - ]) + it('can be passed in from posthog config', async () => { + const posthog = await createPosthogInstance('token', { request_queue_config: { flush_interval_ms: 1000 } }) + expect(posthog.config.request_queue_config.flush_interval_ms).toEqual(1000) + expect(posthog['_requestQueue']['flushTimeoutMs']).toEqual(1000) + }) }) - it('handles unload', () => { - queue.enqueue({ url: '/s', data: { recording_payload: 'example' } }) - queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 } }) - queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } }) - queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } }) - queue.unload() - - expect(sendRequest).toHaveBeenCalledTimes(3) - expect(sendRequest).toHaveBeenNthCalledWith(1, { - url: '/e', - data: [ - { event: 'foo', timestamp: 1_610_000_000 }, - { event: 'bar', timestamp: 1_630_000_000 }, - ], - transport: 'sendBeacon', - }) + describe('with default config', () => { + let sendRequest: (options: QueuedRequestWithOptions) => void + let queue: RequestQueue - expect(sendRequest).toHaveBeenNthCalledWith(2, { - url: '/s', - data: [{ recording_payload: 'example' }], - transport: 'sendBeacon', + beforeEach(() => { + sendRequest = jest.fn() + queue = new RequestQueue(sendRequest, {}) + jest.useFakeTimers() + jest.setSystemTime(EPOCH - 3000) // Running the timers will add 3 seconds + jest.spyOn(console, 'warn').mockImplementation(() => {}) }) - expect(sendRequest).toHaveBeenNthCalledWith(3, { - url: '/identify', - data: [{ event: '$identify', timestamp: 1_620_000_000 }], - transport: 'sendBeacon', - }) - }) - it('handles unload with batchKeys', () => { - queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 }, transport: 'XHR' }) - queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } }) - queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } }) - queue.enqueue({ url: '/e', data: { event: 'zeta', timestamp: 1_640_000_000 }, batchKey: 'sessionRecording' }) + it('handles poll after enqueueing requests', () => { + queue.enqueue({ + data: { event: 'foo', timestamp: EPOCH - 3000 }, + transport: 'XHR', + url: '/e', + }) + queue.enqueue({ + data: { event: '$identify', timestamp: EPOCH - 2000 }, + url: '/identify', + }) + queue.enqueue({ + data: { event: 'bar', timestamp: EPOCH - 1000 }, + url: '/e', + }) + queue.enqueue({ + data: { event: 'zeta', timestamp: EPOCH }, + url: '/e', + batchKey: 'sessionRecording', + }) - queue.unload() + queue.enable() - expect(sendRequest).toHaveBeenCalledTimes(3) + expect(sendRequest).toHaveBeenCalledTimes(0) - expect(sendRequest).toHaveBeenNthCalledWith(1, { - data: [ - { event: 'foo', timestamp: 1610000000 }, - { event: 'bar', timestamp: 1630000000 }, - ], - transport: 'sendBeacon', - url: '/e', + jest.runOnlyPendingTimers() + + expect(sendRequest).toHaveBeenCalledTimes(3) + expect(jest.mocked(sendRequest).mock.calls).toEqual([ + [ + { + url: '/e', + data: [ + { event: 'foo', offset: 3000 }, + { event: 'bar', offset: 1000 }, + ], + transport: 'XHR', + }, + ], + [ + { + url: '/identify', + data: [{ event: '$identify', offset: 2000 }], + }, + ], + [ + { + url: '/e', + data: [{ event: 'zeta', offset: 0 }], + batchKey: 'sessionRecording', + }, + ], + ]) }) - expect(sendRequest).toHaveBeenNthCalledWith(2, { - batchKey: 'sessionRecording', - data: [{ event: 'zeta', timestamp: 1640000000 }], - transport: 'sendBeacon', - url: '/e', + + it('handles unload', () => { + queue.enqueue({ url: '/s', data: { recording_payload: 'example' } }) + queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 } }) + queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } }) + queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } }) + queue.unload() + + expect(sendRequest).toHaveBeenCalledTimes(3) + expect(sendRequest).toHaveBeenNthCalledWith(1, { + url: '/e', + data: [ + { event: 'foo', timestamp: 1_610_000_000 }, + { event: 'bar', timestamp: 1_630_000_000 }, + ], + transport: 'sendBeacon', + }) + + expect(sendRequest).toHaveBeenNthCalledWith(2, { + url: '/s', + data: [{ recording_payload: 'example' }], + transport: 'sendBeacon', + }) + expect(sendRequest).toHaveBeenNthCalledWith(3, { + url: '/identify', + data: [{ event: '$identify', timestamp: 1_620_000_000 }], + transport: 'sendBeacon', + }) }) - expect(sendRequest).toHaveBeenNthCalledWith(3, { - data: [{ event: '$identify', timestamp: 1620000000 }], - transport: 'sendBeacon', - url: '/identify', + + it('handles unload with batchKeys', () => { + queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 }, transport: 'XHR' }) + queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } }) + queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } }) + queue.enqueue({ + url: '/e', + data: { event: 'zeta', timestamp: 1_640_000_000 }, + batchKey: 'sessionRecording', + }) + + queue.unload() + + expect(sendRequest).toHaveBeenCalledTimes(3) + + expect(sendRequest).toHaveBeenNthCalledWith(1, { + data: [ + { event: 'foo', timestamp: 1610000000 }, + { event: 'bar', timestamp: 1630000000 }, + ], + transport: 'sendBeacon', + url: '/e', + }) + expect(sendRequest).toHaveBeenNthCalledWith(2, { + batchKey: 'sessionRecording', + data: [{ event: 'zeta', timestamp: 1640000000 }], + transport: 'sendBeacon', + url: '/e', + }) + expect(sendRequest).toHaveBeenNthCalledWith(3, { + data: [{ event: '$identify', timestamp: 1620000000 }], + transport: 'sendBeacon', + url: '/identify', + }) }) }) }) diff --git a/src/__tests__/request.test.ts b/src/__tests__/request.test.ts index b098a1d5f..046124970 100644 --- a/src/__tests__/request.test.ts +++ b/src/__tests__/request.test.ts @@ -2,7 +2,7 @@ /// import { extendURLParams, request } from '../request' -import { Compression, RequestOptions } from '../types' +import { Compression, RequestWithOptions } from '../types' jest.mock('../utils/globals', () => ({ ...jest.requireActual('../utils/globals'), @@ -51,8 +51,8 @@ describe('request', () => { const now = 1700000000000 const mockCallback = jest.fn() - let createRequest: (overrides?: Partial) => RequestOptions - let transport: RequestOptions['transport'] + let createRequest: (overrides?: Partial) => RequestWithOptions + let transport: RequestWithOptions['transport'] beforeEach(() => { mockedXHR.open.mockClear() diff --git a/src/posthog-core.ts b/src/posthog-core.ts index f3b160601..cce7495ae 100644 --- a/src/posthog-core.ts +++ b/src/posthog-core.ts @@ -24,7 +24,7 @@ import { SessionRecording } from './extensions/replay/sessionrecording' import { RemoteConfigLoader } from './remote-config' import { Toolbar } from './extensions/toolbar' import { localStore } from './storage' -import { RequestQueue } from './request-queue' +import { DEFAULT_FLUSH_INTERVAL_MS, RequestQueue } from './request-queue' import { RetryQueue } from './retry-queue' import { SessionIdManager } from './sessionid' import { RequestRouter, RequestRouterRegion } from './utils/request-router' @@ -38,7 +38,7 @@ import { PostHogConfig, Properties, Property, - QueuedRequestOptions, + QueuedRequestWithOptions, RemoteConfig, RequestCallback, SessionIdChangedCallback, @@ -186,6 +186,7 @@ export const defaultConfig = (): PostHogConfig => ({ session_idle_timeout_seconds: 30 * 60, // 30 minutes person_profiles: 'identified_only', before_send: undefined, + request_queue_config: { flush_interval_ms: DEFAULT_FLUSH_INTERVAL_MS }, // Used for internal testing _onCapture: __NOOP, @@ -284,7 +285,7 @@ export class PostHog { _initialPageviewCaptured: boolean _triggered_notifs: any compression?: Compression - __request_queue: QueuedRequestOptions[] + __request_queue: QueuedRequestWithOptions[] analyticsDefaultEndpoint: string version = Config.LIB_VERSION _initialPersonProfilesConfig: 'always' | 'never' | 'identified_only' | null @@ -440,7 +441,10 @@ export class PostHog { const initialPersistenceProps = { ...this.persistence.props } const initialSessionProps = { ...this.sessionPersistence.props } - this._requestQueue = new RequestQueue((req) => this._send_retriable_request(req)) + this._requestQueue = new RequestQueue( + (req) => this._send_retriable_request(req), + this.config.request_queue_config + ) this._retryQueue = new RetryQueue(this) this.__request_queue = [] @@ -668,7 +672,7 @@ export class PostHog { this._retryQueue?.unload() } - _send_request(options: QueuedRequestOptions): void { + _send_request(options: QueuedRequestWithOptions): void { if (!this.__loaded) { return } @@ -710,7 +714,7 @@ export class PostHog { }) } - _send_retriable_request(options: QueuedRequestOptions): void { + _send_retriable_request(options: QueuedRequestWithOptions): void { if (this._retryQueue) { this._retryQueue.retriableRequest(options) } else { @@ -918,7 +922,7 @@ export class PostHog { this._internalEventEmitter.emit('eventCaptured', data) - const requestOptions: QueuedRequestOptions = { + const requestOptions: QueuedRequestWithOptions = { method: 'POST', url: options?._url ?? this.requestRouter.endpointFor('api', this.analyticsDefaultEndpoint), data, diff --git a/src/request-queue.ts b/src/request-queue.ts index 497f1b506..1d1d2d750 100644 --- a/src/request-queue.ts +++ b/src/request-queue.ts @@ -1,21 +1,31 @@ -import { QueuedRequestOptions } from './types' +import { QueuedRequestWithOptions, RequestQueueConfig } from './types' import { each } from './utils' import { isArray, isUndefined } from './utils/type-utils' +import { clampToRange } from './utils/number-utils' + +export const DEFAULT_FLUSH_INTERVAL_MS = 3000 export class RequestQueue { // We start in a paused state and only start flushing when enabled by the parent private isPaused: boolean = true - private queue: QueuedRequestOptions[] = [] + private queue: QueuedRequestWithOptions[] = [] private flushTimeout?: ReturnType - private flushTimeoutMs = 3000 - private sendRequest: (req: QueuedRequestOptions) => void + private flushTimeoutMs: number + private sendRequest: (req: QueuedRequestWithOptions) => void - constructor(sendRequest: (req: QueuedRequestOptions) => void) { + constructor(sendRequest: (req: QueuedRequestWithOptions) => void, config?: RequestQueueConfig) { + this.flushTimeoutMs = clampToRange( + config?.flush_interval_ms || DEFAULT_FLUSH_INTERVAL_MS, + 250, + 5000, + 'flush interval', + DEFAULT_FLUSH_INTERVAL_MS + ) this.sendRequest = sendRequest } - enqueue(req: QueuedRequestOptions): void { + enqueue(req: QueuedRequestWithOptions): void { this.queue.push(req) if (!this.flushTimeout) { @@ -72,9 +82,9 @@ export class RequestQueue { this.flushTimeout = undefined } - private formatQueue(): Record { - const requests: Record = {} - each(this.queue, (request: QueuedRequestOptions) => { + private formatQueue(): Record { + const requests: Record = {} + each(this.queue, (request: QueuedRequestWithOptions) => { const req = request const key = (req ? req.batchKey : null) || req.url if (isUndefined(requests[key])) { diff --git a/src/request.ts b/src/request.ts index 3a1e52b19..f906d6bfb 100644 --- a/src/request.ts +++ b/src/request.ts @@ -1,6 +1,6 @@ import { each, find } from './utils' import Config from './config' -import { Compression, RequestOptions, RequestResponse } from './types' +import { Compression, RequestWithOptions, RequestResponse } from './types' import { formDataToQuery } from './utils/request-utils' import { logger } from './utils/logger' @@ -56,7 +56,7 @@ const encodeToDataString = (data: string | Record): string => { return 'data=' + encodeURIComponent(typeof data === 'string' ? data : jsonStringify(data)) } -const encodePostData = ({ data, compression }: RequestOptions): EncodedBody | undefined => { +const encodePostData = ({ data, compression }: RequestWithOptions): EncodedBody | undefined => { if (!data) { return } @@ -90,7 +90,7 @@ const encodePostData = ({ data, compression }: RequestOptions): EncodedBody | un } } -const xhr = (options: RequestOptions) => { +const xhr = (options: RequestWithOptions) => { const req = new XMLHttpRequest!() req.open(options.method || 'GET', options.url, true) const { contentType, body } = encodePostData(options) ?? {} @@ -130,7 +130,7 @@ const xhr = (options: RequestOptions) => { req.send(body) } -const _fetch = (options: RequestOptions) => { +const _fetch = (options: RequestWithOptions) => { const { contentType, body, estimatedSize } = encodePostData(options) ?? {} // eslint-disable-next-line compat/compat @@ -196,7 +196,7 @@ const _fetch = (options: RequestOptions) => { return } -const _sendBeacon = (options: RequestOptions) => { +const _sendBeacon = (options: RequestWithOptions) => { // beacon documentation https://w3c.github.io/beacon/ // beacons format the message and use the type property @@ -215,7 +215,10 @@ const _sendBeacon = (options: RequestOptions) => { } } -const AVAILABLE_TRANSPORTS: { transport: RequestOptions['transport']; method: (options: RequestOptions) => void }[] = [] +const AVAILABLE_TRANSPORTS: { + transport: RequestWithOptions['transport'] + method: (options: RequestWithOptions) => void +}[] = [] // We add the transports in order of preference if (fetch) { @@ -240,7 +243,7 @@ if (navigator?.sendBeacon) { } // This is the entrypoint. It takes care of sanitizing the options and then calls the appropriate request method. -export const request = (_options: RequestOptions) => { +export const request = (_options: RequestWithOptions) => { // Clone the options so we don't modify the original object const options = { ..._options } options.timeout = options.timeout || 60000 diff --git a/src/retry-queue.ts b/src/retry-queue.ts index b21ab2263..650b54edf 100644 --- a/src/retry-queue.ts +++ b/src/retry-queue.ts @@ -1,4 +1,4 @@ -import { RetriableRequestOptions } from './types' +import { RetriableRequestWithOptions } from './types' import { isNumber, isUndefined } from './utils/type-utils' import { logger } from './utils/logger' @@ -30,7 +30,7 @@ export function pickNextRetryDelay(retriesPerformedSoFar: number): number { interface RetryQueueElement { retryAt: number - requestOptions: RetriableRequestOptions + requestOptions: RetriableRequestWithOptions } export class RetryQueue { @@ -58,7 +58,7 @@ export class RetryQueue { } } - retriableRequest({ retriesPerformedSoFar, ...options }: RetriableRequestOptions): void { + retriableRequest({ retriesPerformedSoFar, ...options }: RetriableRequestWithOptions): void { if (isNumber(retriesPerformedSoFar) && retriesPerformedSoFar > 0) { options.url = extendURLParams(options.url, { retry_count: retriesPerformedSoFar }) } @@ -81,7 +81,7 @@ export class RetryQueue { }) } - private enqueue(requestOptions: RetriableRequestOptions): void { + private enqueue(requestOptions: RetriableRequestWithOptions): void { const retriesPerformedSoFar = requestOptions.retriesPerformedSoFar || 0 requestOptions.retriesPerformedSoFar = retriesPerformedSoFar + 1 diff --git a/src/types.ts b/src/types.ts index 7e2c16ff8..4d38b86fb 100644 --- a/src/types.ts +++ b/src/types.ts @@ -841,6 +841,12 @@ export interface PostHogConfig { next_options?: NextOptions } + /** + * Used to change the behavior of the request queue. + * This is an advanced feature and should be used with caution. + */ + request_queue_config?: RequestQueueConfig + // ------- PREVIEW CONFIGS ------- /** @@ -1057,7 +1063,7 @@ export type RequestCallback = (response: RequestResponse) => void // See https://nextjs.org/docs/app/api-reference/functions/fetch#fetchurl-options type NextOptions = { revalidate: false | 0 | number; tags: string[] } -export interface RequestOptions { +export interface RequestWithOptions { url: string // Data can be a single object or an array of objects when batched data?: Record | Record[] @@ -1077,15 +1083,29 @@ export interface RequestOptions { // Queued request types - the same as a request but with additional queueing information -export interface QueuedRequestOptions extends RequestOptions { - batchKey?: string /** key of queue, e.g. 'sessionRecording' vs 'event' */ +export interface QueuedRequestWithOptions extends RequestWithOptions { + /** key of queue, e.g. 'sessionRecording' vs 'event' */ + batchKey?: string } // Used explicitly for retriable requests -export interface RetriableRequestOptions extends QueuedRequestOptions { +export interface RetriableRequestWithOptions extends QueuedRequestWithOptions { retriesPerformedSoFar?: number } +// we used to call a request that was sent to the queue with options attached `RequestQueueOptions` +// so we can't call the options used to configure the behavior of the RequestQueue that as well, +// so instead we call them config +export interface RequestQueueConfig { + /** + * ADVANCED - alters the frequency which PostHog sends events to the server. + * generally speaking this is only set when apps have automatic page refreshes, or very short visits. + * Defaults to 3 seconds when not set + * Allowed values between 250 and 5000 + * */ + flush_interval_ms?: number +} + export interface CaptureOptions { /** * Used when `$identify` is called @@ -1127,7 +1147,7 @@ export interface CaptureOptions { /** * If set, overrides the desired transport method */ - transport?: RequestOptions['transport'] + transport?: RequestWithOptions['transport'] /** * If set, overrides the current timestamp