From 8cded16e5f465fc4b77fad890e713ed9c7d18cab Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Tue, 14 Jun 2022 11:00:17 -0400 Subject: [PATCH] Improve JVB loss statistics (#1911) * Calculate outgoing packet loss as a side-effect of TransportCcEngine. Factor out LossTracker from EndpointConnectionStats. * Feed incomingLossListener from TccGeneratorNode. --- .../main/kotlin/org/jitsi/nlj/RtpReceiver.kt | 3 + .../kotlin/org/jitsi/nlj/RtpReceiverImpl.kt | 4 ++ .../main/kotlin/org/jitsi/nlj/RtpSender.kt | 2 + .../kotlin/org/jitsi/nlj/RtpSenderImpl.kt | 5 ++ .../main/kotlin/org/jitsi/nlj/Transceiver.kt | 3 + .../kotlin/org/jitsi/nlj/rtp/LossListener.kt | 67 +++++++++++++++++++ .../org/jitsi/nlj/rtp/TransportCcEngine.kt | 31 +++++++++ .../nlj/stats/EndpointConnectionStats.kt | 54 ++------------- .../node/incoming/TccGeneratorNode.kt | 66 ++++++++++++++++-- .../jitsi/nlj/rtp/TransportCcEngineTest.kt | 31 ++++++++- .../node/incoming/TccGeneratorNodeTest.kt | 50 ++++++++++++++ .../callstats/ConferencePeriodicRunnable.java | 3 +- 12 files changed, 265 insertions(+), 54 deletions(-) create mode 100644 jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/LossListener.kt diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiver.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiver.kt index 6e301de069..156a863702 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiver.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiver.kt @@ -15,6 +15,7 @@ */ package org.jitsi.nlj +import org.jitsi.nlj.rtp.LossListener import org.jitsi.nlj.srtp.SrtpTransformers import org.jitsi.nlj.stats.EndpointConnectionStats import org.jitsi.nlj.stats.RtpReceiverStats @@ -50,6 +51,8 @@ abstract class RtpReceiver : abstract fun isReceivingAudio(): Boolean abstract fun isReceivingVideo(): Boolean + abstract fun addLossListener(lossListener: LossListener) + abstract fun setFeature(feature: Features, enabled: Boolean) abstract fun isFeatureEnabled(feature: Features): Boolean diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt index 1dc358125a..36d119570b 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpReceiverImpl.kt @@ -23,6 +23,7 @@ import org.jitsi.nlj.rtcp.RembHandler import org.jitsi.nlj.rtcp.RtcpEventNotifier import org.jitsi.nlj.rtcp.RtcpRrGenerator import org.jitsi.nlj.rtp.AudioRtpPacket +import org.jitsi.nlj.rtp.LossListener import org.jitsi.nlj.rtp.VideoRtpPacket import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator import org.jitsi.nlj.srtp.SrtpTransformers @@ -146,6 +147,9 @@ class RtpReceiverImpl @JvmOverloads constructor( override fun isReceivingAudio() = audioBitrateCalculator.active override fun isReceivingVideo() = videoBitrateCalculator.active + override fun addLossListener(lossListener: LossListener) { + tccGenerator.addLossListener(lossListener) + } companion object { val queueErrorCounter = CountingErrorHandler() diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt index ae6a7e9e14..639a4373ab 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSender.kt @@ -15,6 +15,7 @@ */ package org.jitsi.nlj +import org.jitsi.nlj.rtp.LossListener import org.jitsi.nlj.rtp.TransportCcEngine import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator import org.jitsi.nlj.srtp.SrtpTransformers @@ -42,6 +43,7 @@ abstract class RtpSender : abstract fun getPacketStreamStats(): PacketStreamStats.Snapshot abstract fun getTransportCcEngineStats(): TransportCcEngine.StatisticsSnapshot abstract fun requestKeyframe(mediaSsrc: Long? = null) + abstract fun addLossListener(lossListener: LossListener) abstract fun setFeature(feature: Features, enabled: Boolean) abstract fun isFeatureEnabled(feature: Features): Boolean abstract fun tearDown() diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt index d6d3ea456f..b90461f81f 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/RtpSenderImpl.kt @@ -22,6 +22,7 @@ import org.jitsi.nlj.rtcp.KeyframeRequester import org.jitsi.nlj.rtcp.NackHandler import org.jitsi.nlj.rtcp.RtcpEventNotifier import org.jitsi.nlj.rtcp.RtcpSrUpdater +import org.jitsi.nlj.rtp.LossListener import org.jitsi.nlj.rtp.TransportCcEngine import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator import org.jitsi.nlj.rtp.bandwidthestimation.GoogleCcEstimator @@ -234,6 +235,10 @@ class RtpSenderImpl( keyframeRequester.requestKeyframe(mediaSsrc) } + override fun addLossListener(lossListener: LossListener) { + transportCcEngine.addLossListener(lossListener) + } + override fun setFeature(feature: Features, enabled: Boolean) { when (feature) { Features.TRANSCEIVER_PCAP_DUMP -> { diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt index 56481d73d7..d806205ed4 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/Transceiver.kt @@ -146,6 +146,9 @@ class Transceiver( } ) + rtpReceiver.addLossListener(endpointConnectionStats.incomingLossTracker) + rtpSender.addLossListener(endpointConnectionStats.outgoingLossTracker) + rtcpEventNotifier.addRtcpEventListener(endpointConnectionStats) endpointConnectionStats.addListener(rtpSender) diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/LossListener.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/LossListener.kt new file mode 100644 index 0000000000..ecf3fb30b9 --- /dev/null +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/LossListener.kt @@ -0,0 +1,67 @@ +/* + * Copyright @ 2019 - present 8x8, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jitsi.nlj.rtp + +import org.jitsi.utils.OrderedJsonObject +import org.jitsi.utils.secs +import org.jitsi.utils.stats.RateTracker + +class LossTracker : LossListener { + private val lostPackets = RateTracker(60.secs, 1.secs) + private val receivedPackets = RateTracker(60.secs, 1.secs) + + @Synchronized + override fun packetReceived(previouslyReportedLost: Boolean) { + receivedPackets.update(1) + if (previouslyReportedLost) { + lostPackets.update(-1) + } + } + + @Synchronized + override fun packetLost(numLost: Int) { + lostPackets.update(numLost.toLong()) + } + + @Synchronized + fun getSnapshot(): Snapshot { + return Snapshot( + lostPackets.getAccumulatedCount(), + receivedPackets.getAccumulatedCount() + ) + } + + data class Snapshot( + val packetsLost: Long, + val packetsReceived: Long + ) { + fun toJson() = OrderedJsonObject().apply { + put("packets_lost", packetsLost) + put("packets_received", packetsReceived) + } + } +} + +/** + * An interface to report when a packet is received, or is observed to be lost. + */ +/* TODO? This kind of overlaps with BandwidthEstimator? But it can be used in cases where we +* don't have all the information the BandwidthEstimator API needs. */ +interface LossListener { + fun packetReceived(previouslyReportedLost: Boolean) + fun packetLost(numLost: Int) +} diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt index 631f4d4422..94a561b457 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt @@ -35,6 +35,7 @@ import org.json.simple.JSONObject import java.time.Clock import java.time.Duration import java.time.Instant +import java.util.* import java.util.concurrent.atomic.LongAdder /** @@ -89,6 +90,8 @@ class TransportCcEngine( private var lastRtt: Duration? = null + private val lossListeners = LinkedList() + /** * Called when an RTP sender has a new round-trip time estimate. */ @@ -104,6 +107,24 @@ class TransportCcEngine( } } + /** + * Adds a loss listener to be notified about packet arrival and loss reports. + * @param listener + */ + @Synchronized + fun addLossListener(listener: LossListener) { + lossListeners.add(listener) + } + + /** + * Removes a loss listener. + * @param listener + */ + @Synchronized + fun removeLossListener(listener: LossListener) { + lossListeners.remove(listener) + } + private fun tccReceived(tccPacket: RtcpFbTccPacket) { val now = clock.instant() var currArrivalTimestamp = instantOfEpochMicro(tccPacket.GetBaseTimeUs()) @@ -132,6 +153,11 @@ class TransportCcEngine( packetDetail.state = PacketDetailState.reportedLost numPacketsReported.increment() numPacketsReportedLost.increment() + synchronized(this) { + lossListeners.forEach { + it.packetLost(1) + } + } } } is ReceivedPacketReport -> { @@ -156,6 +182,11 @@ class TransportCcEngine( tccSeqNum, packetDetail.packetLength, previouslyReportedLost = previouslyReportedLost ) + synchronized(this) { + lossListeners.forEach { + it.packetReceived(previouslyReportedLost) + } + } packetDetail.state = PacketDetailState.reportedReceived } diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/stats/EndpointConnectionStats.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/stats/EndpointConnectionStats.kt index a9c80e3e86..fb5a974d65 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/stats/EndpointConnectionStats.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/stats/EndpointConnectionStats.kt @@ -17,20 +17,18 @@ package org.jitsi.nlj.stats import org.jitsi.nlj.rtcp.RtcpListener +import org.jitsi.nlj.rtp.LossTracker import org.jitsi.nlj.util.toDoubleMillis import org.jitsi.rtp.rtcp.RtcpPacket import org.jitsi.rtp.rtcp.RtcpReportBlock import org.jitsi.rtp.rtcp.RtcpRrPacket import org.jitsi.rtp.rtcp.RtcpSrPacket -import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket -import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.UnreceivedPacketReport import org.jitsi.utils.LRUCache import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.logging2.Logger import org.jitsi.utils.logging2.cdebug import org.jitsi.utils.logging2.createChildLogger import org.jitsi.utils.secs -import org.jitsi.utils.stats.RateTracker import java.time.Clock import java.time.Duration import java.time.Instant @@ -55,8 +53,8 @@ class EndpointConnectionStats( } data class Snapshot( val rtt: Double, - val incomingLossStats: LossStatsSnapshot, - val outgoingLossStats: LossStatsSnapshot + val incomingLossStats: LossTracker.Snapshot, + val outgoingLossStats: LossTracker.Snapshot ) { fun toJson() = OrderedJsonObject().apply { put("rtt", rtt) @@ -65,16 +63,6 @@ class EndpointConnectionStats( } } - data class LossStatsSnapshot( - val packetsLost: Long, - val packetsReceived: Long - ) { - fun toJson() = OrderedJsonObject().apply { - put("packets_lost", packetsLost) - put("packets_received", packetsReceived) - } - } - private val endpointConnectionStatsListeners: MutableList = CopyOnWriteArrayList() // Per-SSRC, maps the compacted NTP timestamp found in an SR SenderInfo to @@ -89,8 +77,8 @@ class EndpointConnectionStats( */ private var rtt: Double = 0.0 - private val incomingLossTracker = LossTracker() - private val outgoingLossTracker = LossTracker() + val incomingLossTracker = LossTracker() + val outgoingLossTracker = LossTracker() fun addListener(listener: EndpointConnectionStatsListener) { endpointConnectionStatsListeners.add(listener) @@ -104,14 +92,8 @@ class EndpointConnectionStats( return synchronized(lock) { Snapshot( rtt = rtt, - incomingLossStats = LossStatsSnapshot( - packetsLost = incomingLossTracker.lostPackets.getAccumulatedCount(), - packetsReceived = incomingLossTracker.receivedPackets.getAccumulatedCount() - ), - outgoingLossStats = LossStatsSnapshot( - packetsLost = outgoingLossTracker.lostPackets.getAccumulatedCount(), - packetsReceived = outgoingLossTracker.receivedPackets.getAccumulatedCount() - ) + incomingLossStats = incomingLossTracker.getSnapshot(), + outgoingLossStats = outgoingLossTracker.getSnapshot() ) } } @@ -126,8 +108,6 @@ class EndpointConnectionStats( logger.cdebug { "Received RR packet with ${packet.reportBlocks.size} report blocks" } packet.reportBlocks.forEach { reportBlock -> processReportBlock(receivedTime, reportBlock) } } - // Received TCC feedback reports loss on packets we *sent* - is RtcpFbTccPacket -> processTcc(packet, outgoingLossTracker) } } @@ -141,8 +121,6 @@ class EndpointConnectionStats( val entry = SsrcAndTimestamp(packet.senderSsrc, packet.senderInfo.compactedNtpTimestamp) srSentTimes[entry] = clock.instant() } - // Sent TCC feedback reports loss on packets we *received* - is RtcpFbTccPacket -> processTcc(packet, incomingLossTracker) } } @@ -188,22 +166,4 @@ class EndpointConnectionStats( } } } - - private fun processTcc(tccPacket: RtcpFbTccPacket, lossTracker: LossTracker) = synchronized(lock) { - var lost = 0L - var received = 0L - for (packetReport in tccPacket) { - when (packetReport) { - is UnreceivedPacketReport -> lost++ - else -> received++ - } - } - lossTracker.lostPackets.update(lost) - lossTracker.receivedPackets.update(received) - } - - private class LossTracker { - val lostPackets = RateTracker(60.secs, 1.secs) - val receivedPackets = RateTracker(60.secs, 1.secs) - } } diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt index 3715102331..76016fb228 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt @@ -16,6 +16,7 @@ package org.jitsi.nlj.transform.node.incoming import org.jitsi.nlj.PacketInfo +import org.jitsi.nlj.rtp.LossListener import org.jitsi.nlj.rtp.RtpExtensionType.TRANSPORT_CC import org.jitsi.nlj.stats.NodeStatsBlock import org.jitsi.nlj.transform.node.ObserverNode @@ -39,7 +40,7 @@ import org.jitsi.utils.secs import java.time.Clock import java.time.Duration import java.time.Instant -import java.util.TreeMap +import java.util.* /** * Extract the TCC sequence numbers from each passing packet and generate @@ -69,6 +70,8 @@ class TccGeneratorNode( } private val rfc3711IndexTracker = Rfc3711IndexTracker() + private val lossListeners = LinkedList() + init { streamInformation.onRtpExtensionMapping(TRANSPORT_CC) { tccExtensionId = it @@ -90,6 +93,27 @@ class TccGeneratorNode( } } + /** + * Adds a loss listener to be notified about packet arrival and loss reports. + * @param listener + */ + fun addLossListener(listener: LossListener) { + synchronized(lock) { + lossListeners.add(listener) + } + } + + /** + * Removes a loss listener. + * @param listener + */ + @Synchronized + fun removeLossListener(listener: LossListener) { + synchronized(lock) { + lossListeners.remove(listener) + } + } + /** * @param tccSeqNum the extended sequence number. */ @@ -101,11 +125,43 @@ class TccGeneratorNode( // TODO: Chrome does something more advanced, keeping older sequences to replay on packet reordering. packetArrivalTimes.clear() } - if (windowStartSeq == -1 || tccSeqNum < windowStartSeq) { - windowStartSeq = tccSeqNum - } - timestamp?.run { packetArrivalTimes.putIfAbsent(tccSeqNum, timestamp) } + timestamp?.run { + if (packetArrivalTimes.isEmpty() && windowStartSeq == -1) { + lossListeners.forEach { + it.packetReceived(false) + } + } else { + val oldMax = if (packetArrivalTimes.isNotEmpty()) { + packetArrivalTimes.lastKey() + } else { + windowStartSeq - 1 + } + if (tccSeqNum > oldMax) { + val numLost = tccSeqNum - oldMax - 1 + /* TODO: should we squelch for large tcc jumps? */ + lossListeners.forEach { + if (numLost > 0) { + it.packetLost(numLost) + } + it.packetReceived(false) + } + } else if (tccSeqNum < windowStartSeq || !packetArrivalTimes.containsKey(tccSeqNum)) { + /* If we've already cleared the arrival info about this packet, assume it was previously + * reported as lost - there are some corner cases where this isn't true, but they should be rare. + */ + lossListeners.forEach { + it.packetReceived(true) + } + } + } + + if (windowStartSeq == -1 || tccSeqNum < windowStartSeq) { + windowStartSeq = tccSeqNum + } + + packetArrivalTimes.putIfAbsent(tccSeqNum, timestamp) + } if (isTccReadyToSend(isMarked)) { buildFeedback(ssrc).forEach { sendTcc(it) } } diff --git a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt index bfb2a66641..5061da2c04 100644 --- a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt +++ b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt @@ -33,7 +33,25 @@ class TransportCcEngineTest : FunSpec() { private val clock: FakeClock = FakeClock() private val logger = StdoutLogger(_level = Level.INFO) - private val transportCcEngine = TransportCcEngine(bandwidthEstimator, logger, clock) + private val lossListener = object : LossListener { + var numReceived = 0 + var numLost = 0 + + override fun packetReceived(previouslyReportedLost: Boolean) { + numReceived++ + if (previouslyReportedLost) { + numLost-- + } + } + + override fun packetLost(numLost: Int) { + this.numLost += numLost + } + } + + private val transportCcEngine = TransportCcEngine(bandwidthEstimator, logger, clock).also { + it.addLossListener(lossListener) + } init { test("Missing packet reports") { @@ -56,6 +74,8 @@ class TransportCcEngineTest : FunSpec() { numPacketsReportedAfterLost shouldBe 0 numPacketsUnreported shouldBe 0 } + lossListener.numReceived shouldBe 1 + lossListener.numLost shouldBe 0 } test("Duplicate packet reports") { transportCcEngine.mediaPacketSent(4, 1300.bytes) @@ -80,6 +100,8 @@ class TransportCcEngineTest : FunSpec() { numPacketsReportedAfterLost shouldBe 0 numPacketsUnreported shouldBe 0 } + lossListener.numReceived shouldBe 1 + lossListener.numLost shouldBe 0 } test("Packets reported after lost") { transportCcEngine.mediaPacketSent(4, 1300.bytes) @@ -92,6 +114,9 @@ class TransportCcEngineTest : FunSpec() { } transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant()) + lossListener.numReceived shouldBe 1 + lossListener.numLost shouldBe 1 + val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) { SetBase(4, 130) AddReceivedPacket(4, 130) @@ -106,6 +131,8 @@ class TransportCcEngineTest : FunSpec() { numPacketsReportedAfterLost shouldBe 1 numPacketsUnreported shouldBe 0 } + lossListener.numReceived shouldBe 2 + lossListener.numLost shouldBe 0 } test("Packet unreported") { transportCcEngine.mediaPacketSent(4, 1300.bytes) @@ -118,6 +145,8 @@ class TransportCcEngineTest : FunSpec() { numPacketsReportedAfterLost shouldBe 0 numPacketsUnreported shouldBe 1 } + lossListener.numReceived shouldBe 0 + lossListener.numLost shouldBe 0 } } } diff --git a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNodeTest.kt b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNodeTest.kt index 3378c5bb3d..5a397a5372 100644 --- a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNodeTest.kt +++ b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNodeTest.kt @@ -12,6 +12,7 @@ import io.kotest.matchers.types.beInstanceOf import org.jitsi.nlj.PacketInfo import org.jitsi.nlj.format.Vp8PayloadType import org.jitsi.nlj.resources.logging.StdoutLogger +import org.jitsi.nlj.rtp.LossListener import org.jitsi.nlj.rtp.RtpExtension import org.jitsi.nlj.rtp.RtpExtensionType import org.jitsi.nlj.util.StreamInformationStoreImpl @@ -43,6 +44,22 @@ class TccGeneratorNodeTest : ShouldSpec() { private lateinit var tccGenerator: TccGeneratorNode + private var lossListener = object : LossListener { + var numReceived = 0 + var numLost = 0 + + override fun packetReceived(previouslyReportedLost: Boolean) { + numReceived++ + if (previouslyReportedLost) { + numLost-- + } + } + + override fun packetLost(numLost: Int) { + this.numLost += numLost + } + } + override suspend fun beforeSpec(spec: Spec) { super.beforeSpec(spec) streamInformationStore.addRtpExtensionMapping( @@ -50,6 +67,7 @@ class TccGeneratorNodeTest : ShouldSpec() { ) streamInformationStore.addRtpPayloadType(vp8PayloadType) tccGenerator = TccGeneratorNode(onTccReady, streamInformationStore, StdoutLogger(), clock) + tccGenerator.addLossListener(lossListener) } init { @@ -91,6 +109,12 @@ class TccGeneratorNodeTest : ShouldSpec() { } } } + context("loss statistics") { + should("be correct") { + lossListener.numReceived shouldBe 11 + lossListener.numLost shouldBe 0 + } + } } context("when a series of packets (where one is marked) is received") { with(clock) { @@ -127,6 +151,12 @@ class TccGeneratorNodeTest : ShouldSpec() { tccPackets.size shouldBe 2 } } + context("loss statistics") { + should("be correct") { + lossListener.numReceived shouldBe 4 + lossListener.numLost shouldBe 0 + } + } } context("when random packets are added") { val random = Random(1234) @@ -151,6 +181,12 @@ class TccGeneratorNodeTest : ShouldSpec() { } ) } + context("loss statistics") { + should("be correct") { + lossListener.numReceived shouldBe 7 + lossListener.numLost shouldBe 6 * 9999 + } + } for (i in 2..5000) { tccGenerator.processPacket( PacketInfo(createPacket(i % 0xffff)).apply { @@ -248,12 +284,26 @@ class TccGeneratorNodeTest : ShouldSpec() { } ) + context("loss statistics") { + should("be correct before reordered packet") { + lossListener.numReceived shouldBe 10 + lossListener.numLost shouldBe 1 + } + } + tccGenerator.processPacket( PacketInfo(createPacket(9)).apply { receivedTime = clock.instant() - Duration.ofMillis(10) } ) + context("loss statistics") { + should("be correct after reordered packet") { + lossListener.numReceived shouldBe 11 + lossListener.numLost shouldBe 0 + } + } + elapse(100.ms) tccGenerator.processPacket( PacketInfo(createPacket(20)).apply { diff --git a/jvb/src/main/java/org/jitsi/videobridge/stats/callstats/ConferencePeriodicRunnable.java b/jvb/src/main/java/org/jitsi/videobridge/stats/callstats/ConferencePeriodicRunnable.java index 722827f3a5..364354654a 100644 --- a/jvb/src/main/java/org/jitsi/videobridge/stats/callstats/ConferencePeriodicRunnable.java +++ b/jvb/src/main/java/org/jitsi/videobridge/stats/callstats/ConferencePeriodicRunnable.java @@ -16,6 +16,7 @@ package org.jitsi.videobridge.stats.callstats; import org.jetbrains.annotations.*; +import org.jitsi.nlj.rtp.*; import org.jitsi.nlj.stats.*; import org.jitsi.nlj.transform.node.incoming.*; import org.jitsi.nlj.transform.node.outgoing.*; @@ -121,7 +122,7 @@ protected List getEndpointStats() return allEndpointStats; } - private static double getFractionLost(EndpointConnectionStats.LossStatsSnapshot lossStatsSnapshot) + private static double getFractionLost(LossTracker.Snapshot lossStatsSnapshot) { if (lossStatsSnapshot.getPacketsLost() + lossStatsSnapshot.getPacketsReceived() > 0) {