Skip to content

Commit

Permalink
Visitor endpoints: don't forward media or bridge channel traffic (#1868)
Browse files Browse the repository at this point in the history
* Allow endpoints to be constructed in visitor mode.  Don't forward RTP/RTCP from visitors.
* Don't forward endpoint messages or endpoint stats from visitors.
* Don't track endpoint connection status for visitors.
* Add mock of visitor for EndpointConnectionStatusMonitorTest.
* Test that EndpointConnectionStatusMonitor does not report visitors.
* Add stats for total number of visitor endpoints.
* Add visitor flag to endpoint debug state.
* Only reject sources for a visitor if the list of media sources is non-empty.
  • Loading branch information
JonathanLennox authored Nov 3, 2022
1 parent 81ef77d commit a383706
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 16 deletions.
13 changes: 9 additions & 4 deletions jvb/src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -702,16 +702,21 @@ public AbstractEndpoint findSourceOwner(@NotNull String sourceName)
* @return an <tt>Endpoint</tt> participating in this <tt>Conference</tt>
*/
@NotNull
public Endpoint createLocalEndpoint(String id, boolean iceControlling, boolean sourceNames, boolean doSsrcRewriting)
public Endpoint createLocalEndpoint(
String id,
boolean iceControlling,
boolean sourceNames,
boolean doSsrcRewriting,
boolean visitor)
{
final AbstractEndpoint existingEndpoint = getEndpoint(id);
if (existingEndpoint != null)
{
throw new IllegalArgumentException("Local endpoint with ID = " + id + "already created");
}

final Endpoint endpoint = new Endpoint(id, this, logger, iceControlling, sourceNames, doSsrcRewriting);
videobridge.localEndpointCreated();
final Endpoint endpoint = new Endpoint(id, this, logger, iceControlling, sourceNames, doSsrcRewriting, visitor);
videobridge.localEndpointCreated(visitor);

subscribeToEndpointEvents(endpoint);

Expand Down Expand Up @@ -961,7 +966,7 @@ public void endpointExpired(AbstractEndpoint endpoint)
// The removed endpoint was a local Endpoint as opposed to a RelayedEndpoint.
updateEndpointsCache();
endpointsById.forEach((i, senderEndpoint) -> senderEndpoint.removeReceiver(id));
videobridge.localEndpointExpired();
videobridge.localEndpointExpired(((Endpoint) removedEndpoint).getVisitor());
}

relaysById.forEach((i, relay) -> relay.endpointExpired(id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,12 @@ public BridgeChannelMessage lastN(LastNMessage message)
@Override
public BridgeChannelMessage endpointMessage(EndpointMessage message)
{
if (endpoint.getVisitor())
{
getLogger().warn("Not forwarding endpoint message from visitor endpoint");
return null;
}

// First insert/overwrite the "from" to prevent spoofing.
String from = endpoint.getId();
message.setFrom(from);
Expand Down Expand Up @@ -551,6 +557,12 @@ else if (targetEndpoint != null)
@Override
public BridgeChannelMessage endpointStats(@NotNull EndpointStats message)
{
if (endpoint.getVisitor())
{
getLogger().warn("Not forwarding endpoint stats from visitor endpoint");
return null;
}

// First insert/overwrite the "from" to prevent spoofing.
String from = endpoint.getId();
message.setFrom(from);
Expand Down
28 changes: 26 additions & 2 deletions jvb/src/main/java/org/jitsi/videobridge/Videobridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,23 @@ public JvbHealthChecker getJvbHealthChecker()
return conference;
}

void localEndpointCreated()
void localEndpointCreated(boolean visitor)
{
statistics.currentLocalEndpoints.inc();
if (visitor)
{
statistics.currentVisitors.inc();
}
}

void localEndpointExpired()
void localEndpointExpired(boolean visitor)
{
long remainingEndpoints = statistics.currentLocalEndpoints.decAndGet();
if (visitor)
{
statistics.currentVisitors.dec();
}

if (remainingEndpoints < 0)
{
logger.warn("Invalid endpoint count " + remainingEndpoints + ". Disabling endpoint-count based shutdown!");
Expand Down Expand Up @@ -968,6 +977,13 @@ public static class Statistics
"endpoints",
"The total number of endpoints created.");

/**
* The total number of visitor endpoints.
*/
public CounterMetric totalVisitors = VideobridgeMetricsContainer.getInstance().registerCounter(
"visitors",
"The total number of visitor endpoints created.");

/**
* The number of endpoints which had not established an endpoint
* message transport even after some delay.
Expand Down Expand Up @@ -1058,6 +1074,14 @@ public static class Statistics
"Number of local endpoints that exist currently."
);

/**
* Number of visitor endpoints that exist currently.
*/
public LongGaugeMetric currentVisitors = VideobridgeMetricsContainer.getInstance().registerLongGauge(
"current_visitors",
"Number of visitor endpoints."
);

/**
* Current number of conferences.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ private void generate0()
TOTAL_LOSS_DEGRADED_PARTICIPANT_SECONDS,
jvbStats.totalLossDegradedParticipantMs.get() / 1000);
unlockedSetStat(TOTAL_PARTICIPANTS, jvbStats.totalEndpoints.get());
unlockedSetStat("total_visitors", jvbStats.totalVisitors.get());
unlockedSetStat(
EPS_NO_MSG_TRANSPORT_AFTER_DELAY,
jvbStats.numEndpointsNoMessageTransportAfterDelay.get()
Expand Down Expand Up @@ -554,6 +555,7 @@ private void generate0()
unlockedSetStat(INACTIVE_CONFERENCES, inactiveConferences);
unlockedSetStat(P2P_CONFERENCES, p2pConferences);
unlockedSetStat("endpoints", endpoints);
unlockedSetStat("visitors", jvbStats.currentVisitors.get());
unlockedSetStat(PARTICIPANTS, endpoints);
unlockedSetStat("local_endpoints", localEndpoints);
unlockedSetStat(RECEIVE_ONLY_ENDPOINTS, receiveOnlyEndpoints);
Expand Down
14 changes: 14 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ class Endpoint @JvmOverloads constructor(
iceControlling: Boolean,
private val isUsingSourceNames: Boolean,
private val doSsrcRewriting: Boolean,
/**
* Whether this endpoint is in "visitor" mode, i.e. should be invisible to other endpoints.
*/
val visitor: Boolean,
private val clock: Clock = Clock.systemUTC()
) : AbstractEndpoint(conference, id, parentLogger),
PotentialPacketHandler,
Expand Down Expand Up @@ -317,6 +321,9 @@ class Endpoint @JvmOverloads constructor(
setupDtlsTransport()

conference.videobridge.statistics.totalEndpoints.inc()
if (visitor) {
conference.videobridge.statistics.totalVisitors.inc()
}

logger.info("Created new endpoint isUsingSourceNames=$isUsingSourceNames, iceControlling=$iceControlling")
}
Expand Down Expand Up @@ -796,6 +803,12 @@ class Endpoint @JvmOverloads constructor(
* transceiver's incoming pipeline.
*/
fun handleIncomingPacket(packetInfo: PacketInfo) {
if (visitor) {
/* Never forward RTP/RTCP from a visitor. */
ByteBufferPool.returnBuffer(packetInfo.packet.buffer)
return
}

packetInfo.endpointId = id
conference.handleIncomingPacket(packetInfo)
}
Expand Down Expand Up @@ -1072,6 +1085,7 @@ class Endpoint @JvmOverloads constructor(
put("transceiver", transceiver.getNodeStats().toJson())
put("acceptAudio", acceptAudio)
put("acceptVideo", acceptVideo)
put("visitor", visitor)
put("messageTransport", messageTransport.debugState)
if (doSsrcRewriting) {
put("audioSsrcs", audioSsrcs.getDebugState())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class EndpointConnectionStatusMonitor @JvmOverloads constructor(
}

private fun run() {
conference.localEndpoints.forEach(::monitorEndpointActivity)
conference.localEndpoints.filter { !it.visitor }.forEach(::monitorEndpointActivity)
}

private fun monitorEndpointActivity(endpoint: Endpoint) {
Expand Down Expand Up @@ -143,7 +143,7 @@ class EndpointConnectionStatusMonitor @JvmOverloads constructor(
*/
fun endpointConnected(endpointId: String) {
synchronized(inactiveEndpointIds) {
val localEndpointIds = conference.localEndpoints.map { it.id }
val localEndpointIds = conference.localEndpoints.filter { !it.visitor }.map { it.id }
inactiveEndpointIds.forEach { inactiveEpId ->
// inactiveEndpointIds may contain endpoints that have already expired and/or moved to another bridge.
if (localEndpointIds.contains(inactiveEpId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.jitsi.xmpp.extensions.jingle.SourceGroupPacketExtension
import org.jitsi.xmpp.util.createError
import org.jivesoftware.smack.packet.IQ
import org.jivesoftware.smack.packet.StanzaError.Condition
import org.jivesoftware.smackx.muc.MUCRole

class Colibri2ConferenceHandler(
private val conference: Conference,
Expand Down Expand Up @@ -159,7 +160,13 @@ class Colibri2ConferenceHandler(
)
val sourceNames = c2endpoint.hasCapability(Capability.CAP_SOURCE_NAME_SUPPORT)
val ssrcRewriting = sourceNames && c2endpoint.hasCapability(Capability.CAP_SSRC_REWRITING_SUPPORT)
conference.createLocalEndpoint(c2endpoint.id, transport.iceControlling, sourceNames, ssrcRewriting).apply {
conference.createLocalEndpoint(
c2endpoint.id,
transport.iceControlling,
sourceNames,
ssrcRewriting,
c2endpoint.mucRole == MUCRole.visitor
).apply {
c2endpoint.statsId?.let {
statsId = it
}
Expand Down Expand Up @@ -232,6 +239,13 @@ class Colibri2ConferenceHandler(
}

c2endpoint.sources?.let { sources ->
if (endpoint.visitor && sources.mediaSources.isNotEmpty()) {
throw IqProcessingException(
Condition.bad_request,
"Attempt to set sources for visitor endpoint ${c2endpoint.id}"
)
}

sources.mediaSources.forEach { mediaSource ->
mediaSource.sources.forEach {
endpoint.addReceiveSsrc(it.ssrc, mediaSource.type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ConferenceTest : ConfigTest() {
context("Adding local endpoints should work") {
with(Conference(videobridge, "id", name, null, false)) {
endpointCount shouldBe 0
createLocalEndpoint("abcdabcd", true, false, false) // TODO cover the case when they're true
createLocalEndpoint("abcdabcd", true, false, false, false) // TODO cover the case when they're true
endpointCount shouldBe 1
debugState.shouldBeValidJson()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.kotest.inspectors.forAny
import io.kotest.matchers.collections.shouldBeEmpty
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
Expand All @@ -39,11 +40,17 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
val executor = FakeScheduledExecutorService()
val localEp1: Endpoint = mockk {
every { id } returns "1"
every { visitor } returns false
}
val localEp2: Endpoint = mockk {
every { id } returns "2"
every { visitor } returns false
}
val eps = listOf(localEp1, localEp2)
val localEp3: Endpoint = mockk {
every { id } returns "3"
every { visitor } returns true
}
val eps = listOf(localEp1, localEp2, localEp3)

val broadcastMessage = slot<EndpointConnectionStatusMessage>()
val broadcastSendToRelays = slot<Boolean>()
Expand Down Expand Up @@ -102,7 +109,7 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
}
clock.elapse(1.mins)
executor.runOne()
should("fire broadcast events for the local endpoints") {
should("fire broadcast events for the non-visitor local endpoints") {
sendMessageCalls.shouldBeEmpty()
broadcastCalls shouldHaveSize 2
broadcastCalls.forAny { (msg, sendToRelays) ->
Expand All @@ -116,6 +123,9 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
msg.endpoint shouldBe "2"
msg.active shouldBe "false"
}
broadcastCalls.forAll { (msg, sendToOcto) ->
msg.endpoint shouldNotBe "3"
}
}
context("and then become active") {
clock.elapse(30.secs)
Expand All @@ -137,6 +147,9 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
msg.endpoint shouldBe "2"
msg.active shouldBe "true"
}
broadcastCalls.forAll { (msg, sendToOcto) ->
msg.endpoint shouldNotBe "3"
}
}
}
}
Expand All @@ -157,7 +170,7 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
context("but not within maxInactivityLimit") {
clock.elapse(1.mins)
executor.runOne()
should("fire inactive events") {
should("fire inactive events for non-visitor endpoints") {
sendMessageCalls.shouldBeEmpty()
broadcastCalls shouldHaveSize 2
broadcastCalls.forAny { (msg, sendToRelays) ->
Expand All @@ -170,6 +183,9 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
msg.endpoint shouldBe "2"
msg.active shouldBe "false"
}
broadcastCalls.forAll { (msg, sendToOcto) ->
msg.endpoint shouldNotBe "3"
}
}
context("but then one becomes active") {
every { localEp1.lastIncomingActivity } returns clock.instant()
Expand All @@ -188,7 +204,7 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
context("and then a new ep joins") {
every { conference.getLocalEndpoint("4") } returns mockk() { every { id } returns "4" }
monitor.endpointConnected("4")
should("update the new endpoint of the other endpoints' statuses") {
should("update the new endpoint of the other non-visitor endpoints' statuses") {
sendMessageCalls shouldHaveSize 2
sendMessageCalls.forAll { (_, destEps, sendToRelays) ->
destEps shouldHaveSize 1
Expand All @@ -203,6 +219,9 @@ class EndpointConnectionStatusMonitorTest : ShouldSpec({
msg.endpoint shouldBe "2"
msg.active shouldBe "false"
}
sendMessageCalls.forAll { (msg, _, _) ->
msg.endpoint shouldNotBe "3"
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions jvb/src/test/kotlin/org/jitsi/videobridge/VideobridgeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class VideobridgeTest : ShouldSpec() {
context("Shutdown") {
context("when a conference is active") {
withNewConfig("videobridge.shutdown.graceful-shutdown-min-participants=10") {
repeat(15) { videobridge.localEndpointCreated() }
repeat(15) { videobridge.localEndpointCreated(false) }
context("starting a graceful shutdown") {
videobridge.shutdown(true)
should("report that shutdown is in progress") {
Expand All @@ -66,7 +66,7 @@ class VideobridgeTest : ShouldSpec() {
verify(exactly = 0) { shutdownService.beginShutdown() }
}
context("When the number of participants drops below the threshold") {
repeat(10) { videobridge.localEndpointExpired() }
repeat(10) { videobridge.localEndpointExpired(false) }
videobridge.shutdownState shouldBe ShutdownState.SHUTTING_DOWN
fakeExecutor.clock.elapse(ShutdownConfig.config.shuttingDownDelay)
fakeExecutor.run()
Expand Down

0 comments on commit a383706

Please sign in to comment.