Skip to content

Commit

Permalink
feat: allow configuring request queue interval
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Jan 31, 2025
1 parent c46550a commit 3e27ae6
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 143 deletions.
9 changes: 9 additions & 0 deletions src/__tests__/__snapshots__/config-snapshot.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,15 @@ exports[`config snapshot for PostHogConfig 1`] = `
\\"undefined\\",
\\"{ cache?: RequestCache | undefined; next_options?: NextOptions | undefined; }\\"
],
\\"request_queue_config\\": [
\\"undefined\\",
{
\\"flush_interval_ms\\": [
\\"undefined\\",
\\"number\\"
]
}
],
\\"__add_tracing_headers\\": [
\\"undefined\\",
\\"false\\",
Expand Down
256 changes: 148 additions & 108 deletions src/__tests__/request-queue.test.ts
Original file line number Diff line number Diff line change
@@ -1,131 +1,171 @@
import { RequestQueue } from '../request-queue'
import { QueuedRequestOptions } from '../types'
import { DEFAULT_FLUSH_INTERVAL_MS, RequestQueue } from '../request-queue'
import { QueuedRequestWithOptions } from '../types'
import { createPosthogInstance } from './helpers/posthog-instance'

const EPOCH = 1_600_000_000

describe('RequestQueue', () => {
let sendRequest: (options: QueuedRequestOptions) => void
let queue: RequestQueue

beforeEach(() => {
sendRequest = jest.fn()
queue = new RequestQueue(sendRequest)
jest.useFakeTimers()
jest.setSystemTime(EPOCH - 3000) // Running the timers will add 3 seconds
jest.spyOn(console, 'warn').mockImplementation(() => {})
})
describe('setting flush timeout', () => {
it('can override the flush timeout', () => {
const queue = new RequestQueue(jest.fn(), { flush_interval_ms: 1000 })
expect(queue['flushTimeoutMs']).toEqual(1000)
})

it('handles poll after enqueueing requests', () => {
queue.enqueue({
data: { event: 'foo', timestamp: EPOCH - 3000 },
transport: 'XHR',
url: '/e',
it('defaults to 3000 when not configured', () => {
const queue = new RequestQueue(jest.fn(), {})
expect(queue['flushTimeoutMs']).toEqual(DEFAULT_FLUSH_INTERVAL_MS)
})
queue.enqueue({
data: { event: '$identify', timestamp: EPOCH - 2000 },
url: '/identify',

it('defaults to 3000 when no config', () => {
const queue = new RequestQueue(jest.fn())
expect(queue['flushTimeoutMs']).toEqual(DEFAULT_FLUSH_INTERVAL_MS)
})
queue.enqueue({
data: { event: 'bar', timestamp: EPOCH - 1000 },
url: '/e',

it('cannot set below 250', () => {
const queue = new RequestQueue(jest.fn(), { flush_interval_ms: 249 })
expect(queue['flushTimeoutMs']).toEqual(250)
})
queue.enqueue({
data: { event: 'zeta', timestamp: EPOCH },
url: '/e',
batchKey: 'sessionRecording',

it('cannot set above 5000', () => {
const queue = new RequestQueue(jest.fn(), { flush_interval_ms: 5001 })
expect(queue['flushTimeoutMs']).toEqual(5000)
})

queue.enable()

expect(sendRequest).toHaveBeenCalledTimes(0)

jest.runOnlyPendingTimers()

expect(sendRequest).toHaveBeenCalledTimes(3)
expect(jest.mocked(sendRequest).mock.calls).toEqual([
[
{
url: '/e',
data: [
{ event: 'foo', offset: 3000 },
{ event: 'bar', offset: 1000 },
],
transport: 'XHR',
},
],
[
{
url: '/identify',
data: [{ event: '$identify', offset: 2000 }],
},
],
[
{
url: '/e',
data: [{ event: 'zeta', offset: 0 }],
batchKey: 'sessionRecording',
},
],
])
it('can be passed in from posthog config', async () => {
const posthog = await createPosthogInstance('token', { request_queue_config: { flush_interval_ms: 1000 } })
expect(posthog.config.request_queue_config.flush_interval_ms).toEqual(1000)
expect(posthog['_requestQueue']['flushTimeoutMs']).toEqual(1000)
})
})

it('handles unload', () => {
queue.enqueue({ url: '/s', data: { recording_payload: 'example' } })
queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 } })
queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } })
queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } })
queue.unload()

expect(sendRequest).toHaveBeenCalledTimes(3)
expect(sendRequest).toHaveBeenNthCalledWith(1, {
url: '/e',
data: [
{ event: 'foo', timestamp: 1_610_000_000 },
{ event: 'bar', timestamp: 1_630_000_000 },
],
transport: 'sendBeacon',
})
describe('with default config', () => {
let sendRequest: (options: QueuedRequestWithOptions) => void
let queue: RequestQueue

expect(sendRequest).toHaveBeenNthCalledWith(2, {
url: '/s',
data: [{ recording_payload: 'example' }],
transport: 'sendBeacon',
beforeEach(() => {
sendRequest = jest.fn()
queue = new RequestQueue(sendRequest, {})
jest.useFakeTimers()
jest.setSystemTime(EPOCH - 3000) // Running the timers will add 3 seconds
jest.spyOn(console, 'warn').mockImplementation(() => {})
})
expect(sendRequest).toHaveBeenNthCalledWith(3, {
url: '/identify',
data: [{ event: '$identify', timestamp: 1_620_000_000 }],
transport: 'sendBeacon',
})
})

it('handles unload with batchKeys', () => {
queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 }, transport: 'XHR' })
queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } })
queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } })
queue.enqueue({ url: '/e', data: { event: 'zeta', timestamp: 1_640_000_000 }, batchKey: 'sessionRecording' })
it('handles poll after enqueueing requests', () => {
queue.enqueue({
data: { event: 'foo', timestamp: EPOCH - 3000 },
transport: 'XHR',
url: '/e',
})
queue.enqueue({
data: { event: '$identify', timestamp: EPOCH - 2000 },
url: '/identify',
})
queue.enqueue({
data: { event: 'bar', timestamp: EPOCH - 1000 },
url: '/e',
})
queue.enqueue({
data: { event: 'zeta', timestamp: EPOCH },
url: '/e',
batchKey: 'sessionRecording',
})

queue.unload()
queue.enable()

expect(sendRequest).toHaveBeenCalledTimes(3)
expect(sendRequest).toHaveBeenCalledTimes(0)

expect(sendRequest).toHaveBeenNthCalledWith(1, {
data: [
{ event: 'foo', timestamp: 1610000000 },
{ event: 'bar', timestamp: 1630000000 },
],
transport: 'sendBeacon',
url: '/e',
jest.runOnlyPendingTimers()

expect(sendRequest).toHaveBeenCalledTimes(3)
expect(jest.mocked(sendRequest).mock.calls).toEqual([
[
{
url: '/e',
data: [
{ event: 'foo', offset: 3000 },
{ event: 'bar', offset: 1000 },
],
transport: 'XHR',
},
],
[
{
url: '/identify',
data: [{ event: '$identify', offset: 2000 }],
},
],
[
{
url: '/e',
data: [{ event: 'zeta', offset: 0 }],
batchKey: 'sessionRecording',
},
],
])
})
expect(sendRequest).toHaveBeenNthCalledWith(2, {
batchKey: 'sessionRecording',
data: [{ event: 'zeta', timestamp: 1640000000 }],
transport: 'sendBeacon',
url: '/e',

it('handles unload', () => {
queue.enqueue({ url: '/s', data: { recording_payload: 'example' } })
queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 } })
queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } })
queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } })
queue.unload()

expect(sendRequest).toHaveBeenCalledTimes(3)
expect(sendRequest).toHaveBeenNthCalledWith(1, {
url: '/e',
data: [
{ event: 'foo', timestamp: 1_610_000_000 },
{ event: 'bar', timestamp: 1_630_000_000 },
],
transport: 'sendBeacon',
})

expect(sendRequest).toHaveBeenNthCalledWith(2, {
url: '/s',
data: [{ recording_payload: 'example' }],
transport: 'sendBeacon',
})
expect(sendRequest).toHaveBeenNthCalledWith(3, {
url: '/identify',
data: [{ event: '$identify', timestamp: 1_620_000_000 }],
transport: 'sendBeacon',
})
})
expect(sendRequest).toHaveBeenNthCalledWith(3, {
data: [{ event: '$identify', timestamp: 1620000000 }],
transport: 'sendBeacon',
url: '/identify',

it('handles unload with batchKeys', () => {
queue.enqueue({ url: '/e', data: { event: 'foo', timestamp: 1_610_000_000 }, transport: 'XHR' })
queue.enqueue({ url: '/identify', data: { event: '$identify', timestamp: 1_620_000_000 } })
queue.enqueue({ url: '/e', data: { event: 'bar', timestamp: 1_630_000_000 } })
queue.enqueue({
url: '/e',
data: { event: 'zeta', timestamp: 1_640_000_000 },
batchKey: 'sessionRecording',
})

queue.unload()

expect(sendRequest).toHaveBeenCalledTimes(3)

expect(sendRequest).toHaveBeenNthCalledWith(1, {
data: [
{ event: 'foo', timestamp: 1610000000 },
{ event: 'bar', timestamp: 1630000000 },
],
transport: 'sendBeacon',
url: '/e',
})
expect(sendRequest).toHaveBeenNthCalledWith(2, {
batchKey: 'sessionRecording',
data: [{ event: 'zeta', timestamp: 1640000000 }],
transport: 'sendBeacon',
url: '/e',
})
expect(sendRequest).toHaveBeenNthCalledWith(3, {
data: [{ event: '$identify', timestamp: 1620000000 }],
transport: 'sendBeacon',
url: '/identify',
})
})
})
})
6 changes: 3 additions & 3 deletions src/__tests__/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/// <reference lib="dom" />

import { extendURLParams, request } from '../request'
import { Compression, RequestOptions } from '../types'
import { Compression, RequestWithOptions } from '../types'

jest.mock('../utils/globals', () => ({
...jest.requireActual('../utils/globals'),
Expand Down Expand Up @@ -51,8 +51,8 @@ describe('request', () => {
const now = 1700000000000

const mockCallback = jest.fn()
let createRequest: (overrides?: Partial<RequestOptions>) => RequestOptions
let transport: RequestOptions['transport']
let createRequest: (overrides?: Partial<RequestWithOptions>) => RequestWithOptions
let transport: RequestWithOptions['transport']

beforeEach(() => {
mockedXHR.open.mockClear()
Expand Down
18 changes: 11 additions & 7 deletions src/posthog-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { SessionRecording } from './extensions/replay/sessionrecording'
import { RemoteConfigLoader } from './remote-config'
import { Toolbar } from './extensions/toolbar'
import { localStore } from './storage'
import { RequestQueue } from './request-queue'
import { DEFAULT_FLUSH_INTERVAL_MS, RequestQueue } from './request-queue'
import { RetryQueue } from './retry-queue'
import { SessionIdManager } from './sessionid'
import { RequestRouter, RequestRouterRegion } from './utils/request-router'
Expand All @@ -38,7 +38,7 @@ import {
PostHogConfig,
Properties,
Property,
QueuedRequestOptions,
QueuedRequestWithOptions,
RemoteConfig,
RequestCallback,
SessionIdChangedCallback,
Expand Down Expand Up @@ -186,6 +186,7 @@ export const defaultConfig = (): PostHogConfig => ({
session_idle_timeout_seconds: 30 * 60, // 30 minutes
person_profiles: 'identified_only',
before_send: undefined,
request_queue_config: { flush_interval_ms: DEFAULT_FLUSH_INTERVAL_MS },

// Used for internal testing
_onCapture: __NOOP,
Expand Down Expand Up @@ -284,7 +285,7 @@ export class PostHog {
_initialPageviewCaptured: boolean
_triggered_notifs: any
compression?: Compression
__request_queue: QueuedRequestOptions[]
__request_queue: QueuedRequestWithOptions[]
analyticsDefaultEndpoint: string
version = Config.LIB_VERSION
_initialPersonProfilesConfig: 'always' | 'never' | 'identified_only' | null
Expand Down Expand Up @@ -440,7 +441,10 @@ export class PostHog {
const initialPersistenceProps = { ...this.persistence.props }
const initialSessionProps = { ...this.sessionPersistence.props }

this._requestQueue = new RequestQueue((req) => this._send_retriable_request(req))
this._requestQueue = new RequestQueue(
(req) => this._send_retriable_request(req),
this.config.request_queue_config
)
this._retryQueue = new RetryQueue(this)
this.__request_queue = []

Expand Down Expand Up @@ -668,7 +672,7 @@ export class PostHog {
this._retryQueue?.unload()
}

_send_request(options: QueuedRequestOptions): void {
_send_request(options: QueuedRequestWithOptions): void {
if (!this.__loaded) {
return
}
Expand Down Expand Up @@ -710,7 +714,7 @@ export class PostHog {
})
}

_send_retriable_request(options: QueuedRequestOptions): void {
_send_retriable_request(options: QueuedRequestWithOptions): void {
if (this._retryQueue) {
this._retryQueue.retriableRequest(options)
} else {
Expand Down Expand Up @@ -918,7 +922,7 @@ export class PostHog {

this._internalEventEmitter.emit('eventCaptured', data)

const requestOptions: QueuedRequestOptions = {
const requestOptions: QueuedRequestWithOptions = {
method: 'POST',
url: options?._url ?? this.requestRouter.endpointFor('api', this.analyticsDefaultEndpoint),
data,
Expand Down
Loading

0 comments on commit 3e27ae6

Please sign in to comment.