Skip to content

Commit

Permalink
[Java] Remove ControlSessionDemuxer.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Dec 5, 2024
1 parent 83db8b3 commit 2f8e497
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public AgentBuilder addInstrumentation(final AgentBuilder agentBuilder, final Ma
ENABLED_EVENTS.removeAll(getArchiveEventCodes(configOptions.get(DISABLED_ARCHIVE_EVENT_CODES)));

AgentBuilder tempBuilder = agentBuilder;
tempBuilder = addArchiveControlSessionDemuxerInstrumentation(tempBuilder);
tempBuilder = addArchiveControlSessionAdapterInstrumentation(tempBuilder);

tempBuilder = addEventInstrumentation(
tempBuilder,
Expand Down Expand Up @@ -157,15 +157,15 @@ private static EnumSet<ArchiveEventCode> getArchiveEventCodes(final String enabl
ArchiveEventCode::valueOf);
}

private static AgentBuilder addArchiveControlSessionDemuxerInstrumentation(final AgentBuilder agentBuilder)
private static AgentBuilder addArchiveControlSessionAdapterInstrumentation(final AgentBuilder agentBuilder)
{
if (ArchiveEventLogger.CONTROL_REQUEST_EVENTS.stream().noneMatch(ENABLED_EVENTS::contains))
{
return agentBuilder;
}

return agentBuilder
.type(nameEndsWith("ControlSessionDemuxer"))
.type(nameEndsWith("ControlSessionAdapter"))
.transform(((builder, typeDescription, classLoader, module, protectionDomain) -> builder
.visit(to(ControlInterceptor.ControlRequest.class)
.on(named("onFragment")))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@

import java.io.File;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

import static io.aeron.agent.ArchiveEventCode.*;
Expand All @@ -54,7 +51,7 @@
@ExtendWith(InterruptingTestCallback.class)
class ArchiveLoggingAgentTest
{
private static final Set<ArchiveEventCode> WAIT_LIST = synchronizedSet(EnumSet.noneOf(ArchiveEventCode.class));
private static final Set<ArchiveEventCode> WAIT_LIST = synchronizedSet(new HashSet<>());

private File testDir;

Expand All @@ -80,7 +77,7 @@ void logAll()

@Test
@InterruptAfter(10)
void logControlSessionDemuxerOnFragment()
void logControlSessionAdapterOnFragment()
{
testArchiveLogging(CMD_IN_KEEP_ALIVE.name() + "," + CMD_IN_AUTH_CONNECT.id(),
EnumSet.of(CMD_IN_AUTH_CONNECT, CMD_IN_KEEP_ALIVE));
Expand Down
21 changes: 14 additions & 7 deletions aeron-archive/src/main/java/io/aeron/archive/ArchiveConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

abstract class ArchiveConductor
extends SessionWorker<Session>
implements AvailableImageHandler, UnavailableCounterHandler
implements UnavailableImageHandler, UnavailableCounterHandler
{
private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(READ, WRITE);
static final String DELETE_SUFFIX = ".del";
Expand Down Expand Up @@ -114,6 +114,7 @@ abstract class ArchiveConductor
private final RecordingEventsProxy recordingEventsProxy;
private final Authenticator authenticator;
private final AuthorisationService authorisationService;
private final ControlSessionAdapter controlSessionAdapter;
private final ControlResponseProxy controlResponseProxy = new ControlResponseProxy();
private final ControlSessionProxy controlSessionProxy = new ControlSessionProxy(controlResponseProxy);
private final DutyCycleTracker dutyCycleTracker;
Expand Down Expand Up @@ -166,15 +167,18 @@ abstract class ArchiveConductor
final ChannelUri controlChannelUri = ChannelUri.parse(ctx.controlChannel());
controlChannelUri.put(CommonContext.SPARSE_PARAM_NAME, Boolean.toString(ctx.controlTermBufferSparse()));
controlSubscription = aeron.addSubscription(
controlChannelUri.toString(), ctx.controlStreamId(), this, null);
controlChannelUri.toString(), ctx.controlStreamId(), null, this);
}
else
{
controlSubscription = null;
}

localControlSubscription = aeron.addSubscription(
ctx.localControlChannel(), ctx.localControlStreamId(), this, null);
ctx.localControlChannel(), ctx.localControlStreamId(), null, this);

controlSessionAdapter = new ControlSessionAdapter(
decoders, controlSubscription, localControlSubscription, this, authorisationService);
}

public void onStart()
Expand All @@ -185,9 +189,9 @@ public void onStart()
dutyCycleTracker.update(nanoClock.nanoTime());
}

public void onAvailableImage(final Image image)
public void onUnavailableImage(final Image image)
{
addSession(new ControlSessionDemuxer(decoders, image, this, authorisationService));
controlSessionAdapter.abortControlSessionByImage(image);
}

public void onUnavailableCounter(
Expand Down Expand Up @@ -307,6 +311,9 @@ public int doWork()
markFile.updateActivityTimestamp(nowMs);
}
}

workCount += controlSessionAdapter.poll();

workCount += checkReplayTokens(nowNs);
workCount += invokeDriverConductor();
workCount += runTasks(taskQueue);
Expand Down Expand Up @@ -371,7 +378,7 @@ ControlSession newControlSession(
final int version,
final String channel,
final byte[] encodedCredentials,
final ControlSessionDemuxer demuxer)
final ControlSessionAdapter controlSessionAdapter)
{
final ChannelUri channelUri = ChannelUri.parse(channel);

Expand Down Expand Up @@ -414,7 +421,7 @@ ControlSession newControlSession(
sessionLivenessCheckIntervalMs,
aeron.asyncAddExclusivePublication(responseChannel, streamId),
invalidVersionMessage,
demuxer,
controlSessionAdapter,
aeron,
this,
cachedEpochClock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ enum State
private final ArrayDeque<BooleanSupplier> syncResponseQueue = new ArrayDeque<>(8);
private final ManyToOneConcurrentLinkedQueue<BooleanSupplier> asyncResponseQueue =
new ManyToOneConcurrentLinkedQueue<>();
private final ControlSessionDemuxer demuxer;
private final ControlSessionAdapter controlSessionAdapter;
private final String invalidVersionMessage;
private State state = State.INIT;
private boolean isInactive = false;
Expand All @@ -80,7 +80,7 @@ enum State
final long sessionLivenessCheckIntervalMs,
final long controlPublicationId,
final String invalidVersionMessage,
final ControlSessionDemuxer demuxer,
final ControlSessionAdapter controlSessionAdapter,
final Aeron aeron,
final ArchiveConductor conductor,
final CachedEpochClock cachedEpochClock,
Expand All @@ -93,7 +93,7 @@ enum State
this.connectTimeoutMs = connectTimeoutMs;
this.sessionLivenessCheckIntervalMs = sessionLivenessCheckIntervalMs;
this.invalidVersionMessage = invalidVersionMessage;
this.demuxer = demuxer;
this.controlSessionAdapter = controlSessionAdapter;
this.aeron = aeron;
this.controlPublicationId = controlPublicationId;
this.conductor = conductor;
Expand Down Expand Up @@ -145,7 +145,7 @@ public void close()
CloseHelper.close(conductor.context().countedErrorHandler(), controlPublication);
}

demuxer.removeControlSession(controlSessionId);
controlSessionAdapter.removeControlSession(controlSessionId);
if (!conductor.context().controlSessionsCounter().isClosed())
{
conductor.context().controlSessionsCounter().decrementOrdered();
Expand Down
Loading

0 comments on commit 2f8e497

Please sign in to comment.