Skip to content

Commit

Permalink
application-logs: Use the data-lake to log all webrtc stats
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaellehmkuhl authored and ArturoManzoli committed Jan 27, 2025
1 parent 74e0190 commit d7ec01f
Showing 1 changed file with 50 additions and 149 deletions.
199 changes: 50 additions & 149 deletions src/stores/omniscientLogger.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { WebRTCStats } from '@peermetrics/webrtc-stats'
import { useDocumentVisibility } from '@vueuse/core'
import { differenceInSeconds } from 'date-fns'
import { defineStore } from 'pinia'
import { ref, watch } from 'vue'

import { createDataLakeVariable, DataLakeVariable, setDataLakeVariableData } from '@/libs/actions/data-lake'
import { watch } from 'vue'

import {
createDataLakeVariable,
DataLakeVariable,
getDataLakeVariableInfo,
setDataLakeVariableData,
} from '@/libs/actions/data-lake'
import eventTracker from '@/libs/external-telemetry/event-tracking'
import { WebRTCStatsEvent, WebRTCVideoStat } from '@/types/video'

import { useVideoStore } from './video'

export const webrtcStats = new WebRTCStats({ getStatsInterval: 250 })

export const useOmniscientLoggerStore = defineStore('omniscient-logger', () => {
const videoStore = useVideoStore()

Expand All @@ -29,40 +31,6 @@ export const useOmniscientLoggerStore = defineStore('omniscient-logger', () => {
setDataLakeVariableData(cockpitMemoryUsageVariable.id, currentMemoryUsage)
}, 100)

// Routine to log the framerate of the video streams
const streamsFrameRateHistory = ref<{ [key in string]: number[] }>({})
let lastStreamAverageFramerateLog = new Date()
const streamAverageFramerateLogDelay = 10000
setInterval(() => {
Object.keys(videoStore.activeStreams).forEach((streamName) => {
if (videoStore.activeStreams[streamName] === undefined) return
videoStore.activeStreams[streamName]!.mediaStream?.getVideoTracks().forEach((track) => {
if (streamsFrameRateHistory.value[streamName] === undefined) streamsFrameRateHistory.value[streamName] = []
if (track.getSettings().frameRate === undefined) return

const streamHistory = streamsFrameRateHistory.value[streamName]
streamHistory.push(track.getSettings().frameRate as number)
streamHistory.splice(0, streamHistory.length - 10)
streamsFrameRateHistory.value[streamName] = streamHistory

const average = streamHistory.reduce((a, b) => a + b, 0) / streamHistory.length
const minThreshold = 0.9 * average
const newFrameRate = track.getSettings().frameRate as number

// Warn about drops in the framerate of the video stream
if (newFrameRate < minThreshold) {
console.warn(`Drop in the framerate detected for stream '${streamName}': ${newFrameRate.toFixed(2)} fps.`)
}

// Log the average framerate of the video stream recursively
if (new Date().getTime() - lastStreamAverageFramerateLog.getTime() > streamAverageFramerateLogDelay) {
console.debug(`Average frame rate for stream '${streamName}': ${average.toFixed(2)} fps.`)
lastStreamAverageFramerateLog = new Date()
}
})
})
}, 250)

// Routine to log the framerate of the application rendering
const appAverageFrameRateSampleDelay = 100
const cockpitAppFrameRateVariable = new DataLakeVariable(
Expand Down Expand Up @@ -93,24 +61,63 @@ export const useOmniscientLoggerStore = defineStore('omniscient-logger', () => {
fpsMeter()

// Routine to log the WebRTC statistics
const webrtcStreamStats: Record<string, ReturnType<typeof WebRTCStats>> = {}
const streamsAlreadyTrackingWebRTCStats: string[] = []

const streamRateVariableId = (streamName: string, statKeyName: string): string => {
return `stream-${streamName}-${statKeyName}`
}

// Monitor the active streams to add the connections to the WebRTC statistics
watch(videoStore.activeStreams, (streams) => {
Object.keys(streams).forEach((streamName) => {
const session = streams[streamName]?.webRtcManager.session
if (!session || !session.peerConnection) return
if (webrtcStats.peersToMonitor[session.consumerId]) return
webrtcStats.addConnection({

if (webrtcStreamStats[streamName] === undefined) {
webrtcStreamStats[streamName] = new WebRTCStats({ getStatsInterval: 100 })
}

if (webrtcStreamStats[streamName].peersToMonitor[session.consumerId]) return

webrtcStreamStats[streamName].addConnection({
pc: session.peerConnection, // RTCPeerConnection instance
peerId: session.consumerId, // any string that helps you identify this peer,
connectionId: session.id, // optional, an id that you can use to keep track of this connection
remote: false, // optional, override the global remote flag
})

storedKeys.forEach((key) => {
if (getDataLakeVariableInfo(streamRateVariableId(streamName, key)) === undefined) {
const streamVariable = new DataLakeVariable(
streamRateVariableId(streamName, key),
`Stream '${streamName}' - ${key}`,
'number',
`WebRTC stat '${key}' of the '${streamName}' video stream.`
)
createDataLakeVariable(streamVariable)
}
})

if (streamsAlreadyTrackingWebRTCStats.includes(streamName)) return
streamsAlreadyTrackingWebRTCStats.push(streamName)

webrtcStreamStats[streamName].on('stats', (ev: WebRTCStatsEvent) => {
try {
const videoData = ev.data.video.inbound[0]
if (videoData === undefined) return

storedKeys.forEach((key) => {
setDataLakeVariableData(streamRateVariableId(streamName, key), videoData[key])
})
} catch (error) {
console.error('Error while logging WebRTC statistics:', error)
}
})
})
})

// Track the WebRTC statistics, warn about changes in cumulative values and log the average values
const historyLength = 30 // Number of samples to keep in the history
const cumulativeKeys: WebRTCVideoStat[] = [
'bytesReceived',
'firCount',
Expand Down Expand Up @@ -147,116 +154,10 @@ export const useOmniscientLoggerStore = defineStore('omniscient-logger', () => {
'packetRate',
] // Keys that have average values
const storedKeys = [...cumulativeKeys, ...averageKeys] // Keys to store in the history
const cumulativeKeysThatShouldNotIncrease: WebRTCVideoStat[] = [
'firCount',
'framesDropped',
'freezeCount',
'nackCount',
'packetsLost',
'pauseCount',
'pliCount',
'totalFreezesDuration',
'totalPausesDuration',
] // Keys that should not increase
const averageKeysThatShouldNotDecrease: WebRTCVideoStat[] = [
'clockRate',
'framesAssembledFromMultiplePackets',
'framesPerSecond',
'packetRate',
] // Keys that should not decrease
const averageKeysThatShouldNotIncrease: WebRTCVideoStat[] = [
'jitter',
'jitterBufferDelay',
'jitterBufferMinimumDelay',
'jitterBufferTargetDelay',
] // Keys that should not increase

const webrtcStatsAverageLogDelay = 10000
let lastWebrtcStatsAverageLog = new Date()
const webRtcStatsHistory = ref<{ [id in string]: { [stat in string]: (number | string)[] } }>({})

webrtcStats.on('stats', (ev: WebRTCStatsEvent) => {
try {
const videoData = ev.data.video.inbound[0]
if (videoData === undefined) return

// Initialize the peer's statistics if they do not exist
if (webRtcStatsHistory.value[ev.peerId] === undefined) webRtcStatsHistory.value[ev.peerId] = {}

storedKeys.forEach((key) => {
// Initialize the key array if it does not exist
if (webRtcStatsHistory.value[ev.peerId][key] === undefined) webRtcStatsHistory.value[ev.peerId][key] = []

webRtcStatsHistory.value[ev.peerId][key].push(videoData[key])

// Keep only the last 'historyLength' samples
const keyArray = webRtcStatsHistory.value[ev.peerId][key]
keyArray.splice(0, keyArray.length - historyLength)
webRtcStatsHistory.value[ev.peerId][key] = keyArray
})

// Warn about changes in cumulative values that should not increase
cumulativeKeysThatShouldNotIncrease.forEach((key) => {
const keyArray = webRtcStatsHistory.value[ev.peerId][key]
if (keyArray.length < 2) return

const lastValue = keyArray[keyArray.length - 1]
const prevValue = keyArray[keyArray.length - 2]

if (typeof lastValue !== 'number' || typeof prevValue !== 'number') return

if (lastValue > prevValue) {
console.warn(`Cumulative value '${key}' increased for peer '${ev.peerId}': ${lastValue.toFixed(2)}.`)
}
})

// Warn about changes in average values that should not change
averageKeys.forEach((key) => {
const keyArray = webRtcStatsHistory.value[ev.peerId][key]
if (keyArray.length < historyLength) return

const average = (keyArray as number[]).reduce((a, b) => a + b, 0) / keyArray.length
const currentValue = keyArray[keyArray.length - 1] as number

if (averageKeysThatShouldNotDecrease.includes(key)) {
const minThreshold = 0.9 * average
if (currentValue < minThreshold) {
console.debug(`Drop in the value of key '${key}' for peer '${ev.peerId}': ${currentValue.toFixed(2)}.`)
}
}

if (averageKeysThatShouldNotIncrease.includes(key)) {
const minThreshold = 1.1 * average
if (currentValue > minThreshold) {
console.debug(`Increase in the value of key '${key}' for peer '${ev.peerId}': ${currentValue.toFixed(2)}.`)
}
}
})

// Log the average values recursively
if (new Date().getTime() - lastWebrtcStatsAverageLog.getTime() > webrtcStatsAverageLogDelay) {
averageKeys.forEach((key) => {
const keyArray = webRtcStatsHistory.value[ev.peerId][key]
if (keyArray.find((value) => typeof value !== 'number')) return
const average = (keyArray as number[]).reduce((a, b) => a + b, 0) / keyArray.length
console.debug(`Average value '${key}' for peer '${ev.peerId}': ${average.toFixed(4)}.`)
})
lastWebrtcStatsAverageLog = new Date()
}
} catch (error) {
console.error('Error while logging WebRTC statistics:', error)
}
})

// Routine to send a ping event to the event tracking system every 5 minutes
const initialTimestamp = new Date()
setInterval(() => {
eventTracker.capture('Ping', { runningTimeInSeconds: differenceInSeconds(new Date(), initialTimestamp) })
}, 1000 * 60 * 5)

return {
streamsFrameRateHistory,
appFrameRateHistory,
webRtcStatsHistory,
}
})

0 comments on commit d7ec01f

Please sign in to comment.