Skip to content

Commit

Permalink
Improve JVB loss statistics (#1911)
Browse files Browse the repository at this point in the history
* Calculate outgoing packet loss as a side-effect of TransportCcEngine.

Factor out LossTracker from EndpointConnectionStats.

* Feed incomingLossListener from TccGeneratorNode.
  • Loading branch information
JonathanLennox authored Jun 14, 2022
1 parent 3c9d3c5 commit 8cded16
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.jitsi.nlj

import org.jitsi.nlj.rtp.LossListener
import org.jitsi.nlj.srtp.SrtpTransformers
import org.jitsi.nlj.stats.EndpointConnectionStats
import org.jitsi.nlj.stats.RtpReceiverStats
Expand Down Expand Up @@ -50,6 +51,8 @@ abstract class RtpReceiver :
abstract fun isReceivingAudio(): Boolean
abstract fun isReceivingVideo(): Boolean

abstract fun addLossListener(lossListener: LossListener)

abstract fun setFeature(feature: Features, enabled: Boolean)
abstract fun isFeatureEnabled(feature: Features): Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.jitsi.nlj.rtcp.RembHandler
import org.jitsi.nlj.rtcp.RtcpEventNotifier
import org.jitsi.nlj.rtcp.RtcpRrGenerator
import org.jitsi.nlj.rtp.AudioRtpPacket
import org.jitsi.nlj.rtp.LossListener
import org.jitsi.nlj.rtp.VideoRtpPacket
import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator
import org.jitsi.nlj.srtp.SrtpTransformers
Expand Down Expand Up @@ -146,6 +147,9 @@ class RtpReceiverImpl @JvmOverloads constructor(

override fun isReceivingAudio() = audioBitrateCalculator.active
override fun isReceivingVideo() = videoBitrateCalculator.active
override fun addLossListener(lossListener: LossListener) {
tccGenerator.addLossListener(lossListener)
}

companion object {
val queueErrorCounter = CountingErrorHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.jitsi.nlj

import org.jitsi.nlj.rtp.LossListener
import org.jitsi.nlj.rtp.TransportCcEngine
import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator
import org.jitsi.nlj.srtp.SrtpTransformers
Expand Down Expand Up @@ -42,6 +43,7 @@ abstract class RtpSender :
abstract fun getPacketStreamStats(): PacketStreamStats.Snapshot
abstract fun getTransportCcEngineStats(): TransportCcEngine.StatisticsSnapshot
abstract fun requestKeyframe(mediaSsrc: Long? = null)
abstract fun addLossListener(lossListener: LossListener)
abstract fun setFeature(feature: Features, enabled: Boolean)
abstract fun isFeatureEnabled(feature: Features): Boolean
abstract fun tearDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.jitsi.nlj.rtcp.KeyframeRequester
import org.jitsi.nlj.rtcp.NackHandler
import org.jitsi.nlj.rtcp.RtcpEventNotifier
import org.jitsi.nlj.rtcp.RtcpSrUpdater
import org.jitsi.nlj.rtp.LossListener
import org.jitsi.nlj.rtp.TransportCcEngine
import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator
import org.jitsi.nlj.rtp.bandwidthestimation.GoogleCcEstimator
Expand Down Expand Up @@ -234,6 +235,10 @@ class RtpSenderImpl(
keyframeRequester.requestKeyframe(mediaSsrc)
}

override fun addLossListener(lossListener: LossListener) {
transportCcEngine.addLossListener(lossListener)
}

override fun setFeature(feature: Features, enabled: Boolean) {
when (feature) {
Features.TRANSCEIVER_PCAP_DUMP -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ class Transceiver(
}
)

rtpReceiver.addLossListener(endpointConnectionStats.incomingLossTracker)
rtpSender.addLossListener(endpointConnectionStats.outgoingLossTracker)

rtcpEventNotifier.addRtcpEventListener(endpointConnectionStats)

endpointConnectionStats.addListener(rtpSender)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright @ 2019 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jitsi.nlj.rtp

import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.secs
import org.jitsi.utils.stats.RateTracker

class LossTracker : LossListener {
private val lostPackets = RateTracker(60.secs, 1.secs)
private val receivedPackets = RateTracker(60.secs, 1.secs)

@Synchronized
override fun packetReceived(previouslyReportedLost: Boolean) {
receivedPackets.update(1)
if (previouslyReportedLost) {
lostPackets.update(-1)
}
}

@Synchronized
override fun packetLost(numLost: Int) {
lostPackets.update(numLost.toLong())
}

@Synchronized
fun getSnapshot(): Snapshot {
return Snapshot(
lostPackets.getAccumulatedCount(),
receivedPackets.getAccumulatedCount()
)
}

data class Snapshot(
val packetsLost: Long,
val packetsReceived: Long
) {
fun toJson() = OrderedJsonObject().apply {
put("packets_lost", packetsLost)
put("packets_received", packetsReceived)
}
}
}

/**
* An interface to report when a packet is received, or is observed to be lost.
*/
/* TODO? This kind of overlaps with BandwidthEstimator? But it can be used in cases where we
* don't have all the information the BandwidthEstimator API needs. */
interface LossListener {
fun packetReceived(previouslyReportedLost: Boolean)
fun packetLost(numLost: Int)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.json.simple.JSONObject
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.atomic.LongAdder

/**
Expand Down Expand Up @@ -89,6 +90,8 @@ class TransportCcEngine(

private var lastRtt: Duration? = null

private val lossListeners = LinkedList<LossListener>()

/**
* Called when an RTP sender has a new round-trip time estimate.
*/
Expand All @@ -104,6 +107,24 @@ class TransportCcEngine(
}
}

/**
* Adds a loss listener to be notified about packet arrival and loss reports.
* @param listener
*/
@Synchronized
fun addLossListener(listener: LossListener) {
lossListeners.add(listener)
}

/**
* Removes a loss listener.
* @param listener
*/
@Synchronized
fun removeLossListener(listener: LossListener) {
lossListeners.remove(listener)
}

private fun tccReceived(tccPacket: RtcpFbTccPacket) {
val now = clock.instant()
var currArrivalTimestamp = instantOfEpochMicro(tccPacket.GetBaseTimeUs())
Expand Down Expand Up @@ -132,6 +153,11 @@ class TransportCcEngine(
packetDetail.state = PacketDetailState.reportedLost
numPacketsReported.increment()
numPacketsReportedLost.increment()
synchronized(this) {
lossListeners.forEach {
it.packetLost(1)
}
}
}
}
is ReceivedPacketReport -> {
Expand All @@ -156,6 +182,11 @@ class TransportCcEngine(
tccSeqNum, packetDetail.packetLength,
previouslyReportedLost = previouslyReportedLost
)
synchronized(this) {
lossListeners.forEach {
it.packetReceived(previouslyReportedLost)
}
}
packetDetail.state = PacketDetailState.reportedReceived
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
package org.jitsi.nlj.stats

import org.jitsi.nlj.rtcp.RtcpListener
import org.jitsi.nlj.rtp.LossTracker
import org.jitsi.nlj.util.toDoubleMillis
import org.jitsi.rtp.rtcp.RtcpPacket
import org.jitsi.rtp.rtcp.RtcpReportBlock
import org.jitsi.rtp.rtcp.RtcpRrPacket
import org.jitsi.rtp.rtcp.RtcpSrPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.UnreceivedPacketReport
import org.jitsi.utils.LRUCache
import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.cdebug
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.utils.secs
import org.jitsi.utils.stats.RateTracker
import java.time.Clock
import java.time.Duration
import java.time.Instant
Expand All @@ -55,8 +53,8 @@ class EndpointConnectionStats(
}
data class Snapshot(
val rtt: Double,
val incomingLossStats: LossStatsSnapshot,
val outgoingLossStats: LossStatsSnapshot
val incomingLossStats: LossTracker.Snapshot,
val outgoingLossStats: LossTracker.Snapshot
) {
fun toJson() = OrderedJsonObject().apply {
put("rtt", rtt)
Expand All @@ -65,16 +63,6 @@ class EndpointConnectionStats(
}
}

data class LossStatsSnapshot(
val packetsLost: Long,
val packetsReceived: Long
) {
fun toJson() = OrderedJsonObject().apply {
put("packets_lost", packetsLost)
put("packets_received", packetsReceived)
}
}

private val endpointConnectionStatsListeners: MutableList<EndpointConnectionStatsListener> = CopyOnWriteArrayList()

// Per-SSRC, maps the compacted NTP timestamp found in an SR SenderInfo to
Expand All @@ -89,8 +77,8 @@ class EndpointConnectionStats(
*/
private var rtt: Double = 0.0

private val incomingLossTracker = LossTracker()
private val outgoingLossTracker = LossTracker()
val incomingLossTracker = LossTracker()
val outgoingLossTracker = LossTracker()

fun addListener(listener: EndpointConnectionStatsListener) {
endpointConnectionStatsListeners.add(listener)
Expand All @@ -104,14 +92,8 @@ class EndpointConnectionStats(
return synchronized(lock) {
Snapshot(
rtt = rtt,
incomingLossStats = LossStatsSnapshot(
packetsLost = incomingLossTracker.lostPackets.getAccumulatedCount(),
packetsReceived = incomingLossTracker.receivedPackets.getAccumulatedCount()
),
outgoingLossStats = LossStatsSnapshot(
packetsLost = outgoingLossTracker.lostPackets.getAccumulatedCount(),
packetsReceived = outgoingLossTracker.receivedPackets.getAccumulatedCount()
)
incomingLossStats = incomingLossTracker.getSnapshot(),
outgoingLossStats = outgoingLossTracker.getSnapshot()
)
}
}
Expand All @@ -126,8 +108,6 @@ class EndpointConnectionStats(
logger.cdebug { "Received RR packet with ${packet.reportBlocks.size} report blocks" }
packet.reportBlocks.forEach { reportBlock -> processReportBlock(receivedTime, reportBlock) }
}
// Received TCC feedback reports loss on packets we *sent*
is RtcpFbTccPacket -> processTcc(packet, outgoingLossTracker)
}
}

Expand All @@ -141,8 +121,6 @@ class EndpointConnectionStats(
val entry = SsrcAndTimestamp(packet.senderSsrc, packet.senderInfo.compactedNtpTimestamp)
srSentTimes[entry] = clock.instant()
}
// Sent TCC feedback reports loss on packets we *received*
is RtcpFbTccPacket -> processTcc(packet, incomingLossTracker)
}
}

Expand Down Expand Up @@ -188,22 +166,4 @@ class EndpointConnectionStats(
}
}
}

private fun processTcc(tccPacket: RtcpFbTccPacket, lossTracker: LossTracker) = synchronized(lock) {
var lost = 0L
var received = 0L
for (packetReport in tccPacket) {
when (packetReport) {
is UnreceivedPacketReport -> lost++
else -> received++
}
}
lossTracker.lostPackets.update(lost)
lossTracker.receivedPackets.update(received)
}

private class LossTracker {
val lostPackets = RateTracker(60.secs, 1.secs)
val receivedPackets = RateTracker(60.secs, 1.secs)
}
}
Loading

0 comments on commit 8cded16

Please sign in to comment.