Skip to content

Commit

Permalink
feat(streams)!: use Web Streams API
Browse files Browse the repository at this point in the history
Replace Node.js stream module with Web Streams API.
Since the latter is substantially different in some important details
regarding stream pipelining, the way pipelines are built are redefined.
The component concept as module blocks is removed and instead the
components themselves expose streams that can be combined together.
The pipelines are then simple stream compositions.

Because this is a major (breaking) change, it's done in concert with
other planned improvements that are also breaking changes (see section
below for details).

Improvements:
- replacing the Node.js stream module with Web Streams API removes
  the dependency on the (legacy) stream-browserify package and
  results in a much smaller library (bundle) size, so there is no longer
  a need for a separate "light" version
- `debug` package replaced by custom internal logging utilities
  (allowing proper ES module support) Fixes #990, Closes #992

Refactoring:
- RTSP session and parser are combined in a single component and the
  session controller has been rewritten as a request-response flow.
  An async `start` method starts the streams and returns SDP + range.
- RTP depay is combined into a single component that detects
  the proper format based on payloadType, and allows registering
  a "peeker" that can inspect messages (instead of having to insert
  an extra transform stream)
- Extended use of TypeScript in areas where this was lacking

BREAKING CHANGES:
- No support for CommonJS:
  - Node.js has support for ES modules
  - Browsers have support for ES modules, but you can also still
    use the IIFE global variable, or use a bundler (all of which
    support ES modules)
- No distinction between Node.js/Browser:
  - The library targets mainly Browser, so some things rely on `window`
    and expect it to be present, however most things work both platforms.
  - Node-only pipelines are removed, these are trivial to re-implement
    with Web Streams API if necessary. The CLI player has its own TCP
    source for that reason (replacing the CliXyz pipelines).
- The generic "component" and "pipeline" classes were removed:
  - Components extend Web Streams API instead
  - Pipelines rely on `pipeTo`/`pipeThrough` composition and a `start`
    method to initiate flow of data.
- Some public methods on pipelines have been removed (refer to their
  type for details) in cases where a simple alternative is available, or
  check the examples to see how to modify usage. There are less pipelines
  but they are more versatile, with accessible readonly components.
  In general, promises/async methods are preferred over callbacks.
  • Loading branch information
steabert committed Jan 8, 2025
1 parent 674a2f8 commit 0fa7fcc
Show file tree
Hide file tree
Showing 197 changed files with 5,935 additions and 10,106 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.log

# Coverage directory (tests with coverage)
build/
coverage/

# Bundles
Expand Down
65 changes: 0 additions & 65 deletions example-streams-node/mjpeg-player.js

This file was deleted.

82 changes: 82 additions & 0 deletions example-streams-node/pipeline.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { createConnection } from 'node:net'
import { Mp4Muxer, RtpDepay, RtspSession } from 'media-stream-library'

class TcpSource {
constructor(socket) {
if (socket === undefined) {
throw new Error('socket argument missing')
}

this.readable = new ReadableStream({
start: (controller) => {
socket.on('data', (chunk) => {
controller.enqueue(new Uint8Array(chunk))
})
socket.on('end', () => {
console.error('server closed connection')
controller.close()
})
},
cancel: () => {
console.error('canceling TCP client')
socket.close(CLOSE_ABORTED, 'client canceled')
},
})

this.writable = new WritableStream({
start: (controller) => {
socket.on('end', () => {
controller.error('socket closed')
})
socket.on('error', () => {
controller.error('socket errored')
})
},
write: (chunk) => {
try {
socket.write(chunk)
} catch (err) {
console.error('chunk lost during send:', err)
}
},
close: () => {
console.error('closing TCP client')
socket.destroy('normal closure')
},
abort: (reason) => {
console.error('aborting TCP client:', reason && reason.message)
socket.destroy('abort')
},
})
}
}

export async function start(rtspUri) {
const url = new URL(rtspUri)
const socket = createConnection(url.port, url.hostname)
await new Promise((resolve) => {
socket.once('connect', resolve)
})

const tcpSource = new TcpSource(socket)
const rtspSession = new RtspSession({ uri: rtspUri })
const rtpDepay = new RtpDepay()
const mp4Muxer = new Mp4Muxer()

const stdout = new WritableStream({
write: (msg, controller) => {
process.stdout.write(msg.data)
},
})

rtspSession.play()

return Promise.all([
tcpSource.readable
.pipeThrough(rtspSession.demuxer)
.pipeThrough(rtpDepay)
.pipeThrough(mp4Muxer)
.pipeTo(stdout),
rtspSession.commands.pipeTo(tcpSource.writable),
])
}
63 changes: 0 additions & 63 deletions example-streams-node/player.cjs

This file was deleted.

48 changes: 48 additions & 0 deletions example-streams-node/player.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { start } from './pipeline.mjs'

function help() {
console.log(`
Stream live from camera (to be used from Node CLI).
Command line tool to open a websocket/rtsp connection to a camera.
Example usage:
node player.mjs rtsp://192.168.0.2/axis-media/media.amp?audio=1&resolution=800x600 | vlc -
Some VAPIX options:
- videocodec=[h264,mpeg4,jpeg] (Select a specific video codec)
- streamprofile=<name> (Use a specific stream profile)
- recordingid=<name> (Play a specific recording)
- resolution=<wxh> (The required resolution, e.g. 800x600)
- audio=[0,1] (Enable=1 or disable=0 audio)
- camera=[1,2,...,quad] (Select a video source)
- compression=[0..100] (Vary between no=0 and full=100 compression)
- colorlevel=[0..100] (Vary between grey=0 and color=100)
- color=[0,1] (Enable=0 or disable=0 color)
- clock=[0,1] (Show=1 or hide=0 the clock)
- date=[0,1] (Show=1 or hide=0 the date)
- text=[0,1] (Show=1 or hide=0 the text overlay)
- textstring=<message>
- textcolor=[black,white]
- textbackgroundcolor=[black,white,transparent,semitransparent]
- textpos=[0,1] (Show text at top=0 or bottom=0)
- rotation=[0,90,180,270] (How may degrees to rotate the strea,)
- duration=<number> (How many seconds of video you want, unlimited=0)
- nbrofframes=<number> (How many frames of video you want, unlimited=0)
- fps=<number> (How many frames per second, unlimited=0)
`)
}

const [uri] = process.argv.slice(2)
if (!uri) {
console.error('You must specify either a host or full RTSP uri')
help()
process.exit(1)
}

// Setup a new pipeline
// const pipeline = new pipelines.CliMp4Pipeline(config)
// pipeline.rtsp.play()
start(uri).catch((err) => {
console.error('failed:', err)
})
17 changes: 7 additions & 10 deletions example-streams-web/test/h264-overlay-player.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { components, pipelines, utils } = window.mediaStreamLibrary
const { RtspMp4Pipeline, Scheduler } = window.mediaStreamLibrary
const d3 = window.d3

const play = (host) => {
Expand Down Expand Up @@ -42,7 +42,7 @@ const play = (host) => {
}

// Setup a new pipeline
const pipeline = new pipelines.Html5VideoPipeline({
const pipeline = new RtspMp4Pipeline({
ws: { uri: `ws://${host}:8854/` },
rtsp: { uri: `rtsp://localhost:8554/test` },
mediaElement,
Expand All @@ -51,19 +51,16 @@ const play = (host) => {
// Create a scheduler and insert it into the pipeline with
// a peek component, which will call the run method of the
// scheduler every time a message passes on the pipeline.
const scheduler = new utils.Scheduler(pipeline, draw)
const runScheduler = components.Tube.fromHandlers((msg) => scheduler.run(msg))
pipeline.insertBefore(pipeline.lastComponent, runScheduler)
const scheduler = new Scheduler(pipeline, draw)
pipeline.rtp.peek(['h264'], (msg) => scheduler.run(msg))

// When we now the UNIX time of the start of the presentation,
// initialize the scheduler with it.
pipeline.onSync = (ntpPresentationTime) => {
pipeline.videoStartTime.then((ntpPresentationTime) => {
scheduler.init(ntpPresentationTime)
}

pipeline.ready.then(() => {
pipeline.rtsp.play()
})

pipeline.start()
}

play(window.location.hostname)
12 changes: 5 additions & 7 deletions example-streams-web/test/h264-player.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
const { pipelines } = window.mediaStreamLibrary
const { RtspMp4Pipeline } = window.mediaStreamLibrary

const play = (host) => {
// Grab a reference to the video element
const mediaElement = document.querySelector('video')

console.warn('HI', host)

// Setup a new pipeline
const pipeline = new pipelines.Html5VideoPipeline({
const pipeline = new RtspMp4Pipeline({
ws: { uri: `ws://${host}:8854/` },
rtsp: { uri: `rtsp://localhost:8554/test` },
mediaElement,
})
pipeline.ready.then(() => {
pipeline.rtsp.play()
})

pipeline.onSourceOpen = (mse) => {
// Setting a duration of zero seems to force lower latency
// on Firefox, and doesn't seem to affect Chromium.
mse.duration = 0
}

pipeline.start()
}

play(window.location.hostname)
20 changes: 9 additions & 11 deletions example-streams-web/test/mjpeg-overlay-player.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { components, pipelines, utils } = window.mediaStreamLibrary
const { RtspJpegPipeline, Scheduler } = window.mediaStreamLibrary
const d3 = window.d3

const play = (host) => {
Expand Down Expand Up @@ -38,28 +38,26 @@ const play = (host) => {
}

// Setup a new pipeline
const pipeline = new pipelines.Html5CanvasPipeline({
ws: { uri: `ws://${host}:8854/` },
const pipeline = new RtspJpegPipeline({
ws: { uri: `ws://${host}:8855/` },
rtsp: { uri: `rtsp://localhost:8555/test` },
mediaElement,
})

// Create a scheduler and insert it into the pipeline with
// a peek component, which will call the run method of the
// scheduler every time a message passes on the pipeline.
const scheduler = new utils.Scheduler(pipeline, draw)
const runScheduler = components.Tube.fromHandlers((msg) => scheduler.run(msg))
pipeline.insertBefore(pipeline.lastComponent, runScheduler)
const scheduler = new Scheduler(pipeline, draw)
pipeline.rtp.peek(['jpeg'], (msg) => scheduler.run(msg))

// When we now the UNIX time of the start of the presentation,
// initialize the scheduler with it.
pipeline.onSync = (ntpPresentationTime) => {
pipeline.videoStartTime.then((ntpPresentationTime) => {
scheduler.init(ntpPresentationTime)
}

pipeline.ready.then(() => {
pipeline.rtsp.play()
})

pipeline.start()
pipeline.play()
}

play(window.location.hostname)
Loading

0 comments on commit 0fa7fcc

Please sign in to comment.