From 0f40d4897fb1edcaf0fe7c85b6a3ce43fa17d75c Mon Sep 17 00:00:00 2001 From: Steven Vancoillie Date: Sat, 11 Jan 2025 09:28:37 +0100 Subject: [PATCH] feat(streams): capture MP4 stream Adds a `capture` method to the pipelines that produce MP4 data. It stores all data in a buffer at start of movie, and passes it to the provided callback when capture ends. The latter can be triggered by calling the returned trigger function, or automatically when the buffer is full. Co-authored-by: Rikard Tegnander Co-authored-by: Victor Ingvarsson --- .../camera/simple-mp4-player.js | 26 +++- example-streams-web/camera/simple-mp4.html | 39 +++--- example-streams-web/favicon.ico | Bin 0 -> 3362 bytes streams/src/components/index.ts | 1 - streams/src/components/mp4-capture.ts | 89 ------------- streams/src/components/mse-sink.ts | 9 +- streams/src/http-mp4-pipeline.ts | 88 +++++++++---- streams/src/mp4-capture.ts | 69 +++++++++++ streams/src/rtsp-mp4-pipeline.ts | 13 +- streams/tests/mp4-capture.test.ts | 117 +++++++----------- 10 files changed, 244 insertions(+), 207 deletions(-) create mode 100644 example-streams-web/favicon.ico delete mode 100644 streams/src/components/mp4-capture.ts create mode 100644 streams/src/mp4-capture.ts diff --git a/example-streams-web/camera/simple-mp4-player.js b/example-streams-web/camera/simple-mp4-player.js index 0cf36dab3..304ad6d08 100644 --- a/example-streams-web/camera/simple-mp4-player.js +++ b/example-streams-web/camera/simple-mp4-player.js @@ -18,7 +18,6 @@ const authorize = async (host) => { } } -let pipeline const play = (host) => { // Grab a reference to the video element const mediaElement = document.querySelector('video') @@ -31,8 +30,10 @@ const play = (host) => { pipeline.start().catch((err) => { console.error(err) }) + return pipeline } +let pipeline // Each time a device ip is entered, authorize and then play const playButton = document.querySelector('#play') playButton.addEventListener('click', async () => { @@ -45,3 +46,26 @@ 
playButton.addEventListener('click', async () => { pipeline = play(host) }) + +let stopCapture +const startCaptureButton = document.querySelector('#startCapture') +startCaptureButton.addEventListener('click', async () => { + if (!pipeline) { + console.error('No pipeline') + return + } + + stopCapture = await pipeline.capture((bytes) => { + console.log('Capture finished!', bytes.byteLength) + console.log(bytes) + }) +}) + +const stopCaptureButton = document.querySelector('#stopCapture') +stopCaptureButton.addEventListener('click', async () => { + if (!stopCapture) { + console.error('Capture not started!') + return + } + stopCapture() +}) diff --git a/example-streams-web/camera/simple-mp4.html b/example-streams-web/camera/simple-mp4.html index 931ddef22..8d2cd3b61 100644 --- a/example-streams-web/camera/simple-mp4.html +++ b/example-streams-web/camera/simple-mp4.html @@ -1,26 +1,25 @@ - - Example streaming MP4 video from Axis camera - - + + Example streaming MP4 video from Axis camera + + + +
-
- - -
- -
-
- + +
- - - + + + +
+
+ +
+ + + + diff --git a/example-streams-web/favicon.ico b/example-streams-web/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..2b94fc67886d73b16c230447ab540be8de613e72 GIT binary patch literal 3362 zcmeHJ&u<$=6n?X_rgmc+B_)xHtJH`Cy(A4q2oQ>qP@^W$8-WAjKS)8XB5`OF$N6Q6 zUO^!Kj@scwg@C9a_!o$9+Hwbps?M(W>-Wa9_NEl|5>8}S-+pi3&g{4E&6}AOF+8iM zMXypUiA;#dIigBjAk?o%qbTx2xew>y1w48W4sh@Jo7k7@*bnODRfj#ffxo1Q6`6xJ z4JB}_b_!z*Mn^|6K0c0exs0i)DQs_VV{L8CVSRlajYh-S^?LmuzN6GHl}ebNp2pVJ zmeWz)jg1YbueR#z_oz>8^&O>t8o$1Ojq$6NvO={sH#Z%p-*8m=V`KgNYy6u3R43KV z=3Sqq-=%&e9RI%`AOAW!tbyNH4}Z!w$N%D?@i#`0A_xplof^f&#B<2%XLFs6S8a_u z^|}1_y0}|uQu9;R-wpD0{2_JRC%SSI4Y_nkKdrq!8;p#MvG&HPKdiq-V;#+AleO4% z`B+(5apSPMx;oHTJ9#rk@wjbeTTfytLii&*}L5?w7!7Hx@ z_-Qx9?)MgVepH0`<<1HG`g0!VUU~+FLJ^abXHlt4qFQ|sXU?3(^0j5$>vYiB+r$0+ zeP`>vBH;{Fz$5m2Sw0xdF^$vuIX73!G)_Btp1Clbm)RVBvb^N#{E3%u-UW8vLDE`q zh+6Xi?e|-!5ws0Xv;xQBEzbS6p!W3?;`ufX=5HaIZy~zWCb!We&X?N|(?)1_V1gaw zY|HV-FMNh%VS#-vlc|-#AaEZ;?0J!_XO1vL2|-gDCsQwrGzibJ=fy;$&cq-0!M8bXU@%=C1o|HlC z?GKXz@;pm~TE%$8-Cz7}4m1#-q<*HW`d%yW5pmLUu@JfYtA(%lAde_t>Nn6v9{f-F G&yPngd_3_0 literal 0 HcmV?d00001 diff --git a/streams/src/components/index.ts b/streams/src/components/index.ts index 7812be5b5..242145fd3 100644 --- a/streams/src/components/index.ts +++ b/streams/src/components/index.ts @@ -4,7 +4,6 @@ export * from './utils' export * from './adapter' export * from './canvas' -export * from './mp4-capture' export * from './mp4-muxer' export * from './mse-sink' export * from './rtp' diff --git a/streams/src/components/mp4-capture.ts b/streams/src/components/mp4-capture.ts deleted file mode 100644 index 6969213c0..000000000 --- a/streams/src/components/mp4-capture.ts +++ /dev/null @@ -1,89 +0,0 @@ -import { logInfo } from '../log' - -import { IsomMessage } from './types' - -const MAX_CAPTURE_BYTES = 225000000 // 5 min at a rate of 6 Mbit/s - -/** - * Component that records MP4 data. 
- */ -export class Mp4Capture extends TransformStream { - private activeCallback?: (buffer: Uint8Array) => void - private capture: boolean - private bufferOffset: number - private readonly bufferSize: number - private buffer: Uint8Array - - constructor(maxSize = MAX_CAPTURE_BYTES) { - super({ - transform: (msg, controller) => { - const type = msg.type - const data = msg.data - - // Arrival of ISOM with MIME type indicates new movie, start recording if active. - if (this.activeCallback && msg.mimeType !== undefined) { - this.capture = true - } - - // If capture enabled, record all ISOM (MP4) boxes - if (this.capture) { - if (this.bufferOffset < this.buffer.byteLength - data.byteLength) { - this.buffer.set(data, this.bufferOffset) - this.bufferOffset += data.byteLength - } else { - this.stop() - } - } - controller.enqueue(msg) - }, - flush: () => { - this.stop() - }, - }) - - this.buffer = new Uint8Array(0) - this.bufferSize = maxSize - this.bufferOffset = 0 - - this.activeCallback = undefined - this.capture = false - } - - /** - * Activate video capture. The capture will begin when a new movie starts, - * and will terminate when the movie ends or when the buffer is full. On - * termination, the callback you passed will be called with the captured - * data as argument. - * @param callback Will be called when data is captured. - */ - start(callback: (buffer: Uint8Array) => void) { - if (!this.activeCallback) { - logInfo('start MP4 capture') - this.activeCallback = callback - this.buffer = new Uint8Array(this.bufferSize) - this.bufferOffset = 0 - } - } - - /** - * Deactivate video capture. This ends an ongoing capture and prevents - * any further capturing. 
- */ - stop() { - if (this.activeCallback) { - logInfo(`stop MP4 capture, collected ${this.bufferOffset} bytes`) - - try { - this.activeCallback(this.buffer.slice(0, this.bufferOffset)) - } catch (err) { - console.error('capture callback failed:', err) - } - - this.buffer = new Uint8Array(0) - this.bufferOffset = 0 - - this.activeCallback = undefined - this.capture = false - } - } -} diff --git a/streams/src/components/mse-sink.ts b/streams/src/components/mse-sink.ts index 1b4ac35f2..63ca94ca5 100644 --- a/streams/src/components/mse-sink.ts +++ b/streams/src/components/mse-sink.ts @@ -15,7 +15,10 @@ import { IsomMessage } from './types' */ export class MseSink { public readonly mediaSource: MediaSource = new MediaSource() - public writable: WritableStream + public readonly writable: WritableStream + + /** A function that will peek at ISOM messages, useful for example for capturing the MP4 data. */ + public onMessage?: (msg: IsomMessage) => void private lastCheckpointTime: number private sourceBuffer?: Promise @@ -57,6 +60,10 @@ export class MseSink { await freeBuffer(sourceBuffer, checkpoint) } + if (this.onMessage) { + this.onMessage(msg) + } + await appendBuffer(sourceBuffer, msg.data) }, close: async () => { diff --git a/streams/src/http-mp4-pipeline.ts b/streams/src/http-mp4-pipeline.ts index a32d4f85b..100cb7d09 100644 --- a/streams/src/http-mp4-pipeline.ts +++ b/streams/src/http-mp4-pipeline.ts @@ -1,4 +1,6 @@ import { Adapter, IsomMessage, MseSink } from './components' +import { logDebug } from './log' +import { setupMp4Capture } from './mp4-capture' export interface HttpMp4Config { uri: string @@ -18,43 +20,69 @@ export interface HttpMp4Config { export class HttpMp4Pipeline { public onHeaders?: (headers: Headers) => void public onServerClose?: () => void - /** Initiates the stream and resolves when the media stream has completed */ - public start: () => Promise - private readonly mediaElement: HTMLVideoElement - private readonly abortController: 
AbortController + private abortController?: AbortController private downloadedBytes: number = 0 + private options?: RequestInit + private readonly mediaElement: HTMLVideoElement + private uri: string constructor(config: HttpMp4Config) { const { uri, options, mediaElement } = config + this.uri = uri + this.options = options + this.mediaElement = mediaElement + } + + /** Initiates the stream and resolves when the media stream has completed. */ + public async start(msgHandler?: (msg: IsomMessage) => void) { + this.abortController?.abort('stream restarted') + this.abortController = new AbortController() - this.start = () => - fetch(uri, { signal: this.abortController.signal, ...options }) - .then(({ headers, body }) => { - const mimeType = headers.get('Content-Type') - if (!mimeType) { - throw new Error('missing MIME type in HTTP response headers') - } - if (body === null) { - throw new Error('missing body in HTTP response') - } - const adapter = new Adapter((chunk) => { - this.downloadedBytes += chunk.byteLength - return new IsomMessage({ data: chunk }) - }) - const mseSink = new MseSink(mediaElement, mimeType) - return body.pipeThrough(adapter).pipeTo(mseSink.writable) - }) - .catch((err) => { - console.error('failed to stream media:', err) - }) + const { ok, status, statusText, headers, body } = await fetch(this.uri, { + signal: this.abortController.signal, + ...this.options, + }) + + if (!ok) { + throw new Error(`response not ok, status: ${statusText} (${status})`) + } + + const mimeType = headers.get('Content-Type') + if (!mimeType) { + throw new Error('missing MIME type in HTTP response headers') + } + + if (body === null) { + throw new Error('missing body in HTTP response') + } + + const adapter = new Adapter((chunk) => { + this.downloadedBytes += chunk.byteLength + return new IsomMessage({ data: chunk }) + }) + + const mseSink = new MseSink(this.mediaElement, mimeType) + if (msgHandler) { + mseSink.onMessage = msgHandler + } + + body + .pipeThrough(adapter) + 
.pipeTo(mseSink.writable) + .then(() => { + logDebug(`http-mp4 pipeline ended: stream ended`) + }) + .catch((err) => { + logDebug(`http-mp4 pipeline ended: ${err}`) + }) } public close() { - this.abortController.abort() + this.abortController?.abort('Closed by user') } public get currentTime() { @@ -72,4 +100,14 @@ export class HttpMp4Pipeline { public get byteLength() { return this.downloadedBytes } + + /** Refresh the stream and passes the captured MP4 data to the provided + * callback. Capture can be ended by calling the returned trigger, or + * if the buffer reaches max size. */ + public async capture(callback: (bytes: Uint8Array) => void) { + this.close() + const { capture, triggerEnd } = setupMp4Capture(callback) + await this.start(capture) + return triggerEnd + } } diff --git a/streams/src/mp4-capture.ts b/streams/src/mp4-capture.ts new file mode 100644 index 000000000..493a6a70d --- /dev/null +++ b/streams/src/mp4-capture.ts @@ -0,0 +1,69 @@ +import { logInfo as logDebug } from './log' + +import { IsomMessage } from './components/types' +import { encode } from './components/utils/bytes' + +const MAX_BUFFER = 225000000 // 5 min at a rate of 6 Mbit/s + +// Detect the start of the movie by detecting an ftyp box. +const magicHeader = encode('ftyp') +function isFtypIsom(box: Uint8Array): boolean { + const header = box.subarray(4, 8) + return magicHeader.every((byte, i) => byte === header[i]) +} + +/** Given a callback and max buffer size, returns two functions, one that takes + * MP4 data (as ISOM message) and stores that data whenever it detects the start + * of a movie, and a function that triggers the end of data storage. The trigger + * is called automatically if the buffer is full. 
*/
+export function setupMp4Capture(
+  cb: (bytes: Uint8Array) => void,
+  bufferSize = MAX_BUFFER
+): {
+  capture: (msg: IsomMessage) => void
+  triggerEnd: () => void
+} {
+  // Preallocated storage; bufferOffset counts the bytes recorded so far.
+  let active = true
+  const buffer = new Uint8Array(bufferSize)
+  let bufferOffset = 0
+  let startOfMovie = false
+
+  const triggerEnd = () => {
+    // Idempotent: the buffer-full path below and the caller may both trigger
+    // the end, but the callback must only be invoked once per capture.
+    if (!active) return
+    active = false
+    logDebug(`stop MP4 capture, collected ${bufferOffset} bytes`)
+    try {
+      cb(buffer.subarray(0, bufferOffset))
+    } catch (err) {
+      console.error('capture callback failed:', err)
+    }
+  }
+
+  const capture = (msg: IsomMessage) => {
+    if (!active) return
+
+    // Ignore all data until an ftyp box marks the start of a movie.
+    if (!startOfMovie) {
+      if (!isFtypIsom(msg.data)) return
+      startOfMovie = true
+      logDebug('detected start of movie, proceeding with MP4 capture')
+    }
+
+    // Movie started: record every ISOM (MP4) box that still fits (an exact
+    // fit included); the first box that does not fit ends the capture.
+    if (bufferOffset + msg.data.byteLength <= buffer.byteLength) {
+      buffer.set(msg.data, bufferOffset)
+      bufferOffset += msg.data.byteLength
+    } else {
+      triggerEnd()
+    }
+  }
+
+  return {
+    capture,
+    triggerEnd,
+  }
+}
diff --git a/streams/src/rtsp-mp4-pipeline.ts b/streams/src/rtsp-mp4-pipeline.ts
index e5439d529..e3f7a4b13 100644
--- a/streams/src/rtsp-mp4-pipeline.ts
+++ b/streams/src/rtsp-mp4-pipeline.ts
@@ -9,7 +9,7 @@ import {
   Sdp,
   WSSource,
 } from './components'
-
+import { setupMp4Capture } from './mp4-capture'
 import { WebSocketConfig, openWebSocket } from './openwebsocket'
 
 export interface Html5VideoConfig {
@@ -100,4 +100,15 @@ export class RtspMp4Pipeline {
   pause() {
     return this.videoEl.pause()
   }
+
+  /** Refresh the stream and passes the captured MP4 data to the provided
+   * callback. Capture can be ended by calling the returned trigger, or
+   * if the buffer reaches max size.
*/ + async capture(callback: (bytes: Uint8Array) => void) { + await this.rtsp.teardown() + const { capture, triggerEnd } = setupMp4Capture(callback) + this.mse.onMessage = capture + await this.rtsp.start() + return triggerEnd + } } diff --git a/streams/tests/mp4-capture.test.ts b/streams/tests/mp4-capture.test.ts index b51b0a636..c33d14080 100644 --- a/streams/tests/mp4-capture.test.ts +++ b/streams/tests/mp4-capture.test.ts @@ -1,37 +1,39 @@ import * as assert from 'uvu/assert' import { describe } from './uvu-describe' -import EventEmitter from 'node:events' - -import { IsomMessage, Mp4Capture } from '../src/components' +import { IsomMessage } from '../src/components' +import { concat, encode } from '../src/components/utils/bytes' import { consumer, peeker, producer } from '../src/components/utils/streams' +import { setupMp4Capture } from '../src/mp4-capture' // Mocks -const MOCK_BUFFER_SIZE = 10 // Jest has problems with large buffers -const MOCK_MOVIE_DATA = 0xff -const MOCK_MOVIE_ENDING_DATA = 0xfe +const MOCK_BUFFER_SIZE = 100 // Jest has problems with large buffers +const MOCK_MOVIE_START = concat([new Uint8Array(4), encode('ftypisom')]) +const MOCK_MOVIE_BOX1 = new Uint8Array(8).fill(0xff) +const MOCK_MOVIE_BOX2 = new Uint8Array(8).fill(0xfe) // A movie consists of ISOM packets, starting with an ISOM message that has a // tracks property. We want to simulate the beginning and end of a movie, as // well as non-movie packets. 
const MOCK_MOVIE = [ - new IsomMessage({ - mimeType: 'video/mp4', - data: new Uint8Array(1).fill(MOCK_MOVIE_DATA), - }), - new IsomMessage({ data: new Uint8Array(1).fill(MOCK_MOVIE_DATA) }), + new IsomMessage({ mimeType: 'video/mp4', data: MOCK_MOVIE_START }), + new IsomMessage({ data: MOCK_MOVIE_BOX1 }), + new IsomMessage({ data: MOCK_MOVIE_BOX1 }), + new IsomMessage({ data: MOCK_MOVIE_BOX1 }), + new IsomMessage({ data: MOCK_MOVIE_BOX1 }), ] as const -const MOCK_MOVIE_BUFFER = new Uint8Array(2).fill(MOCK_MOVIE_DATA) +const MOCK_MOVIE_DATA = concat(MOCK_MOVIE.map(({ data }) => data)) const MOCK_MOVIE_ENDING = [ - new IsomMessage({ data: new Uint8Array(1).fill(MOCK_MOVIE_ENDING_DATA) }), - new IsomMessage({ data: new Uint8Array(1).fill(MOCK_MOVIE_ENDING_DATA) }), - new IsomMessage({ data: new Uint8Array(1).fill(MOCK_MOVIE_ENDING_DATA) }), - new IsomMessage({ data: new Uint8Array(1).fill(MOCK_MOVIE_ENDING_DATA) }), + new IsomMessage({ data: MOCK_MOVIE_BOX2 }), + new IsomMessage({ data: MOCK_MOVIE_BOX2 }), + new IsomMessage({ data: MOCK_MOVIE_BOX2 }), + new IsomMessage({ data: MOCK_MOVIE_BOX2 }), ] as const // Set up a pipeline: source - capture - sink. 
const pipelineFactory = ( + onMessage: (msg: IsomMessage) => void, ...fragments: ReadonlyArray> ) => { const sourceMessages = ([] as ReadonlyArray).concat(...fragments) @@ -40,90 +42,67 @@ const pipelineFactory = ( sinkCalled.value++ } - const broadcast = new EventEmitter() - const source = producer(sourceMessages) const peek = peeker((msg: IsomMessage) => { - console.log('message', msg) - broadcast.emit('message', msg) + onMessage(msg) }) - const capture = new Mp4Capture(MOCK_BUFFER_SIZE) const sink = consumer(sinkHandler) - const flow = () => source.pipeThrough(peek).pipeThrough(capture).pipeTo(sink) + const flow = () => source.pipeThrough(peek).pipeTo(sink) return { - broadcast, - source, - capture, + flow, sink, sinkCalled, - flow, + source, } } describe('data copying', (test) => { - test('should not occur when capture inactive', async () => { - const pipeline = pipelineFactory(MOCK_MOVIE) + test('captured buffer should match movie', async () => { + let captured: Uint8Array | undefined + const { capture, triggerEnd } = setupMp4Capture((bytes) => { + captured = bytes + }, MOCK_BUFFER_SIZE) + const pipeline = pipelineFactory(capture, MOCK_MOVIE) // Start the pipeline (this will flow the messages) await pipeline.flow() - assert.is(pipeline.sinkCalled.value, MOCK_MOVIE.length) - // @ts-ignore access private property - assert.is(pipeline.capture.bufferOffset, 0) - }) - - test('should occur when capture active', async () => { - const pipeline = pipelineFactory(MOCK_MOVIE) - - // Activate capture. 
- let capturedBuffer = new Uint8Array(0) - const captureHandler = (buffer: Uint8Array) => { - capturedBuffer = buffer - } - pipeline.capture.start(captureHandler) - - // Start the pipeline (this will flow the messages) - await pipeline.flow() - - assert.equal(capturedBuffer, MOCK_MOVIE_BUFFER) + triggerEnd() + assert.equal(captured, MOCK_MOVIE_DATA) }) test('should only occur when new movie has started', async () => { - const pipeline = pipelineFactory(MOCK_MOVIE_ENDING, MOCK_MOVIE) - - // Activate capture. - let capturedBuffer = new Uint8Array(0) - const captureHandler = (buffer: Uint8Array) => { - capturedBuffer = buffer - } - pipeline.capture.start(captureHandler) + let captured: Uint8Array | undefined + const { capture, triggerEnd } = setupMp4Capture((bytes) => { + captured = bytes + }, MOCK_BUFFER_SIZE) + const pipeline = pipelineFactory(capture, MOCK_MOVIE_ENDING, MOCK_MOVIE) // Start the pipeline (this will flow the messages) await pipeline.flow() - assert.equal(capturedBuffer, MOCK_MOVIE_BUFFER) + triggerEnd() + assert.equal(captured, MOCK_MOVIE_DATA) }) - test('should stop when requested', async () => { - const pipeline = pipelineFactory(MOCK_MOVIE, MOCK_MOVIE_ENDING) - - // Activate capture. 
- let capturedBuffer = new Uint8Array(0) - const captureHandler = (buffer: Uint8Array) => { - capturedBuffer = buffer - } - pipeline.capture.start(captureHandler) - pipeline.broadcast.on('message', (msg) => { + test('should stop when triggered', async () => { + let captured: Uint8Array | undefined + const { capture, triggerEnd } = setupMp4Capture((bytes) => { + captured = bytes + }, MOCK_BUFFER_SIZE) + const autoEnd = (msg: IsomMessage) => { if (msg.data[0] === 0xfe) { - pipeline.capture.stop() + triggerEnd() } - }) + capture(msg) + } + const pipeline = pipelineFactory(autoEnd, MOCK_MOVIE, MOCK_MOVIE_ENDING) // Start the pipeline (this will flow the messages) await pipeline.flow() - assert.equal(capturedBuffer, MOCK_MOVIE_BUFFER) + assert.equal(captured, MOCK_MOVIE_DATA) }) })