Skip to content

Commit

Permalink
No tracing on non-json events (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am authored Aug 14, 2024
1 parent fe0b45e commit c54e366
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
5 changes: 4 additions & 1 deletion packages/opentelemetry/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ export class Instrumentation extends InstrumentationBase {

const resolvedEvent = resolved as ResolvedEvent;
const metadata = resolvedEvent?.event?.metadata;

if (!resolvedEvent.event?.isJson) return;

const parentContext = Instrumentation.restoreContext(metadata!);

const { hostname, port } = Instrumentation.getServerAddress(uri);
Expand All @@ -250,7 +253,7 @@ export class Instrumentation extends InstrumentationBase {
return context.with(parentContext, () => {
const span = tracer.startSpan(spanName, {
attributes,
kind: SpanKind.CLIENT,
kind: SpanKind.CONSUMER,
});

try {
Expand Down
80 changes: 79 additions & 1 deletion packages/test/src/opentelemetry/instrumentation.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { createTestNode, Defer, delay, jsonTestEvents } from "@test-utils";
import {
createTestNode,
Defer,
delay,
jsonTestEvents,
binaryTestEvents,
} from "@test-utils";
import {
NodeTracerProvider,
InMemorySpanExporter,
Expand All @@ -22,6 +28,7 @@ instrumentation.disable();
import * as esdb from "@eventstore/db-client";
import {
AppendToStreamOptions,
binaryEvent,
ResolvedEvent,
streamNameFilter,
WrongExpectedVersionError,
Expand Down Expand Up @@ -256,6 +263,77 @@ describe("instrumentation", () => {
[EventStoreDBAttributes.DATABASE_OPERATION]: "appendToStream",
});
});

test("non json events are not instrumented in subscription", async () => {
const defer = new Defer();
const { EventStoreDBClient, jsonEvent, binaryEvent } = await import(
"@eventstore/db-client"
);

const STREAM = v4();

const client = new EventStoreDBClient(
{ endpoint: node.uri },
{ rootCertificate: node.certs.root },
{ username: "admin", password: "changeit" }
);

const handleError = jest.fn((error) => {
defer.reject(error);
});
const handleEvent = jest.fn((event: ResolvedEvent) => {
if (event.event?.streamId == STREAM) {
subscription.unsubscribe();
}
});
const handleEnd = jest.fn(defer.resolve);
const handleConfirmation = jest.fn();

const event1 = binaryEvent({
type: "SomeType",
data: Buffer.from("hello"),
});
const event2 = jsonEvent({
type: "SomeType",
data: {},
});

await client.appendToStream(STREAM, [event1, event2]);

const subscription = client
.subscribeToStream(STREAM, {
credentials: {
username: "admin",
password: "changeit",
},
})
.on("error", handleError)
.on("data", handleEvent)
.on("end", handleEnd)
.on("confirmation", handleConfirmation);

await delay(500);
await defer.promise;

const spans = memoryExporter.getFinishedSpans();

const childSpans = spans.filter(
(span) => span.name === EventStoreDBAttributes.STREAM_SUBSCIBE
);

expect(handleConfirmation).toHaveBeenCalledTimes(1);

expect(childSpans).toBeDefined();

expect(childSpans).toHaveLength(1);

expect(
childSpans[0].attributes[EventStoreDBAttributes.EVENT_STORE_EVENT_ID]
).toBe(event2.id);
expect(
childSpans[0].attributes[EventStoreDBAttributes.EVENT_STORE_EVENT_TYPE]
).toBe(event2.type);
});
});

describe("persistent subscriptions", () => {
Expand Down

0 comments on commit c54e366

Please sign in to comment.