Skip to content

Commit

Permalink
chore(streams): Buffer -> Uint8Array
Browse files Browse the repository at this point in the history
Replace use of `Buffer` with `Uint8Array`. The latter is now widely
supported and available in Node.js and Browsers.

Notable differences:
- `Buffer.slice(...)` has been replaced with `Uint8Array.subarray(...)`
  as that is the actual behaviour of the original method
  (`Uint8Array.slice(...)` makes a copy).
- Converting to and from strings is handled by `TextEncoder`/`TextDecoder`
- Converting to and from differently sized big-endian integers is
  handled by using `DataView`

Co-authored-by: Rikard Tegnander <[email protected]>
  • Loading branch information
steabert and Rikard Tegnander committed Dec 24, 2024
1 parent 998d5e2 commit e146e60
Show file tree
Hide file tree
Showing 52 changed files with 678 additions and 567 deletions.
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ esbuild workspace *args:

# autofix and format changed files
format +FILES="`just changed`":
just biome check --apply {{ FILES }}
just biome check --write {{ FILES }}

# install dependencies
install:
Expand Down
8 changes: 5 additions & 3 deletions streams/esbuild.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ const browserBundles = [
{
format: 'esm',
name: 'browser-light-esm.js',
external: ['buffer', 'debug', 'process', 'stream', 'ts-md5', 'ws'],
external: ['debug', 'process', 'stream', 'ts-md5', 'ws'],
inject: ['polyfill.mjs'],
},
{
format: 'cjs',
name: 'browser-light-cjs.js',
external: ['buffer', 'debug', 'process', 'stream', 'ts-md5', 'ws'],
external: ['debug', 'process', 'stream', 'ts-md5', 'ws'],
inject: ['polyfill.mjs'],
},
]

Expand Down Expand Up @@ -68,7 +70,7 @@ for (const { format, name } of nodeBundles) {
entryPoints: ['src/index.node.ts'],
outfile: join(buildDir, name),
format,
external: ['stream', 'buffer', 'process', 'ws'],
external: ['stream', 'process', 'ws'],
bundle: true,
minify: false,
sourcemap: true,
Expand Down
2 changes: 1 addition & 1 deletion streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"README.md"
],
"dependencies": {
"base64-js": "1.5.1",
"debug": "4.3.6",
"process": "0.11.10",
"ts-md5": "1.3.1",
Expand All @@ -59,7 +60,6 @@
"@types/debug": "4.1.12",
"@types/node": "20.12.5",
"@types/ws": "8.5.12",
"buffer": "6.0.3",
"esbuild": "0.23.0",
"events": "3.3.0",
"global-jsdom": "9.2.0",
Expand Down
2 changes: 0 additions & 2 deletions streams/polyfill.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
import { Buffer } from 'buffer'
window.Buffer = Buffer
import * as process_browser from 'process/browser'
window.process_browser = process_browser
5 changes: 3 additions & 2 deletions streams/src/components/aacdepay/parser.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { readUInt16BE } from 'utils/bytes'
import { payload, payloadType, timestamp } from '../../utils/protocols/rtp'
import { ElementaryMessage, MessageType, RtpMessage } from '../message'

Expand Down Expand Up @@ -41,12 +42,12 @@ export function parse(

let headerLength = 0
if (hasHeader) {
const auHeaderLengthInBits = buffer.readUInt16BE(0)
const auHeaderLengthInBits = readUInt16BE(buffer, 0)
headerLength = 2 + (auHeaderLengthInBits + (auHeaderLengthInBits % 8)) / 8 // Add padding
}
const packet: ElementaryMessage = {
type: MessageType.ELEMENTARY,
data: buffer.slice(headerLength),
data: new Uint8Array(buffer.subarray(headerLength)),
payloadType: payloadType(rtp.data),
timestamp: timestamp(rtp.data),
ntpTimestamp: rtp.ntpTimestamp,
Expand Down
8 changes: 4 additions & 4 deletions streams/src/components/auth/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { encode } from 'utils/bytes'
import { merge } from '../../utils/config'
import { statusCode } from '../../utils/protocols/rtsp'
import { Tube } from '../component'
import { Message, MessageType, RtspMessage } from '../message'
import { createTransform } from '../messageStreams'

import { fromByteArray } from 'base64-js'
import { DigestAuth } from './digest'
import { parseWWWAuthenticate } from './www-authenticate'

Expand Down Expand Up @@ -56,7 +58,7 @@ export class Auth extends Tube {
) {
if (
msg.type === MessageType.RTSP &&
statusCode(msg.data) === UNAUTHORIZED
statusCode(msg.data.toString()) === UNAUTHORIZED
) {
const headers = msg.data.toString().split('\n')
const wwwAuth = headers.find((header) => /WWW-Auth/i.test(header))
Expand All @@ -65,9 +67,7 @@ export class Auth extends Tube {
}
const challenge = parseWWWAuthenticate(wwwAuth)
if (challenge.type === 'basic') {
authHeader = `Basic ${Buffer.from(`${username}:${password}`).toString(
'base64'
)}`
authHeader = `Basic ${fromByteArray(encode(`${username}:${password}`))}`
} else if (challenge.type === 'digest') {
const digest = new DigestAuth(challenge.params, username, password)
authHeader = digest.authorization(
Expand Down
7 changes: 4 additions & 3 deletions streams/src/components/basicdepay/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { concat } from 'utils/bytes'
import {
marker,
payload,
Expand All @@ -14,7 +15,7 @@ export class BasicDepay extends Tube {
throw new Error('you must supply a payload type to BasicDepayComponent')
}

let buffer = Buffer.alloc(0)
let buffer = new Uint8Array(0)

const incoming = createTransform(function (
msg: Message,
Expand All @@ -26,7 +27,7 @@ export class BasicDepay extends Tube {
payloadType(msg.data) === rtpPayloadType
) {
const rtpPayload = payload(msg.data)
buffer = Buffer.concat([buffer, rtpPayload])
buffer = concat([buffer, rtpPayload])

if (marker(msg.data)) {
if (buffer.length > 0) {
Expand All @@ -38,7 +39,7 @@ export class BasicDepay extends Tube {
type: MessageType.ELEMENTARY,
})
}
buffer = Buffer.alloc(0)
buffer = new Uint8Array(0)
}
callback()
} else {
Expand Down
5 changes: 3 additions & 2 deletions streams/src/components/h264depay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import { VideoMedia } from '../../utils/protocols/sdp'
import { Tube } from '../component'
import { Message, MessageType } from '../message'

import { concat } from 'utils/bytes'
import { H264DepayParser, NAL_TYPES } from './parser'

export class H264Depay extends Tube {
constructor() {
let h264PayloadType: number
let idrFound = false
let packets: Buffer[] = []
let packets: Uint8Array[] = []

const h264DepayParser = new H264DepayParser()

Expand Down Expand Up @@ -59,7 +60,7 @@ export class H264Depay extends Tube {
if (endOfFrame) {
this.push({
...h264Message,
data: packets.length === 1 ? packets[0] : Buffer.concat(packets),
data: packets.length === 1 ? packets[0] : concat(packets),
})
packets = []
}
Expand Down
31 changes: 16 additions & 15 deletions streams/src/components/h264depay/parser.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import debug from 'debug'

import { concat, writeUInt32BE } from 'utils/bytes'
import { payload, payloadType, timestamp } from '../../utils/protocols/rtp'
import { H264Message, MessageType, RtpMessage } from '../message'

Expand Down Expand Up @@ -30,10 +31,10 @@ First byte in payload (rtp payload header):
const h264Debug = debug('msl:h264depay')

export class H264DepayParser {
private _buffer: Buffer
private _buffer: Uint8Array

constructor() {
this._buffer = Buffer.alloc(0)
this._buffer = new Uint8Array(0)
}

parse(rtp: RtpMessage): H264Message | null {
Expand All @@ -48,17 +49,17 @@ export class H264DepayParser {
const nal = (fuIndicator & 0xe0) | nalType
const stopBit = fuHeader & 64
if (startBit) {
this._buffer = Buffer.concat([
Buffer.from([0, 0, 0, 0, nal]),
rtpPayload.slice(2),
this._buffer = concat([
new Uint8Array([0, 0, 0, 0, nal]),
rtpPayload.subarray(2),
])
return null
} else if (stopBit) {
/* receieved end bit */ const h264frame = Buffer.concat([
/* receieved end bit */ const h264frame = concat([
this._buffer,
rtpPayload.slice(2),
rtpPayload.subarray(2),
])
h264frame.writeUInt32BE(h264frame.length - 4, 0)
writeUInt32BE(h264frame, 0, h264frame.length - 4)
const msg: H264Message = {
data: h264frame,
type: MessageType.H264,
Expand All @@ -67,21 +68,21 @@ export class H264DepayParser {
payloadType: payloadType(rtp.data),
nalType,
}
this._buffer = Buffer.alloc(0)
this._buffer = new Uint8Array(0)
return msg
}
// Put the received data on the buffer and cut the header bytes
this._buffer = Buffer.concat([this._buffer, rtpPayload.slice(2)])
this._buffer = concat([this._buffer, rtpPayload.subarray(2)])
return null
} else if (
(type === NAL_TYPES.NON_IDR_PICTURE || type === NAL_TYPES.IDR_PICTURE) &&
this._buffer.length === 0
) {
/* Single NALU */ const h264frame = Buffer.concat([
Buffer.from([0, 0, 0, 0]),
/* Single NALU */ const h264frame = concat([
new Uint8Array([0, 0, 0, 0]),
rtpPayload,
])
h264frame.writeUInt32BE(h264frame.length - 4, 0)
writeUInt32BE(h264frame, 0, h264frame.length - 4)
const msg: H264Message = {
data: h264frame,
type: MessageType.H264,
Expand All @@ -90,13 +91,13 @@ export class H264DepayParser {
payloadType: payloadType(rtp.data),
nalType: type,
}
this._buffer = Buffer.alloc(0)
this._buffer = new Uint8Array(0)
return msg
}
h264Debug(
`H264depayComponent can only extract types 1,5 and 28, got ${type}`
)
this._buffer = Buffer.alloc(0)
this._buffer = new Uint8Array(0)
return null
}
}
8 changes: 5 additions & 3 deletions streams/src/components/helpers/stream-factory.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Readable, Transform, Writable } from 'stream'

import { fromByteArray, toByteArray } from 'base64-js'

export default class StreamFactory {
/**
* Creates a writable stream that sends all messages written to the stream
Expand Down Expand Up @@ -61,7 +63,7 @@ export default class StreamFactory {
const timestamp = Date.now()
// Replace binary data with base64 string
const message = Object.assign({}, msg, {
data: msg.data.toString('base64'),
data: fromByteArray(msg.data),
})
fileStream.write(JSON.stringify({ type, timestamp, message }, null, 2))
fileStream.write(',\n')
Expand All @@ -87,8 +89,8 @@ export default class StreamFactory {
lastTimestamp = timestamp
if (message) {
const data = message.data
? Buffer.from(message.data, 'base64')
: Buffer.alloc(0)
? toByteArray(message.data)
: new Uint8Array(0)
const msg = Object.assign({}, message, { data })
this.push({ type, delay, msg })
} else {
Expand Down
17 changes: 8 additions & 9 deletions streams/src/components/http-mp4/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class HttpMp4Source extends Source {
// When an error is sent on the incoming stream, close the socket.
incoming.on('error', (e) => {
console.warn('closing socket due to incoming error', e)
this._reader && this._reader.cancel().catch((err) => console.error(err))
this._reader?.cancel().catch((err) => console.error(err))
})

/**
Expand Down Expand Up @@ -87,12 +87,12 @@ export class HttpMp4Source extends Source {

const contentType = rsp.headers.get('Content-Type')
this.incoming.push({
data: Buffer.alloc(0),
data: new Uint8Array(0),
type: MessageType.ISOM,
mime: contentType,
})

this.onHeaders && this.onHeaders(rsp.headers)
this.onHeaders?.(rsp.headers)

this._reader = rsp.body.getReader()
this._pull()
Expand All @@ -103,11 +103,10 @@ export class HttpMp4Source extends Source {
}

abort(): void {
this._reader &&
this._reader.cancel().catch((err) => {
console.log('http-source: cancel reader failed: ', err)
})
this._abortController && this._abortController.abort()
this._reader?.cancel().catch((err) => {
console.log('http-source: cancel reader failed: ', err)
})
this._abortController?.abort()
}

_isClosed(): boolean {
Expand Down Expand Up @@ -143,7 +142,7 @@ export class HttpMp4Source extends Source {
throw new Error('expected length to be defined')
}
this.length += value.length
const buffer = Buffer.from(value)
const buffer = value
if (!this.incoming.push({ data: buffer, type: MessageType.ISOM })) {
// Something happened down stream that it is no longer processing the
// incoming data, and the stream buffer got full.
Expand Down
2 changes: 1 addition & 1 deletion streams/src/components/http-source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export class HttpSource extends Source {
throw new Error('expected length to be defined')
}
this.length += value.length
const buffer = Buffer.from(value)
const buffer = value
if (!this.incoming.push({ data: buffer, type: MessageType.RAW })) {
// Something happened down stream that it is no longer processing the
// incoming data, and the stream buffer got full.
Expand Down
Loading

0 comments on commit e146e60

Please sign in to comment.