Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Use usrsctp only from dedicated single thread. #1261

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 108 additions & 74 deletions src/main/java/org/jitsi/videobridge/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -873,97 +873,85 @@ public void createSctpConnection()
sctpHandler.setSctpManager(sctpManager);
// NOTE(brian): as far as I know we always act as the 'server' for sctp
// connections, but if not we can make which type we use dynamic
SctpServerSocket socket = sctpManager.createServerSocket();
socket.eventHandler = new SctpSocket.SctpSocketEventHandler()
{
@Override
public void onReady()
sctpManager.createServerSocket().thenAcceptAsync(socket -> {
socket.eventHandler = new SctpSocket.SctpSocketEventHandler()
{
logger.info("SCTP connection is ready, creating the Data channel stack");
dataChannelStack
= new DataChannelStack(
(data, sid, ppid) -> socket.send(data, true, sid, ppid),
@Override
public void onReady()
{
logger.info("SCTP connection is ready, creating the Data channel stack");
dataChannelStack
= new DataChannelStack(
(data, sid, ppid) ->
{
try
{
TaskPools.SCTP_POOL.submit(() ->
{
socket.send(data, true, sid, ppid);
});
return 0;
}
catch (RejectedExecutionException e)
{
return -1;
}
},
logger
);
dataChannelStack.onDataChannelStackEvents(dataChannel ->
{
logger.info("Remote side opened a data channel.");
Endpoint.this.messageTransport.setDataChannel(dataChannel);
});
dataChannelHandler.setDataChannelStack(dataChannelStack);
if (OPEN_DATA_LOCALLY)
{
logger.info("Will open the data channel.");
DataChannel dataChannel
= dataChannelStack.createDataChannel(
dataChannelStack.onDataChannelStackEvents(dataChannel ->
{
logger.info("Remote side opened a data channel.");
Endpoint.this.messageTransport.setDataChannel(dataChannel);
});
dataChannelHandler.setDataChannelStack(dataChannelStack);
if (OPEN_DATA_LOCALLY)
{
logger.info("Will open the data channel.");
DataChannel dataChannel
= dataChannelStack.createDataChannel(
DataChannelProtocolConstants.RELIABLE,
0,
0,
0,
"default");
Endpoint.this.messageTransport.setDataChannel(dataChannel);
dataChannel.open();
Endpoint.this.messageTransport.setDataChannel(dataChannel);
dataChannel.open();
}
else
{
logger.info("Will wait for the remote side to open the data channel.");
}
}
else

@Override
public void onDisconnected()
{
logger.info("Will wait for the remote side to open the data channel.");
logger.info("SCTP connection is disconnected.");
}
}

@Override
public void onDisconnected()
{
logger.info("SCTP connection is disconnected.");
}
};
socket.dataCallback = (data, sid, ssn, tsn, ppid, context, flags) -> {
// We assume all data coming over SCTP will be datachannel data
DataChannelPacket dcp
};
socket.dataCallback = (data, sid, ssn, tsn, ppid, context, flags) -> {
// We assume all data coming over SCTP will be datachannel data
DataChannelPacket dcp
= new DataChannelPacket(data, 0, data.length, sid, (int)ppid);
// Post the rest of the task here because the current context is
// holding a lock inside the SctpSocket which can cause a deadlock
// if two endpoints are trying to send datachannel messages to one
// another (with stats broadcasting it can happen often)
TaskPools.IO_POOL.execute(
// Post the rest of the task here because the current context is
// holding a lock inside the SctpSocket which can cause a deadlock
// if two endpoints are trying to send datachannel messages to one
// another (with stats broadcasting it can happen often)
TaskPools.IO_POOL.execute(
() -> dataChannelHandler.consume(new PacketInfo(dcp)));
};
socket.listen();
sctpSocket = Optional.of(socket);
};
socket.listen();
sctpSocket = Optional.of(socket);
}, TaskPools.SCTP_POOL);
}

private void acceptSctpConnection(SctpServerSocket sctpServerSocket)
{
TaskPools.IO_POOL.submit(() -> {
// We don't want to block the thread calling
// onDtlsHandshakeComplete so run the socket acceptance in an IO
// pool thread
// FIXME: This runs forever once the socket is closed (
// accept never returns true).
logger.info("Attempting to establish SCTP socket connection");
int attempts = 0;
while (!sctpServerSocket.accept())
{
attempts++;
try
{
Thread.sleep(100);
}
catch (InterruptedException e)
{
break;
}

if (attempts > 100)
{
break;
}
}
if (logger.isDebugEnabled())
{
logger.debug("SCTP socket " + sctpServerSocket.hashCode() +
" accepted connection.");
}
});
// don't want to block calle thread, so schedule acceptor task onto
// dedicated pool
TaskPools.SCTP_POOL.submit(
new SctpSocketAcceptor(sctpServerSocket, 100, logger));
}

/**
Expand Down Expand Up @@ -1612,4 +1600,50 @@ public boolean isSendingVideo()
// video.
return transceiver.isReceivingVideo();
}

/**
* Runnable which periodically does non-blocking check for accepted
* sctp socket.
* This must be executed on SCTP pool thread.
*/
private static final class SctpSocketAcceptor implements Runnable
{
private static final int ATTEMPT_DELAY_MILLIS = 100;
private final SctpServerSocket sctpServerSocket;
private final int maxAttempts;
private final Logger logger;

private int attempts = 0;

public SctpSocketAcceptor(
SctpServerSocket sctpServerSocket,
int maxAttempts,
Logger logger)
{
this.sctpServerSocket = sctpServerSocket;
this.maxAttempts = maxAttempts;
this.logger = logger;
}

@Override
public void run()
{
if (attempts > maxAttempts)
{
logger.warn("Max attempts reached to accept socket " + sctpServerSocket.hashCode());
return;
}
logger.info("Attempting to establish SCTP socket connection: #" + attempts);
attempts++;
if (!sctpServerSocket.accept())
{
TaskPools.SCTP_POOL.schedule(this, ATTEMPT_DELAY_MILLIS, TimeUnit.MILLISECONDS);
}
else
{
logger.debug(() -> "SCTP socket " + sctpServerSocket.hashCode() +
" accepted connection.");
}
}
}
}
92 changes: 59 additions & 33 deletions src/main/java/org/jitsi/videobridge/sctp/SctpManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.jitsi.videobridge.util.*;
import org.jitsi_modified.sctp4j.*;

import java.util.concurrent.*;

/**
* Manages the SCTP connection and handles incoming and outgoing SCTP packets.
*
Expand Down Expand Up @@ -51,7 +53,10 @@ public class SctpManager {
private static int DEFAULT_SCTP_PORT = 5000;
static
{
Sctp4j.init(DEFAULT_SCTP_PORT);
TaskPools.SCTP_POOL.submit(() ->
{
Sctp4j.init(DEFAULT_SCTP_PORT);
});
}

/**
Expand All @@ -71,60 +76,75 @@ public SctpManager(SctpDataSender dataSender, Logger parentLogger)
* application packet
*/
public void handleIncomingSctp(PacketInfo sctpPacket) {
logger.debug(() -> "SCTP Socket " + socket.hashCode() + " receiving incoming SCTP data");
//NOTE(brian): from what I can tell in usrsctp, we can assume that it will make a copy
// of the buffer we pass it here (this ends up hitting usrsctp_conninput, and the sample
// program for usrsctp re-uses the buffer that's passed here, and the code does appear
// to make a copy).
socket.onConnIn(sctpPacket.getPacket().getBuffer(), sctpPacket.getPacket().getOffset(), sctpPacket.getPacket().getLength());
ByteBufferPool.returnBuffer(sctpPacket.getPacket().getBuffer());
TaskPools.SCTP_POOL.submit(() ->
{
logger.debug(() -> "SCTP Socket " + socket.hashCode() + " receiving incoming SCTP data");
//NOTE(brian): from what I can tell in usrsctp, we can assume that it will make a copy
// of the buffer we pass it here (this ends up hitting usrsctp_conninput, and the sample
// program for usrsctp re-uses the buffer that's passed here, and the code does appear
// to make a copy).
socket.onConnIn(sctpPacket.getPacket().getBuffer(), sctpPacket.getPacket().getOffset(), sctpPacket.getPacket().getLength());
ByteBufferPool.returnBuffer(sctpPacket.getPacket().getBuffer());
});
}

/**
* Create an {@link SctpServerSocket} to be used to wait for incoming SCTP connections
* @return an {@link SctpServerSocket}
*/
public SctpServerSocket createServerSocket()
public CompletableFuture<SctpServerSocket> createServerSocket()
{
socket = Sctp4j.createServerSocket(DEFAULT_SCTP_PORT);
socket.outgoingDataSender = this.dataSender;
logger.debug(() -> "Created SCTP server socket " + socket.hashCode());
return (SctpServerSocket)socket;
return CompletableFuture.supplyAsync(() ->
{
socket = Sctp4j.createServerSocket(DEFAULT_SCTP_PORT);
socket.outgoingDataSender = this.dataSender;
logger.debug(() -> "Created SCTP server socket " + socket.hashCode());
return (SctpServerSocket)socket;
}, TaskPools.SCTP_POOL);
}

/**
* Create an {@link SctpClientSocket} to be used to open an SCTP connection
* @return an {@link SctpClientSocket}
*/
public SctpClientSocket createClientSocket() {
socket = Sctp4j.createClientSocket(DEFAULT_SCTP_PORT);
socket.outgoingDataSender = this.dataSender;
if (logger.isDebugEnabled())
public CompletableFuture<SctpClientSocket> createClientSocket()
{
return CompletableFuture.supplyAsync(() ->
{
logger.debug("Created SCTP client socket " + socket.hashCode());
}
return (SctpClientSocket)socket;
socket = Sctp4j.createClientSocket(DEFAULT_SCTP_PORT);
socket.outgoingDataSender = this.dataSender;
if (logger.isDebugEnabled())
{
logger.debug("Created SCTP client socket " + socket.hashCode());
}
return (SctpClientSocket)socket;
}, TaskPools.SCTP_POOL);
}

/**
* Close the active {@link SctpSocket}, if there is one
*/
public void closeConnection() {
if (socket != null) {
if (logger.isDebugEnabled())
public CompletableFuture<Void> closeConnection()
{
return CompletableFuture.runAsync(() ->
{
if (socket != null)
{
logger.debug("Closing SCTP socket " + socket.hashCode());
if (logger.isDebugEnabled())
{
logger.debug("Closing SCTP socket " + socket.hashCode());
}
socket.close();
socket = null;
}
socket.close();
socket = null;
}
else
{
if (logger.isDebugEnabled())
else
{
logger.debug("No SCTP socket to close");
if (logger.isDebugEnabled())
{
logger.debug("No SCTP socket to close");
}
}
}
}, TaskPools.SCTP_POOL);
}

/**
Expand All @@ -144,7 +164,13 @@ public int send(byte[] data, int offset, int length)
{
byte[] newBuf = ByteBufferPool.getBuffer(length);
System.arraycopy(data, offset, newBuf, 0, length);
return innerSctpDataSender.send(newBuf, 0, length);

// Leave SCTP thread ASAP
TaskPools.IO_POOL.submit(() ->
{
innerSctpDataSender.send(newBuf, 0, length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delegating to just any IO thread breaks outgoing SCTP packet order when the machine is under load.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying transport (udp) doesn't guarantee order either

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Thanks for pointing out that we're not dealing with user message here, but rather with the underlying protocol. I missed that.

});
return 0;
}
}
}
21 changes: 21 additions & 0 deletions src/main/java/org/jitsi/videobridge/util/TaskPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@
public class TaskPools
{
private static final Logger classLogger = new LoggerImpl(TaskPools.class.getName());

/**
* A global pool of single thread to execute code which makes use of `usrsctp` library.
* This dramatically reduces chances of concurrent bugs inside `usrsctp`:
* crashes, deadlocks, state corruptions, infinite loops, etc.
* TODO: Once newer `usrsctp` version used `SCTP timer` thread from `usrsctp` must be
* TODO: replaced with recurring task to this pool which will advance timers inside `usrsctp`.
*/
public static final ScheduledExecutorService SCTP_POOL = Executors.unconfigurableScheduledExecutorService(
new ScheduledThreadPoolExecutor(
1,
new NameableThreadFactory("Global SCTP single-threaded pool"),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
classLogger.error("Runnable " + r + " rejected to be executed on " + executor);
}
}));

/**
* A global executor service which can be used for non-CPU-intensive tasks.
*/
Expand Down