Skip to content

Commit

Permalink
Refactor Queue and ScheduledHandlers
Browse files Browse the repository at this point in the history
  • Loading branch information
evanderkoogh committed Dec 2, 2024
1 parent 9402db8 commit 822c9b3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 130 deletions.
100 changes: 41 additions & 59 deletions src/instrumentation/queue.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import { trace, SpanOptions, SpanKind, Attributes, Exception, context as api_context } from '@opentelemetry/api'
import { SemanticAttributes } from '@opentelemetry/semantic-conventions'
import { Initialiser, setConfig } from '../config.js'
import { exportSpans, proxyExecutionContext } from './common.js'
import { instrumentEnv } from './env.js'
import { trace, SpanKind, Attributes, Span } from '@opentelemetry/api'
import { unwrap, wrap } from '../wrap.js'
import { versionAttributes } from './version.js'
import { HandlerInstrumentation, InitialSpanInfo, OrPromise } from '../types.js'
import { ATTR_FAAS_TRIGGER, FAAS_TRIGGER_VALUE_PUBSUB } from '@opentelemetry/semantic-conventions/incubating'

type QueueHandler = ExportedHandlerQueueHandler<unknown, unknown>
export type QueueHandlerArgs = Parameters<QueueHandler>

const traceIdSymbol = Symbol('traceId')

class MessageStatusCount {
succeeded = 0
failed = 0
implicitly_acked = 0
implicitly_retried = 0
readonly total: number

constructor(total: number) {
this.total = total
}
Expand All @@ -24,6 +22,7 @@ class MessageStatusCount {
}

ackRemaining() {
this.implicitly_acked = this.total - this.succeeded - this.failed
this.succeeded = this.total - this.failed
}

Expand All @@ -32,6 +31,7 @@ class MessageStatusCount {
}

retryRemaining() {
this.implicitly_retried = this.total - this.succeeded - this.failed
this.failed = this.total - this.succeeded
}

Expand All @@ -41,6 +41,8 @@ class MessageStatusCount {
'queue.messages_success': this.succeeded,
'queue.messages_failed': this.failed,
'queue.batch_success': this.succeeded === this.total,
'queue.implicitly_acked': this.implicitly_acked,
'queue.implicitly_retried': this.implicitly_retried,
}
}
}
Expand Down Expand Up @@ -131,60 +133,40 @@ const proxyMessageBatch = (batch: MessageBatch, count: MessageStatusCount) => {
return wrap(batch, batchHandler)
}

export function executeQueueHandler(queueFn: QueueHandler, [batch, env, ctx]: QueueHandlerArgs): Promise<void> {
const count = new MessageStatusCount(batch.messages.length)
batch = proxyMessageBatch(batch, count)
const tracer = trace.getTracer('queueHandler')
const options: SpanOptions = {
attributes: {
[SemanticAttributes.FAAS_TRIGGER]: 'pubsub',
'queue.name': batch.queue,
},
kind: SpanKind.CONSUMER,
}
Object.assign(options.attributes!, versionAttributes(env))
const promise = tracer.startActiveSpan(`queueHandler ${batch.queue}`, options, async (span) => {
const traceId = span.spanContext().traceId
api_context.active().setValue(traceIdSymbol, traceId)
try {
const result = await queueFn(batch, env, ctx)
span.setAttribute('queue.implicitly_acked', count.total - count.succeeded - count.failed)
count.ackRemaining()
span.setAttributes(count.toAttributes())
span.end()
return result
} catch (error) {
span.recordException(error as Exception)
span.setAttribute('queue.implicitly_retried', count.total - count.succeeded - count.failed)
count.retryRemaining()
span.end()
throw error
export class QueueInstrumentation implements HandlerInstrumentation<MessageBatch, OrPromise<void>> {
private count?: MessageStatusCount

getInitialSpanInfo(batch: MessageBatch): InitialSpanInfo {
return {
name: `queueHandler ${batch.queue}`,
options: {
attributes: {
[ATTR_FAAS_TRIGGER]: FAAS_TRIGGER_VALUE_PUBSUB,
'queue.name': batch.queue,
},
kind: SpanKind.CONSUMER,
},
}
})
return promise
}
}

export function createQueueHandler(queueFn: QueueHandler, initialiser: Initialiser) {
const queueHandler: ProxyHandler<QueueHandler> = {
async apply(target, _thisArg, argArray: Parameters<QueueHandler>): Promise<void> {
const [batch, orig_env, orig_ctx] = argArray
const config = initialiser(orig_env as Record<string, unknown>, batch)
const env = instrumentEnv(orig_env as Record<string, unknown>)
const { ctx, tracker } = proxyExecutionContext(orig_ctx)
const context = setConfig(config)

try {
const args: QueueHandlerArgs = [batch, env, ctx]

return await api_context.with(context, executeQueueHandler, undefined, target, args)
} catch (error) {
throw error
} finally {
orig_ctx.waitUntil(exportSpans(tracker))
}
},
instrumentTrigger(batch: MessageBatch): MessageBatch {
this.count = new MessageStatusCount(batch.messages.length)
return proxyMessageBatch(batch, this.count)
}

executionSucces(span: Span) {
if (this.count) {
this.count.ackRemaining()
span.setAttributes(this.count.toAttributes())
}
}

executionFailed(span: Span) {
if (this.count) {
this.count.retryRemaining()
span.setAttributes(this.count.toAttributes())
}
}
return wrap(queueFn, queueHandler)
}

function instrumentQueueSend(fn: Queue<unknown>['send'], name: string): Queue<unknown>['send'] {
Expand Down
88 changes: 21 additions & 67 deletions src/instrumentation/scheduled.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,24 @@
import { trace, SpanOptions, SpanKind, Exception, context as api_context, SpanStatusCode } from '@opentelemetry/api'
import { SemanticAttributes } from '@opentelemetry/semantic-conventions'
import { Initialiser, setConfig } from '../config.js'
import { exportSpans, proxyExecutionContext } from './common.js'
import { instrumentEnv } from './env.js'
import { wrap } from '../wrap.js'
import { versionAttributes } from './version.js'
import { SpanKind } from '@opentelemetry/api'
import { HandlerInstrumentation, InitialSpanInfo, OrPromise } from '../types.js'
import {
ATTR_FAAS_CRON,
ATTR_FAAS_TIME,
ATTR_FAAS_TRIGGER,
FAAS_TRIGGER_VALUE_TIMER,
} from '@opentelemetry/semantic-conventions/incubating'

type ScheduledHandler = ExportedHandlerScheduledHandler<unknown>
export type ScheduledHandlerArgs = Parameters<ScheduledHandler>

const traceIdSymbol = Symbol('traceId')

let cold_start = true
export function executeScheduledHandler(
scheduledFn: ScheduledHandler,
[controller, env, ctx]: ScheduledHandlerArgs,
): Promise<void> {
const tracer = trace.getTracer('scheduledHandler')
const attributes = {
[SemanticAttributes.FAAS_TRIGGER]: 'timer',
[SemanticAttributes.FAAS_COLDSTART]: cold_start,
[SemanticAttributes.FAAS_CRON]: controller.cron,
[SemanticAttributes.FAAS_TIME]: new Date(controller.scheduledTime).toISOString(),
}
cold_start = false
Object.assign(attributes, versionAttributes(env))
const options: SpanOptions = {
attributes,
kind: SpanKind.SERVER,
}

const promise = tracer.startActiveSpan(`scheduledHandler ${controller.cron}`, options, async (span) => {
const traceId = span.spanContext().traceId
api_context.active().setValue(traceIdSymbol, traceId)
try {
await scheduledFn(controller, env, ctx)
} catch (error) {
span.recordException(error as Exception)
span.setStatus({ code: SpanStatusCode.ERROR })
throw error
} finally {
span.end()
export const scheduledInstrumentation: HandlerInstrumentation<ScheduledController, OrPromise<void>> = {
getInitialSpanInfo: function (controller: ScheduledController): InitialSpanInfo {
return {
name: `scheduledHandler ${controller.cron}`,
options: {
attributes: {
[ATTR_FAAS_TRIGGER]: FAAS_TRIGGER_VALUE_TIMER,
[ATTR_FAAS_CRON]: controller.cron,
[ATTR_FAAS_TIME]: new Date(controller.scheduledTime).toISOString(),
},
kind: SpanKind.INTERNAL,
},
}
})
return promise
}

export function createScheduledHandler(scheduledFn: ScheduledHandler, initialiser: Initialiser) {
const scheduledHandler: ProxyHandler<ScheduledHandler> = {
async apply(target, _thisArg, argArray: Parameters<ScheduledHandler>): Promise<void> {
const [controller, orig_env, orig_ctx] = argArray
const config = initialiser(orig_env as Record<string, unknown>, controller)
const env = instrumentEnv(orig_env as Record<string, unknown>)
const { ctx, tracker } = proxyExecutionContext(orig_ctx)
const context = setConfig(config)

try {
const args: ScheduledHandlerArgs = [controller, env, ctx]

return await api_context.with(context, executeScheduledHandler, undefined, target, args)
} catch (error) {
throw error
} finally {
orig_ctx.waitUntil(exportSpans(tracker))
}
},
}
return wrap(scheduledFn, scheduledHandler)
},
}
8 changes: 4 additions & 4 deletions src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import { Trigger, TraceConfig, ResolvedTraceConfig, OrPromise, HandlerInstrument
import { unwrap } from './wrap.js'
import { fetchInstrumentation, instrumentGlobalFetch } from './instrumentation/fetch.js'
import { instrumentGlobalCache } from './instrumentation/cache.js'
import { createQueueHandler } from './instrumentation/queue.js'
import { QueueInstrumentation } from './instrumentation/queue.js'
import { DOClass, instrumentDOClass } from './instrumentation/do.js'
import { createScheduledHandler } from './instrumentation/scheduled.js'
import { scheduledInstrumentation } from './instrumentation/scheduled.js'
//@ts-ignore
import * as versions from '../versions.json'
import { instrumentEnv } from './instrumentation/env.js'
Expand Down Expand Up @@ -190,12 +190,12 @@ export function instrument<E extends Env, Q, C>(

if (handler.scheduled) {
const scheduler = unwrap(handler.scheduled) as ScheduledHandler
handler.scheduled = createScheduledHandler(scheduler, initialiser)
handler.scheduled = createHandlerProxy(handler, scheduler, initialiser, scheduledInstrumentation)
}

if (handler.queue) {
const queuer = unwrap(handler.queue) as QueueHandler
handler.queue = createQueueHandler(queuer, initialiser)
handler.queue = createHandlerProxy(handler, queuer, initialiser, new QueueInstrumentation())
}
return handler
}
Expand Down

0 comments on commit 822c9b3

Please sign in to comment.