diff --git a/src/instrumentation/queue.ts b/src/instrumentation/queue.ts index 585e46c..28f09f6 100644 --- a/src/instrumentation/queue.ts +++ b/src/instrumentation/queue.ts @@ -1,20 +1,18 @@ -import { trace, SpanOptions, SpanKind, Attributes, Exception, context as api_context } from '@opentelemetry/api' -import { SemanticAttributes } from '@opentelemetry/semantic-conventions' -import { Initialiser, setConfig } from '../config.js' -import { exportSpans, proxyExecutionContext } from './common.js' -import { instrumentEnv } from './env.js' +import { trace, SpanKind, Attributes, Span } from '@opentelemetry/api' import { unwrap, wrap } from '../wrap.js' -import { versionAttributes } from './version.js' +import { HandlerInstrumentation, InitialSpanInfo, OrPromise } from '../types.js' +import { ATTR_FAAS_TRIGGER, FAAS_TRIGGER_VALUE_PUBSUB } from '@opentelemetry/semantic-conventions/incubating' type QueueHandler = ExportedHandlerQueueHandler export type QueueHandlerArgs = Parameters -const traceIdSymbol = Symbol('traceId') - class MessageStatusCount { succeeded = 0 failed = 0 + implicitly_acked = 0 + implicitly_retried = 0 readonly total: number + constructor(total: number) { this.total = total } @@ -24,6 +22,7 @@ class MessageStatusCount { } ackRemaining() { + this.implicitly_acked = this.total - this.succeeded - this.failed this.succeeded = this.total - this.failed } @@ -32,6 +31,7 @@ class MessageStatusCount { } retryRemaining() { + this.implicitly_retried = this.total - this.succeeded - this.failed this.failed = this.total - this.succeeded } @@ -41,6 +41,8 @@ class MessageStatusCount { 'queue.messages_success': this.succeeded, 'queue.messages_failed': this.failed, 'queue.batch_success': this.succeeded === this.total, + 'queue.implicitly_acked': this.implicitly_acked, + 'queue.implicitly_retried': this.implicitly_retried, } } } @@ -131,60 +133,40 @@ const proxyMessageBatch = (batch: MessageBatch, count: MessageStatusCount) => { return wrap(batch, batchHandler) } -export function executeQueueHandler(queueFn: QueueHandler, [batch, env, ctx]: QueueHandlerArgs): Promise { - const count = new MessageStatusCount(batch.messages.length) - batch = proxyMessageBatch(batch, count) - const tracer = trace.getTracer('queueHandler') - const options: SpanOptions = { - attributes: { - [SemanticAttributes.FAAS_TRIGGER]: 'pubsub', - 'queue.name': batch.queue, - }, - kind: SpanKind.CONSUMER, - } - Object.assign(options.attributes!, versionAttributes(env)) - const promise = tracer.startActiveSpan(`queueHandler ${batch.queue}`, options, async (span) => { - const traceId = span.spanContext().traceId - api_context.active().setValue(traceIdSymbol, traceId) - try { - const result = await queueFn(batch, env, ctx) - span.setAttribute('queue.implicitly_acked', count.total - count.succeeded - count.failed) - count.ackRemaining() - span.setAttributes(count.toAttributes()) - span.end() - return result - } catch (error) { - span.recordException(error as Exception) - span.setAttribute('queue.implicitly_retried', count.total - count.succeeded - count.failed) - count.retryRemaining() - span.end() - throw error +export class QueueInstrumentation implements HandlerInstrumentation> { + private count?: MessageStatusCount + + getInitialSpanInfo(batch: MessageBatch): InitialSpanInfo { + return { + name: `queueHandler ${batch.queue}`, + options: { + attributes: { + [ATTR_FAAS_TRIGGER]: FAAS_TRIGGER_VALUE_PUBSUB, + 'queue.name': batch.queue, + }, + kind: SpanKind.CONSUMER, + }, } - }) - return promise -} + } -export function createQueueHandler(queueFn: QueueHandler, initialiser: Initialiser) { - const queueHandler: ProxyHandler = { - async apply(target, _thisArg, argArray: Parameters): Promise { - const [batch, orig_env, orig_ctx] = argArray - const config = initialiser(orig_env as Record, batch) - const env = instrumentEnv(orig_env as Record) - const { ctx, tracker } = proxyExecutionContext(orig_ctx) - const context = setConfig(config) - - try { - const args: QueueHandlerArgs = [batch, env, ctx] - - return await api_context.with(context, executeQueueHandler, undefined, target, args) - } catch (error) { - throw error - } finally { - orig_ctx.waitUntil(exportSpans(tracker)) - } - }, + instrumentTrigger(batch: MessageBatch): MessageBatch { + this.count = new MessageStatusCount(batch.messages.length) + return proxyMessageBatch(batch, this.count) + } + + executionSucces(span: Span) { + if (this.count) { + this.count.ackRemaining() + span.setAttributes(this.count.toAttributes()) + } + } + + executionFailed(span: Span) { + if (this.count) { + this.count.retryRemaining() + span.setAttributes(this.count.toAttributes()) + } } - return wrap(queueFn, queueHandler) } function instrumentQueueSend(fn: Queue['send'], name: string): Queue['send'] { diff --git a/src/instrumentation/scheduled.ts b/src/instrumentation/scheduled.ts index 0dcbe37..8fbd0ae 100644 --- a/src/instrumentation/scheduled.ts +++ b/src/instrumentation/scheduled.ts @@ -1,70 +1,24 @@ -import { trace, SpanOptions, SpanKind, Exception, context as api_context, SpanStatusCode } from '@opentelemetry/api' -import { SemanticAttributes } from '@opentelemetry/semantic-conventions' -import { Initialiser, setConfig } from '../config.js' -import { exportSpans, proxyExecutionContext } from './common.js' -import { instrumentEnv } from './env.js' -import { wrap } from '../wrap.js' -import { versionAttributes } from './version.js' +import { SpanKind } from '@opentelemetry/api' +import { HandlerInstrumentation, InitialSpanInfo, OrPromise } from '../types.js' +import { + ATTR_FAAS_CRON, + ATTR_FAAS_TIME, + ATTR_FAAS_TRIGGER, + FAAS_TRIGGER_VALUE_TIMER, +} from '@opentelemetry/semantic-conventions/incubating' -type ScheduledHandler = ExportedHandlerScheduledHandler -export type ScheduledHandlerArgs = Parameters - -const traceIdSymbol = Symbol('traceId') - -let cold_start = true -export function executeScheduledHandler( - scheduledFn: ScheduledHandler, - [controller, env, ctx]: ScheduledHandlerArgs, -): Promise { - const tracer = trace.getTracer('scheduledHandler') - const attributes = { - [SemanticAttributes.FAAS_TRIGGER]: 'timer', - [SemanticAttributes.FAAS_COLDSTART]: cold_start, - [SemanticAttributes.FAAS_CRON]: controller.cron, - [SemanticAttributes.FAAS_TIME]: new Date(controller.scheduledTime).toISOString(), - } - cold_start = false - Object.assign(attributes, versionAttributes(env)) - const options: SpanOptions = { - attributes, - kind: SpanKind.SERVER, - } - - const promise = tracer.startActiveSpan(`scheduledHandler ${controller.cron}`, options, async (span) => { - const traceId = span.spanContext().traceId - api_context.active().setValue(traceIdSymbol, traceId) - try { - await scheduledFn(controller, env, ctx) - } catch (error) { - span.recordException(error as Exception) - span.setStatus({ code: SpanStatusCode.ERROR }) - throw error - } finally { - span.end() +export const scheduledInstrumentation: HandlerInstrumentation> = { + getInitialSpanInfo: function (controller: ScheduledController): InitialSpanInfo { + return { + name: `scheduledHandler ${controller.cron}`, + options: { + attributes: { + [ATTR_FAAS_TRIGGER]: FAAS_TRIGGER_VALUE_TIMER, + [ATTR_FAAS_CRON]: controller.cron, + [ATTR_FAAS_TIME]: new Date(controller.scheduledTime).toISOString(), + }, + kind: SpanKind.INTERNAL, + }, } - }) - return promise -} - -export function createScheduledHandler(scheduledFn: ScheduledHandler, initialiser: Initialiser) { - const scheduledHandler: ProxyHandler = { - async apply(target, _thisArg, argArray: Parameters): Promise { - const [controller, orig_env, orig_ctx] = argArray - const config = initialiser(orig_env as Record, controller) - const env = instrumentEnv(orig_env as Record) - const { ctx, tracker } = proxyExecutionContext(orig_ctx) - const context = setConfig(config) - - try { - const args: ScheduledHandlerArgs = [controller, env, ctx] - - return await api_context.with(context, executeScheduledHandler, undefined, target, args) - } catch (error) { - throw error - } finally { - orig_ctx.waitUntil(exportSpans(tracker)) - } - }, - } - return wrap(scheduledFn, scheduledHandler) + }, } diff --git a/src/sdk.ts b/src/sdk.ts index 5d9e795..a9ab050 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -7,9 +7,9 @@ import { Trigger, TraceConfig, ResolvedTraceConfig, OrPromise, HandlerInstrument import { unwrap } from './wrap.js' import { fetchInstrumentation, instrumentGlobalFetch } from './instrumentation/fetch.js' import { instrumentGlobalCache } from './instrumentation/cache.js' -import { createQueueHandler } from './instrumentation/queue.js' +import { QueueInstrumentation } from './instrumentation/queue.js' import { DOClass, instrumentDOClass } from './instrumentation/do.js' -import { createScheduledHandler } from './instrumentation/scheduled.js' +import { scheduledInstrumentation } from './instrumentation/scheduled.js' //@ts-ignore import * as versions from '../versions.json' import { instrumentEnv } from './instrumentation/env.js' @@ -190,12 +190,12 @@ export function instrument( if (handler.scheduled) { const scheduler = unwrap(handler.scheduled) as ScheduledHandler - handler.scheduled = createScheduledHandler(scheduler, initialiser) + handler.scheduled = createHandlerProxy(handler, scheduler, initialiser, scheduledInstrumentation) } if (handler.queue) { const queuer = unwrap(handler.queue) as QueueHandler - handler.queue = createQueueHandler(queuer, initialiser) + handler.queue = createHandlerProxy(handler, queuer, initialiser, new QueueInstrumentation()) } return handler }