Skip to content

Commit

Permalink
chore(streams): rtp-parser Buffer -> Uint8Array
Browse files Browse the repository at this point in the history
  • Loading branch information
steabert committed Dec 9, 2024
1 parent 1597dc3 commit 9c1671f
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 70 deletions.
5 changes: 3 additions & 2 deletions streams/src/components/aacdepay/parser.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { readUInt16BE } from 'utils/bytes'
import { payload, payloadType, timestamp } from '../../utils/protocols/rtp'
import { ElementaryMessage, MessageType, RtpMessage } from '../message'

Expand Down Expand Up @@ -41,12 +42,12 @@ export function parse(

let headerLength = 0
if (hasHeader) {
const auHeaderLengthInBits = buffer.readUInt16BE(0)
const auHeaderLengthInBits = readUInt16BE(buffer, 0)
headerLength = 2 + (auHeaderLengthInBits + (auHeaderLengthInBits % 8)) / 8 // Add padding
}
const packet: ElementaryMessage = {
type: MessageType.ELEMENTARY,
data: buffer.slice(headerLength),
data: Buffer.from(buffer.subarray(headerLength)),
payloadType: payloadType(rtp.data),
timestamp: timestamp(rtp.data),
ntpTimestamp: rtp.ntpTimestamp,
Expand Down
2 changes: 1 addition & 1 deletion streams/src/components/jpegdepay/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function makeImageHeader() {
return Buffer.from([0xff, 0xd8])
}

export function makeQuantHeader(precision: number, qTable: Buffer) {
export function makeQuantHeader(precision: number, qTable: Uint8Array) {
const lumSize = precision & 1 ? 128 : 64
const chmSize = precision & 2 ? 128 : 64
if (qTable.length !== lumSize + chmSize) {
Expand Down
8 changes: 4 additions & 4 deletions streams/src/components/jpegdepay/make-qtable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ const jpeChromaQuantizer = [
99, 99, 99, 99, 99, 99, 99,
]

export function makeQtable(Q: number): Buffer {
export function makeQtable(Q: number): Uint8Array {
const factor = clamp(Q, 1, 99)
const buffer = Buffer.alloc(128)
const buffer = new Uint8Array(128)
const S = Q < 50 ? Math.floor(5000 / factor) : 200 - factor * 2

for (let i = 0; i < 64; i++) {
const lq = Math.floor((jpegLumaQuantizer[i] * S + 50) / 100)
const cq = Math.floor((jpeChromaQuantizer[i] * S + 50) / 100)
buffer.writeUInt8(clamp(lq, 1, 255), i)
buffer.writeUInt8(clamp(cq, 1, 255), i + 64)
buffer.set([clamp(lq, 1, 255)], i)
buffer.set([clamp(cq, 1, 255)], i + 64)
}
return buffer
}
25 changes: 13 additions & 12 deletions streams/src/components/jpegdepay/parser.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { readUInt8, readUInt16BE } from 'utils/bytes'
import { payload } from '../../utils/protocols/rtp'

import {
Expand Down Expand Up @@ -51,34 +52,34 @@ export function jpegDepayFactory(defaultWidth = 0, defaultHeight = 0) {

return function jpegDepay(packets: Buffer[]) {
let metadata
const fragments: Buffer[] = []
const fragments: Uint8Array[] = []
for (const packet of packets) {
let fragment = payload(packet)

// Parse and extract JPEG header.
const typeSpecific = fragment.readUInt8(0)
const typeSpecific = readUInt8(fragment, 0)
const fragmentOffset =
(fragment.readUInt8(1) << 16) |
(fragment.readUInt8(2) << 8) |
fragment.readUInt8(3)
const type = fragment.readUInt8(4)
const Q = fragment.readUInt8(5)
const width = fragment.readUInt8(6) * 8 || defaultWidth
const height = fragment.readUInt8(7) * 8 || defaultHeight
(readUInt8(fragment, 1) << 16) |
(readUInt8(fragment, 2) << 8) |
readUInt8(fragment, 3)
const type = readUInt8(fragment, 4)
const Q = readUInt8(fragment, 5)
const width = readUInt8(fragment, 6) * 8 || defaultWidth
const height = readUInt8(fragment, 7) * 8 || defaultHeight
fragment = fragment.slice(8)

// Parse and extract Restart Marker header if present.
let DRI = 0
if (type >= 64 && type <= 127) {
DRI = fragment.readUInt16BE(0)
DRI = readUInt16BE(fragment, 0)
fragment = fragment.slice(4)
}

// Parse and extract Quantization Table header if present.
if (Q >= 128 && fragmentOffset === 0) {
// const MBZ = fragment.readUInt8()
const precision = fragment.readUInt8(1)
const length = fragment.readUInt16BE(2)
const precision = readUInt8(fragment, 1)
const length = readUInt16BE(fragment, 2)
const qTable = fragment.slice(4, 4 + length)
metadata = {
typeSpecific,
Expand Down
2 changes: 1 addition & 1 deletion streams/src/components/onvifdepay/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Message, MessageType, XmlMessage } from '../message'
export class ONVIFDepay extends Tube {
constructor() {
let XMLPayloadType: number
let packets: Buffer[] = []
let packets: Uint8Array[] = []

const incoming = new Transform({
objectMode: true,
Expand Down
27 changes: 27 additions & 0 deletions streams/src/utils/bytes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,30 @@ export function decode(bytes: Uint8Array): string {
const dec = new TextDecoder()
return dec.decode(bytes)
}

// Extract 8-bit big-endian value at byte offset
export function readUInt8(bytes: Uint8Array, byteOffset: number): number {
return new DataView(
bytes.buffer,
bytes.byteOffset,
bytes.byteLength
).getUint8(byteOffset)
}

// Extract 16-bit big-endian value at byte offset
export function readUInt16BE(bytes: Uint8Array, byteOffset: number): number {
return new DataView(
bytes.buffer,
bytes.byteOffset,
bytes.byteLength
).getUint16(byteOffset)
}

// Extract 32-bit big-endian value at byte offset
export function readUInt32BE(bytes: Uint8Array, byteOffset: number): number {
return new DataView(
bytes.buffer,
bytes.byteOffset,
bytes.byteLength
).getUint32(byteOffset)
}
67 changes: 34 additions & 33 deletions streams/src/utils/protocols/rtp.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { readUInt16BE, readUInt32BE } from 'utils/bytes'
import { POS } from '../bits'

// Real Time Protocol (RTP)
Expand Down Expand Up @@ -25,63 +26,63 @@ RTP Fixed Header Fields
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/

export const version = (buffer: Buffer) => {
return buffer[0] >>> 6
export const version = (bytes: Uint8Array): number => {
return bytes[0] >>> 6
}

export const padding = (buffer: Buffer) => {
return !!(buffer[0] & POS[2])
export const padding = (bytes: Uint8Array): boolean => {
return !!(bytes[0] & POS[2])
}

export const extension = (buffer: Buffer) => {
return !!(buffer[0] & POS[3])
export const extension = (bytes: Uint8Array): boolean => {
return !!(bytes[0] & POS[3])
}

export const cSrcCount = (buffer: Buffer) => {
return buffer[0] & 0x0f
export const cSrcCount = (bytes: Uint8Array): number => {
return bytes[0] & 0x0f
}

export const marker = (buffer: Buffer) => {
return !!(buffer[1] & POS[0])
export const marker = (bytes: Uint8Array): boolean => {
return !!(bytes[1] & POS[0])
}

export const payloadType = (buffer: Buffer) => {
return buffer[1] & 0x7f
export const payloadType = (bytes: Uint8Array): number => {
return bytes[1] & 0x7f
}

export const sequenceNumber = (buffer: Buffer) => {
return buffer.readUInt16BE(2)
export const sequenceNumber = (bytes: Uint8Array): number => {
return readUInt16BE(bytes, 2)
}

export const timestamp = (buffer: Buffer) => {
return buffer.readUInt32BE(4)
export const timestamp = (bytes: Uint8Array): number => {
return readUInt32BE(bytes, 4)
}

export const sSrc = (buffer: Buffer) => {
return buffer.readUInt32BE(8)
export const sSrc = (bytes: Uint8Array): number => {
return readUInt32BE(bytes, 8)
}

export const cSrc = (buffer: Buffer, rank = 0) => {
return cSrcCount(buffer) > rank ? buffer.readUInt32BE(12 + rank * 4) : 0
export const cSrc = (bytes: Uint8Array, rank = 0): number => {
return cSrcCount(bytes) > rank ? readUInt32BE(bytes, 12 + rank * 4) : 0
}

export const extHeaderLength = (buffer: Buffer) => {
return !extension(buffer)
export const extHeaderLength = (bytes: Uint8Array): number => {
return !extension(bytes)
? 0
: buffer.readUInt16BE(12 + cSrcCount(buffer) * 4 + 2)
: readUInt16BE(bytes, 12 + cSrcCount(bytes) * 4 + 2)
}

export const extHeader = (buffer: Buffer) => {
return extHeaderLength(buffer) === 0
? Buffer.from([])
: buffer.slice(
12 + cSrcCount(buffer) * 4,
12 + cSrcCount(buffer) * 4 + 4 + extHeaderLength(buffer) * 4
export const extHeader = (bytes: Uint8Array): Uint8Array => {
return extHeaderLength(bytes) === 0
? new Uint8Array(0)
: bytes.subarray(
12 + cSrcCount(bytes) * 4,
12 + cSrcCount(bytes) * 4 + 4 + extHeaderLength(bytes) * 4
)
}

export const payload = (buffer: Buffer) => {
return !extension(buffer)
? buffer.slice(12 + cSrcCount(buffer) * 4)
: buffer.slice(12 + cSrcCount(buffer) * 4 + 4 + extHeaderLength(buffer) * 4)
export const payload = (bytes: Uint8Array): Uint8Array => {
return !extension(bytes)
? bytes.subarray(12 + cSrcCount(bytes) * 4)
: bytes.subarray(12 + cSrcCount(bytes) * 4 + 4 + extHeaderLength(bytes) * 4)
}
20 changes: 10 additions & 10 deletions streams/tests/protocols-rtp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,21 @@ describe('Rtp parsing', (test) => {
})

test('should expose the payload', () => {
assert.equal(payload(rtpBuffers[0]), Buffer.from([]))
assert.equal(payload(rtpBuffers[1]), Buffer.from([1, 2, 3]))
assert.equal(payload(rtpBuffers[2]), Buffer.from([1, 2, 3]))
assert.equal(payload(rtpBuffersWithHeaderExt[0]), Buffer.from([1, 2, 3]))
assert.equal(payload(rtpBuffersWithHeaderExt[1]), Buffer.from([1, 2, 3]))
assert.equal(payload(rtpBuffers[0]), new Uint8Array([]))
assert.equal(payload(rtpBuffers[1]), new Uint8Array([1, 2, 3]))
assert.equal(payload(rtpBuffers[2]), new Uint8Array([1, 2, 3]))
assert.equal(payload(rtpBuffersWithHeaderExt[0]), new Uint8Array([1, 2, 3]))
assert.equal(payload(rtpBuffersWithHeaderExt[1]), new Uint8Array([1, 2, 3]))
})

test('should expose the extension header', () => {
assert.equal(extHeader(rtpBuffers[0]), Buffer.from([]))
assert.equal(extHeader(rtpBuffers[1]), Buffer.from([]))
assert.equal(extHeader(rtpBuffers[2]), Buffer.from([]))
assert.equal(extHeader(rtpBuffersWithHeaderExt[0]), Buffer.from([]))
assert.equal(extHeader(rtpBuffers[0]), new Uint8Array([]))
assert.equal(extHeader(rtpBuffers[1]), new Uint8Array([]))
assert.equal(extHeader(rtpBuffers[2]), new Uint8Array([]))
assert.equal(extHeader(rtpBuffersWithHeaderExt[0]), new Uint8Array([]))
assert.equal(
extHeader(rtpBuffersWithHeaderExt[1]),
Buffer.from([1, 2, 0, 1, 1, 2, 3, 4])
new Uint8Array([1, 2, 0, 1, 1, 2, 3, 4])
)
})
})
2 changes: 1 addition & 1 deletion streams/tests/protocols-sdp.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as assert from 'uvu/assert'

import { MessageType } from 'components/message'
import { extractURIs, sdpFromBody, parse } from 'utils/protocols/sdp'
import { extractURIs, parse, sdpFromBody } from 'utils/protocols/sdp'

import { describe } from './uvu-describe'

Expand Down
10 changes: 5 additions & 5 deletions streams/tests/protocols.fixtures.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
/* biome-ignore format: custom formatting */
export const rtpBuffers = [
Buffer.from([128, 96, 80, 56, 225, 39, 20, 132, 25, 190, 186, 105]),
Buffer.from([128, 224, 80, 76, 225, 39, 108, 97, 25, 190, 186, 105, 1, 2, 3]),
Buffer.from([
new Uint8Array([128, 96, 80, 56, 225, 39, 20, 132, 25, 190, 186, 105]),
new Uint8Array([128, 224, 80, 76, 225, 39, 108, 97, 25, 190, 186, 105, 1, 2, 3]),
new Uint8Array([
129, 224, 80, 95, 225, 40, 57, 104, 25, 190, 186, 105, 0, 0, 0, 1, 1, 2, 3,
]),
]

/* biome-ignore format: custom formatting */
export const rtpBuffersWithHeaderExt = [
Buffer.from([
new Uint8Array([
144, 224, 80, 76, 225, 39, 108, 97, 25, 190, 186, 105, 1, 2, 0, 0, 1, 2, 3,
]),
Buffer.from([
new Uint8Array([
144, 224, 80, 76, 225, 39, 108, 97, 25, 190, 186, 105, 1, 2, 0, 1, 1, 2, 3,
4, 1, 2, 3,
]),
Expand Down
2 changes: 1 addition & 1 deletion streams/tests/rtsp-parser-parser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ describe('SDP data', (test) => {
assert.is(typeof sdp, 'object')
assert.is(typeof sdp.session, 'object')
assert.ok(Array.isArray(sdp.media))
assert.is(sdp.media[0].type, "video")
assert.is(sdp.media[0].type, 'video')
})

test('should handle segmented RTSP/SDP', () => {
Expand Down

0 comments on commit 9c1671f

Please sign in to comment.