Skip to content

Commit

Permalink
Refactor speech activity and add tests (#1348)
Browse files Browse the repository at this point in the history
* fix: Prevent NPE when DEBUG is not enabled, simplify code.

* ref: Use endpointId instead of SSRC to index dominant speaker.

* ref: Use only endpoint IDs in ConferenceSpeechActivity.

* ref: Use a generic listener instead of Conference.

* test: Add speech activity tests.

* cleanup: Move class definitions to the end of the file.

* ref: Provide the list of enpoint IDs as a param.

* ref: Merge LastNEndpoints in ConferenceSpeechActivity, add tests.

* ref: Make DominantSpeakerIdentification generic.

* squash: Avoid extra tasks, address comments.

* chore: Update jitsi-utils.
  • Loading branch information
bgrozev authored Jul 15, 2020
1 parent 4390f61 commit 072dd44
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 482 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<smack.version>4.2.4-47d17fc</smack.version>
<jicoco.version>1.1-36-gecd3e61</jicoco.version>
<jitsi.utils.version>1.0-54-ge0059ef</jitsi.utils.version>
<jitsi.utils.version>1.0-55-g21baf6e</jitsi.utils.version>
<maven-shade-plugin.version>3.2.2</maven-shade-plugin.version>
<spotbugs.version>4.0.1</spotbugs.version>
<jersey.version>2.30.1</jersey.version>
Expand Down
123 changes: 50 additions & 73 deletions src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,6 @@ public class Conference
*/
private final ConferenceSpeechActivity speechActivity;

/**
* The audio level listener.
*/
private final AudioLevelListener audioLevelListener;

/**
* The <tt>Videobridge</tt> which has initialized this <tt>Conference</tt>.
*/
Expand Down Expand Up @@ -187,8 +182,6 @@ public class Conference
*/
private ConfOctoTransport tentacle;

private final LastNEndpoints lastNEndpoints = new LastNEndpoints();

/**
* The task of updating the ordered list of endpoints in the conference. It runs periodically in order to adapt to
* endpoints stopping or starting to their video streams (which affects the order).
Expand Down Expand Up @@ -237,11 +230,14 @@ public Conference(Videobridge videobridge,
this.includeInStatistics = enableLogging;
this.conferenceName = conferenceName;

speechActivity = new ConferenceSpeechActivity(this);
speechActivity = new ConferenceSpeechActivity(new SpeechActivityListener());
updateLastNEndpointsFuture = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(() -> {
try
{
lastNEndpoints.update();
if (speechActivity.updateLastNEndpoints())
{
lastNEndpointsChangedAsync();
}
}
catch (Exception e)
{
Expand All @@ -250,9 +246,6 @@ public Conference(Videobridge videobridge,

}, 3, 3, TimeUnit.SECONDS);

audioLevelListener
= (sourceSsrc, level) -> speechActivity.levelChanged(sourceSsrc, (int) level);

expireableImpl = new ExpireableImpl(logger, this::expire);

if (enableLogging)
Expand Down Expand Up @@ -425,37 +418,66 @@ public void describeShallow(ColibriConferenceIQ iq)
}
}

/**
* Runs {@link #lastNEndpointsChanged()} in an IO pool thread.
*/
private void lastNEndpointsChangedAsync()
{
TaskPools.IO_POOL.submit(() ->
{
try
{
lastNEndpointsChanged();
}
catch (Exception e)
{
logger.warn("Failed to handle change in last N endpoints: ", e);
}
});
}

/**
* Updates all endpoints with a new list of ordered endpoints in the conference.
*/
private void lastNEndpointsChanged()
{
List<String> lastNEndpointIds
= speechActivity.getOrderedEndpoints().stream()
.map(AbstractEndpoint::getID)
.collect(Collectors.toList());

endpointsCache.forEach(e -> e.lastNEndpointsChanged(lastNEndpointIds));
}

/**
* Notifies this instance that {@link #speechActivity} has identified a
* speaker switch event in this multipoint conference and there is now a new
* dominant speaker.
*/
void dominantSpeakerChanged()
private void dominantSpeakerChanged()
{
AbstractEndpoint dominantSpeaker = speechActivity.getDominantEndpoint();
String dominantSpeakerId = dominantSpeaker == null ? null : dominantSpeaker.getID();

if (logger.isInfoEnabled())
{
String id = dominantSpeaker == null ? "null" : dominantSpeaker.getID();
logger.info("ds_change ds_id=" + id);
getVideobridge().getStatistics().totalDominantSpeakerChanges.increment();
}

speechActivityEndpointsChanged();

if (dominantSpeaker != null)
{
broadcastMessage(new DominantSpeakerMessage(dominantSpeaker.getID()));
broadcastMessage(new DominantSpeakerMessage(dominantSpeakerId));
if (getEndpointCount() > 2)
{
double senderRtt = getRtt(dominantSpeaker);
double maxReceiveRtt = getMaxReceiverRtt(dominantSpeaker.getID());
double maxReceiveRtt = getMaxReceiverRtt(dominantSpeakerId);
// We add an additional 10ms delay to reduce the risk of the keyframe arriving
// too early
double keyframeDelay = maxReceiveRtt - senderRtt + 10;
if (logger.isDebugEnabled())
{
logger.debug("Scheduling keyframe request from " + dominantSpeaker.getID() + " after a delay" +
logger.debug("Scheduling keyframe request from " + dominantSpeakerId + " after a delay" +
" of " + keyframeDelay + "ms");
}
TaskPools.SCHEDULED_POOL.schedule(
Expand Down Expand Up @@ -692,7 +714,7 @@ else if (existingEndpoint != null)
*/
private void endpointsChanged()
{
speechActivity.endpointsChanged();
speechActivity.endpointsChanged(getEndpoints());
}

/**
Expand All @@ -704,7 +726,7 @@ private void endpointsChanged()
public void endpointSourcesChanged(AbstractEndpoint endpoint)
{
// Force an update to be propagated to each endpoint's bitrate controller.
lastNEndpoints.update(true);
lastNEndpointsChanged();
}

/**
Expand Down Expand Up @@ -936,14 +958,6 @@ public void endpointMessageTransportConnected(@NotNull AbstractEndpoint endpoint
}
}

/**
* Notifies this instance that the list of ordered endpoints has changed
*/
void speechActivityEndpointsChanged()
{
lastNEndpoints.update();
}

/**
* Gets the conference name.
*
Expand Down Expand Up @@ -1064,14 +1078,6 @@ private void sendOut(PacketInfo packetInfo)
}
}

/**
* Gets the audio level listener.
*/
public AudioLevelListener getAudioLevelListener()
{
return audioLevelListener;
}

/**
* @return The {@link ConfOctoTransport} for this conference.
*/
Expand Down Expand Up @@ -1315,47 +1321,18 @@ public Object put(String key, Object value)
}
}

private class LastNEndpoints
private class SpeechActivityListener implements ConferenceSpeechActivity.Listener
{
/**
* The list of endpoints ordered by speech activity and video activity (that is, endpoints with video enabled
* are first in the list).
*/
@NotNull
private List<String> lastNEndpointIds = new LinkedList<>();

/**
* Re-calculate the ordered list of endpoints in the conference.
*/
private void update()
@Override
public void dominantSpeakerChanged()
{
update(false);
Conference.this.dominantSpeakerChanged();
}

/**
* Re-calculate the ordered list of endpoints in the conference.
* @param force whether to fire an event even if the list doesn't change as a result of the call.
*/
private void update(boolean force)
@Override
public void lastNEndpointsChanged()
{
List<AbstractEndpoint> endpointsBySpeechActivity = speechActivity.getEndpoints();

Map<Boolean, List<AbstractEndpoint>> bySendingVideo
= endpointsBySpeechActivity.stream()
.collect(Collectors.groupingBy(AbstractEndpoint::isSendingVideo));

List<AbstractEndpoint> lastNEndpoints = new LinkedList<>();
lastNEndpoints.addAll(bySendingVideo.getOrDefault(true, Collections.emptyList()));
lastNEndpoints.addAll(bySendingVideo.getOrDefault(false, Collections.emptyList()));

List<String> lastNEndpointIds
= lastNEndpoints.stream().map(AbstractEndpoint::getID).collect(Collectors.toList());

if (force || !lastNEndpointIds.equals(this.lastNEndpointIds))
{
this.lastNEndpointIds = lastNEndpointIds;
endpointsCache.forEach(e -> e.speechActivityEndpointsChanged(this.lastNEndpointIds));
}
Conference.this.lastNEndpointsChanged();
}
}
}
Loading

0 comments on commit 072dd44

Please sign in to comment.