From fd70e0a696ba2d0216956c293e90e0e5c77ef9b1 Mon Sep 17 00:00:00 2001 From: Le Xu Date: Mon, 7 Feb 2022 18:10:52 -0600 Subject: [PATCH] wip optimization: flush and lazy synchronize on downstream for propagation; statefunstatefulcheckandinsert, statefunstatefullbdirect adaptation --- .../core/functions/FunctionActivation.java | 8 +- .../core/functions/LocalFunctionGroup.java | 208 ++++++++++++------ .../flink/core/functions/MailboxState.java | 2 +- .../flink/core/functions/ReusableContext.java | 18 +- .../flink/core/functions/RouteTracker.java | 76 +++++++ .../core/functions/StateAggregationInfo.java | 162 ++++++++++++++ .../flink/core/functions/SyncReplyState.java | 27 +++ .../procedures/StateAggregation.java | 167 +++----------- .../scheduler/QueueBasedLesseeSelector.java | 3 +- .../scheduler/RRIdSpanLesseeSelector.java | 4 +- .../functions/scheduler/RRLesseeSelector.java | 6 +- .../scheduler/RandomIdSpanLesseeSelector.java | 3 +- .../scheduler/RandomLesseeSelector.java | 15 +- .../statefun/flink/core/message/Message.java | 3 +- 14 files changed, 476 insertions(+), 226 deletions(-) create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StateAggregationInfo.java create mode 100644 statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/SyncReplyState.java diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionActivation.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionActivation.java index 9faf44f22..fb73a0079 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionActivation.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionActivation.java @@ -126,9 +126,13 @@ public void onSyncReceive(Message syncMessage, int numUpstreams){ blockedAddresses.add(addressMatch); } blockedMessages.addAll(pendingMessages); - System.out.println("onSyncReceive mailbox " + self + " blockedAddresses size " + blockedAddresses.size() + " status " + status + " tid: " + Thread.currentThread().getName()); + System.out.println("onSyncReceive mailbox " + self + + " blockedAddresses " + Arrays.toString(blockedAddresses.toArray()) + + " address match " + addressMatch + + " status " + status + " tid: " + Thread.currentThread().getName()); if(blockedAddresses.size() == numUpstreams && status == Status.RUNNABLE){ - System.out.println("onSyncReceive Mailbox " + self() + " ready to block on SYNC_ONE " + " blocked size " + blockedAddresses.size() + " status " + status); + System.out.println("onSyncReceive Mailbox " + self() + " ready to block on SYNC_ONE " + + " blocked size " + blockedAddresses.size() + " status " + status); this.readyToBlock = true; } } catch (Exception e) { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/LocalFunctionGroup.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/LocalFunctionGroup.java index affbd352e..9402934f5 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/LocalFunctionGroup.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/LocalFunctionGroup.java @@ -205,9 +205,6 @@ public void enqueue(Message message) { 0L, 0L, Message.MessageType.REPLY); message.setRequiresACK(false); context.send(envelope); - if(message.getMessageType() == Message.MessageType.FLUSH) { - return; - } } if(message.getMessageType() == Message.MessageType.REPLY){ String messageKey = (String) message.payload(getContext().getMessageFactory(), String.class.getClassLoader()); @@ -274,6 +271,33 @@ else if(message.getMessageType() == Message.MessageType.UNSYNC){ enqueue(unblockedMessage); } } + else if (message.getMessageType() == Message.MessageType.FLUSH){ + getRouteTracker().onFlushReceive(message.target(), message.source()); + } + else if (message.getMessageType() == Message.MessageType.FLUSH_DEPENDENCY){ + Object dependencyObject = message.payload(getContext().getMessageFactory(), Object.class.getClassLoader()); + if (dependencyObject instanceof HashMap){ + HashMap> dependencies = (HashMap>)message.payload(getContext().getMessageFactory(), HashMap.class.getClassLoader()); + System.out.println("Receiving FLUSH_DEPENDENCY from upstream " + dependencies.entrySet().stream().map(kv->kv.getKey() + " -> " + Arrays.toString(kv.getValue().toArray())).collect(Collectors.joining("|||")) + + " at " + message.target() + " tid: " + Thread.currentThread().getName()); + dependencies.entrySet().stream().forEach( + kv->{ + Message envelope = getContext().getMessageFactory().from(message.target(), kv.getKey().toAddress(), kv.getValue(), + 0L, 0L, Message.MessageType.FLUSH_DEPENDENCY); + System.out.println("Sending FLUSH_DEPENDENCY to sibling lessee " + kv.getKey() + " list of upstreams: " + Arrays.toString(kv.getValue().toArray()) + + " at " + message.source() + " tid: " + Thread.currentThread().getName()); + getContext().send(envelope); + } + ); + } + else{ + List
dependencies = (List
) message.payload(getContext().getMessageFactory(), List.class.getClassLoader()); + System.out.println("Receiving FLUSH_DEPENDENCY from sibling lessor " + message.source() + + " at " + message.target() + " list of upstreams: " + Arrays.toString(dependencies.toArray()) + " tid: " + Thread.currentThread().getName()); + getRouteTracker().onFlushDependencyReceive(message.target(), dependencies); + } + + } if(activation.isReadyToBlock()){ System.out.println("Ready to block from enqueue: queue size " + activation.runnableMessages.size() + " head message " + (activation.hasRunnableEnvelope()?activation.runnableMessages.get(0):"null") + " strategy queue size " + getStrategy(message.target()).getPendingQueue().size() + " tid: " + Thread.currentThread().getName()); @@ -285,7 +309,11 @@ else if(message.getMessageType() == Message.MessageType.UNSYNC){ tryHandleOnBlock(activation, message); } else{ - FunctionActivation activation = getActiveFunctions().get(new InternalAddress(message.target(), message.target().type().getInternalType())); + + FunctionActivation activation = getActiveFunctions().containsKey(message.target().toInternalAddress())? + getActiveFunctions().get(message.target().toInternalAddress()):newActivation(message.target()); //getActiveFunctions().get(new InternalAddress(message.target(), message.target().type().getInternalType())); + boolean needsRecycled = false; + message.setHostActivation(activation); // 2. Has no effect on mailbox, needs scheduler attention only if(message.isSchedulerCommand()){ getStrategy(message.target()).enqueue(message); @@ -294,52 +322,52 @@ else if(message.getMessageType() == Message.MessageType.UNSYNC){ tryFlushOutput(activation, message); tryHandleOnBlock(activation, message); } - - return; } - - if(message.isForwarded()){ - boolean hasPrevious = routeTracker.addLessor(message.target(), message.getLessor()); - if(!hasPrevious){ - Message envelope = getContext().getMessageFactory().from(message.target(), message.getLessor(), new ArrayList(), - 0L, 0L, Message.MessageType.LESSEE_REGISTRATION); - getContext().send(envelope); + else{ + if(message.isForwarded()){ + boolean hasPrevious = routeTracker.addLessor(message.target(), message.getLessor()); + if(!hasPrevious){ + Message envelope = getContext().getMessageFactory().from(message.target(), message.getLessor(), new ArrayList(), + 0L, 0L, Message.MessageType.LESSEE_REGISTRATION); + getContext().send(envelope); + } } - } - // 3. Inserting message to queue - boolean needsRecycled = false; - if (activation == null) { - activation = newActivation(message.target()); - message.setHostActivation(activation); - if(!message.isStateManagementMessage()){ - boolean success = activation.add(message); - // Add to strategy - if(success) getStrategy(message.target()).enqueue(message); - if(getPendingStrategies().size()>0) { - notEmpty.signal(); + // 3. Inserting message to queue +// if (activation == null) { +// activation = newActivation(message.target()); +// message.setHostActivation(activation); +// if(!message.isStateManagementMessage()){ +// boolean success = activation.add(message); +// // Add to strategy +// if(success) getStrategy(message.target()).enqueue(message); +// if(getPendingStrategies().size()>0) { +// notEmpty.signal(); +// } +// } +// else{ +// needsRecycled = true; +// } +// } +// else{ +// message.setHostActivation(activation); + if(!message.isStateManagementMessage()){ + boolean success = activation.add(message); + if (success) getStrategy(message.target()).enqueue(message); + if(getPendingStrategies().size()>0) notEmpty.signal(); } - } - else{ - needsRecycled = true; - } - } - else{ - message.setHostActivation(activation); - if(!message.isStateManagementMessage()){ - boolean success = activation.add(message); - if (success) getStrategy(message.target()).enqueue(message); - if(getPendingStrategies().size()>0) notEmpty.signal(); - } +// } + + // Step 1, 4-5 + procedure.handleNonControllerMessage(message); + tryFlushOutput(activation, message); + tryHandleOnBlock(activation, message); } - // Step 1, 4-5 - procedure.handleNonControllerMessage(message); - tryFlushOutput(activation, message); - tryHandleOnBlock(activation, message); + // 6. deregister any non necessary messages - if(needsRecycled - && activation.self()!=null + System.out.println("unRegisterActivation in enqueue based on message " + message + " activation" + activation + " tid: " + Thread.currentThread().getName()); + if(activation.self()!=null && message.getHostActivation().getStatus() != FunctionActivation.Status.EXECUTE_CRITICAL && !message.getHostActivation().hasPendingEnvelope() && !message.getHostActivation().hasRunningEnvelope() @@ -395,23 +423,26 @@ private void tryPerformUnsync(FunctionActivation activation){ } private void tryFlushOutput(FunctionActivation activation, Message message){ - if(!activation.hasRunnableEnvelope() && activation.isReadyToBlock() ){ + if(!activation.hasRunnableEnvelope() + && activation.isReadyToBlock() + && !getContext().hasPendingOutputMessage(activation.self()) + ){ // Flush all output channels - List
outputChannels = getRouteTracker().getAllActiveRoutes(message.target()); + List
outputChannels = getRouteTracker().getAllActiveDownstreamRoutes(message.target()); System.out.println("Get output channels at " + message.target() + " routes: " + Arrays.toString(outputChannels.toArray())); for(Address toFlush : outputChannels){ Message envelope = getContext().getMessageFactory().from(message.target(), toFlush, 0, 0L, 0L, Message.MessageType.FLUSH); - envelope.setRequiresACK(true); - if(pendingReplyBuffer.containsKey(RuntimeUtils.messageToID(envelope))){ - throw new FlinkRuntimeException("enqueue: Message key already exists: "+ RuntimeUtils.messageToID(envelope) + " message " + envelope); - } + // envelope.setRequiresACK(true); +// if(pendingReplyBuffer.containsKey(RuntimeUtils.messageToID(envelope))){ +// throw new FlinkRuntimeException("enqueue: Message key already exists: "+ RuntimeUtils.messageToID(envelope) + " message " + envelope); +// } System.out.println("Send FLUSH message that requires reply " + envelope + " key " + RuntimeUtils.messageToID(envelope) + " tid: " + Thread.currentThread().getName()); - pendingReplyBuffer.put(RuntimeUtils.messageToID(envelope), envelope); + //pendingReplyBuffer.put(RuntimeUtils.messageToID(envelope), envelope); getContext().send(envelope); - getRouteTracker().disableRoute(message.target(), toFlush); + // getRouteTracker().disableRoute(message.target(), toFlush); } } } @@ -419,13 +450,16 @@ private void tryFlushOutput(FunctionActivation activation, Message message){ private void tryHandleOnBlock(FunctionActivation activation, Message message){ if(activation.isReadyToBlock()){ Address self = activation.self(); - System.out.println("tryHandleOnBlock " + " activation " + activation.toDetailedString() + " empty running queue " + !activation.hasRunnableEnvelope() + " has pending output " + getContext().hasPendingOutputMessage(activation.self()) - + " pendings "+(!context.callbackPendings.containsKey(self.toInternalAddress())?"null": Arrays.toString(context.callbackPendings.get(self.toInternalAddress()).toArray()))); + System.out.println("tryHandleOnBlock " + " activation " + activation.toDetailedString() + " empty running queue " + !activation.hasRunnableEnvelope() + " has pending output " + getContext().hasPendingOutputMessage(self) + + " if upstream flushed " + routeTracker.ifUpstreamFlushed(self) + " flushed channels map: " + routeTracker.getTargetToFlushedChannels()); + //+ " pendings "+(!context.callbackPendings.containsKey(self.toInternalAddress())?"null": Arrays.toString(context.callbackPendings.get(self.toInternalAddress()).toArray()))); } if(activation.isReadyToBlock() && !activation.hasRunnableEnvelope() && - !getContext().hasPendingOutputMessage(activation.self()) + !getContext().hasPendingOutputMessage(activation.self()) && + routeTracker.ifUpstreamFlushed(activation.self()) ){ + routeTracker.clearFlushDependencyReceived(activation.self()); ArrayList pendings = getContext().callbackPendings.get(new InternalAddress(activation.self(), activation.self().type().getInternalType())); System.out.println("Pending user messages in scheduler command handling: " + " self " + activation.self() @@ -513,9 +547,52 @@ public Message prepareSend(Message message){ pendingReplyBuffer.put(RuntimeUtils.messageToID(message), message); } Message toSend = getStrategy(message.target()).prepareSend(message); + if(toSend!=null && toSend.getMessageType() == Message.MessageType.NON_FORWARDING){ + onSendingCrictical(toSend, getContext().getCurrentMessage().getHostActivation()); + } return toSend; } + public void onSendingCrictical(Message toSend, FunctionActivation activation){ +// if(controller.getRouteTracker().getTemporaryTargetToSourcesRoutes().isEmpty()){ + // merge routing table of my own + List
outputChannels = routeTracker.getAllActiveDownstreamRoutes(toSend.source()); + System.out.println("mergeTemporaryRoutingEntries prepareSend source: " + toSend.source() + + " " + Arrays.toString(outputChannels.toArray()) + + " message: " + toSend + " tid: " + Thread.currentThread().getName()); + routeTracker.mergeTemporaryRoutingEntries(toSend.source(), outputChannels); + outputChannels.forEach(x->routeTracker.disableRoute(toSend.source(), x)); +// } + + //if(context.getCurrentMessage().getMessageType() != Message.MessageType.NON_FORWARDING){ + if(!(activation!=null && activation.getStatus().equals(FunctionActivation.Status.EXECUTE_CRITICAL))){ + for(Address toFlush : outputChannels){ + Message envelope = getContext().getMessageFactory().from(toSend.source(), toFlush, 0, + 0L, 0L, Message.MessageType.FLUSH); + // envelope.setRequiresACK(true); +// if(pendingReplyBuffer.containsKey(RuntimeUtils.messageToID(envelope))){ +// throw new FlinkRuntimeException("enqueue: Message key already exists: "+ RuntimeUtils.messageToID(envelope) + " message " + envelope); +// } + System.out.println("Send FLUSH message from " + envelope + + " key " + RuntimeUtils.messageToID(envelope) + + " tid: " + Thread.currentThread().getName()); +// pendingReplyBuffer.put(RuntimeUtils.messageToID(envelope), envelope); + getContext().send(envelope); + // getRouteTracker().disableRoute(message.target(), toFlush);x + } + } + + // Send dependencies first only when mailbox is in the blocked state + HashMap> dependencyRoutingTable = new HashMap<>(); + dependencyRoutingTable.putAll(getRouteTracker().getTemporaryTargetToSourcesRoutes()); + System.out.println("Sending FLUSH_DEPENDENCY to downstream lessor " + dependencyRoutingTable.entrySet().stream().map(kv->kv.getKey() + " -> " + Arrays.toString(kv.getValue().toArray())).collect(Collectors.joining("|||")) + + " at " + toSend.source() + " tid: " + Thread.currentThread().getName()); + Message envelope = getContext().getMessageFactory().from(toSend.source(), toSend.target(), dependencyRoutingTable, + 0L, 0L, Message.MessageType.FLUSH_DEPENDENCY); + getContext().send(envelope); + getRouteTracker().clearTemporaryRoutingEntries(); + + } public boolean replacePendingMessage(Message oldMessage, Message newMessage){ boolean ret = false; if(pendingReplyBuffer.containsKey(RuntimeUtils.messageToID(oldMessage))){ @@ -584,7 +661,7 @@ public FunctionActivation newActivation(Address self) { System.out.println("Set readyToBlock to true while block size is " + activation.getBlocked().size()); } } - activeFunctions.put(new InternalAddress(self, self.type().getInternalType()), activation); + activeFunctions.put(self.toInternalAddress(), activation); strategyToFunctions.putIfAbsent(getStrategy(self), new HashSet<>()); strategyToFunctions.get(getStrategy(self)).add(activation); System.out.println("Create activation for address " + self + " activation "+ activation + " tid: " + Thread.currentThread().getName()); @@ -600,7 +677,10 @@ public void unRegisterActivation(FunctionActivation activation){ if(strategyToFunctions.get(getStrategy(activation.self())).isEmpty()){ strategyToFunctions.remove(getStrategy(activation.self())); } - repository.updateStatus(activation.self(), new MailboxState(activation.getStatus(), activation.isReadyToBlock(), activation.getPendingStateRequest())); + repository.updateStatus(activation.self(), + new MailboxState(activation.getStatus(), + activation.isReadyToBlock(), + activation.getPendingStateRequest())); activation.reset(); pool.release(activation); } @@ -642,6 +722,8 @@ public StateAggregation getProcedure(){ public HashMap getActiveFunctions() { return activeFunctions; } + public FunctionActivation getActivation(Address address) { return activeFunctions.get(address.toInternalAddress()); } + public LiveFunction getFunction(Address address) { System.out.println("LocalFunctionGroup getFunction address " + address + " repository " + (repository==null?"null":repository) @@ -661,13 +743,13 @@ public StateManager getStateManager(){ public void cancel(Message message){ System.out.println("Cancel message " + message); message.getHostActivation().removeEnvelope(message); - if( !message.getHostActivation().hasPendingEnvelope() - && !message.getHostActivation().hasRunningEnvelope() - && message.getHostActivation().self()!=null - && message.getHostActivation().getStatus() != FunctionActivation.Status.EXECUTE_CRITICAL - ){ - unRegisterActivation(message.getHostActivation()); - } +// if( !message.getHostActivation().hasPendingEnvelope() +// && !message.getHostActivation().hasRunningEnvelope() +// && message.getHostActivation().self()!=null +// && message.getHostActivation().getStatus() != FunctionActivation.Status.EXECUTE_CRITICAL +// ){ +// unRegisterActivation(message.getHostActivation()); +// } } public void close() { } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/MailboxState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/MailboxState.java index 14547cb81..9deac98c9 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/MailboxState.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/MailboxState.java @@ -10,7 +10,7 @@ public class MailboxState { public Address pendingStateRequest; - public MailboxState(FunctionActivation.Status status, boolean readyToBlock, Address pendingStateRequest) { + public MailboxState(FunctionActivation.Status status, boolean readyToBlock, Address pendingStateRequest ) { this.status = status; this.readyToBlock = readyToBlock; this.pendingStateRequest = pendingStateRequest; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java index 6a2e087c1..232f6f68a 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java @@ -189,11 +189,15 @@ public Message forward(Address to, Message message, ClassLoader loader, boolean Object what = message.payload(messageFactory, loader); Objects.requireNonNull(what); Address lessor = message.target(); - Address currentAddress = (in == null?message.target() : in.target());// calling from enqueue or not + Address currentAddress = ((in == null || (message.getHostActivation().self().toInternalAddress() != in.target().toInternalAddress()))? message.getHostActivation().self() : in.target());// calling from enqueue or not + if(currentAddress == null){ + throw new FlinkRuntimeException("Forward a message with no host activation on message " + message + " to " + to + " in message: " + (in==null?"null":in) + " tid: " + Thread.currentThread().getName()); + } if(force){ envelope = messageFactory.from(message.source(), to, what, message.getPriority().priority, message.getPriority().laxity, Message.MessageType.FORWARDED, message.getMessageId()); System.out.println("Register route through forward: " + currentAddress + " -> " + to + " tid: " + Thread.currentThread().getName()); + System.out.println("RouteTracker forward activateRoute from " + currentAddress + " to " + to + " on message " + message + " in " + (in==null?"null":in) + " tid: " + Thread.currentThread().getName()); ownerFunctionGroup.get().getRouteTracker().activateRoute(currentAddress, to); } else{ @@ -283,8 +287,11 @@ public void send(Address to, Object what) { callbackPendings.get(in.target().toInternalAddress()).add(pendingEnvelope); } else{ - System.out.println("Register route through send: " + in.target() + " -> " + envelope.target() + " tid: " + Thread.currentThread().getName()); + System.out.println("RouteTracker send activateRoute from " + in.target() + " to " + envelope.target() + " on message " + envelope + " tid: " + Thread.currentThread().getName()); ownerFunctionGroup.get().getRouteTracker().activateRoute(in.target(), envelope.target()); + if(envelope.getMessageType() == Message.MessageType.NON_FORWARDING){ + ownerFunctionGroup.get().onSendingCrictical(envelope, in.getHostActivation()); + } if (thisPartition.contains(envelope.target())) { localSinkPendingQueue.add(envelope); // drainLocalSinkOutput(); @@ -314,12 +321,15 @@ public void sendComplete(Address initiator, Message previous, Message dispatch){ } if(dispatch != null){ if(dispatch.isForwarded()){ - System.out.println("Register route through sendComplete (forward): " + " ia " + ia.toAddress() + " from " + initiator + " -> " + dispatch.target() + " tid: " + Thread.currentThread().getName()); + System.out.println("RouteTracker sendComplete (forward) activateRoute from " + initiator + " to " + dispatch.target() + " ia " + ia.toAddress() + " on message " + dispatch + " tid: " + Thread.currentThread().getName()); ownerFunctionGroup.get().getRouteTracker().activateRoute(initiator, dispatch.target()); } else{ + System.out.println("RouteTracker sendComplete activateRoute from " + initiator + " to " + dispatch.target() + " ia " + ia.toAddress() + " on message " + dispatch + " tid: " + Thread.currentThread().getName()); ownerFunctionGroup.get().getRouteTracker().activateRoute(initiator, dispatch.target()); - System.out.println("Register route through sendComplete: " + initiator + " -> " + dispatch.target() + " tid: " + Thread.currentThread().getName()); + } + if(dispatch.getMessageType() == Message.MessageType.NON_FORWARDING){ + ownerFunctionGroup.get().onSendingCrictical(dispatch, ownerFunctionGroup.get().getActivation(initiator)); } if (thisPartition.contains(dispatch.target())) { localSinkPendingQueue.add(dispatch); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RouteTracker.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RouteTracker.java index 7b727101b..684138c2b 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RouteTracker.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/RouteTracker.java @@ -1,8 +1,11 @@ package org.apache.flink.statefun.flink.core.functions; +import javafx.util.Pair; import org.apache.flink.statefun.sdk.Address; import org.apache.flink.statefun.sdk.InternalAddress; +import org.apache.flink.statefun.sdk.utils.DataflowUtils; import org.apache.flink.util.FlinkRuntimeException; +import scala.Int; import java.util.*; import java.util.stream.Collectors; @@ -12,6 +15,9 @@ public class RouteTracker { private final HashMap> lessorToLessees = new HashMap<>(); private final HashMap lesseeToLessor = new HashMap<>(); + private final HashMap> targetToSourceRoutes = new HashMap<>(); + private final HashMap, HashSet>> targetToFlushedChannels = new HashMap<>(); + // Add address mapping when scheduler forward a message // A message sent from initiator to a PA on behalf of VA public void activateRoute(Address initiator, Address pa){ @@ -48,6 +54,15 @@ public List
getAllActiveRoutes (Address initiator){ return new ArrayList<>(); } + public List
getAllActiveDownstreamRoutes (Address initiator){ + if(routes.containsKey(initiator.toInternalAddress())){ + return routes.get(initiator.toInternalAddress()).entrySet().stream() + .filter(pair-> pair.getValue() && (DataflowUtils.getFunctionId(pair.getKey().toAddress().type().getInternalType()) > DataflowUtils.getFunctionId(initiator.type().getInternalType()))) + .map(kv->kv.getKey().toAddress()).collect(Collectors.toList()); + } + return new ArrayList<>(); + } + // get all PAs associated with a VA created by initiator public Address[] getAllRoutes(Address initiator){ @@ -91,6 +106,16 @@ public Address getLessor(Address lessee){ return null; } + public boolean ifLessorOf(Address lessor, Address lessee){ + if(!lesseeToLessor.containsKey(lessee.toInternalAddress())) return false; + return lesseeToLessor.get(lessee.toInternalAddress()).equals(lessor.toInternalAddress()); + } + + public boolean ifLesseeOf(Address lessee, Address lessor){ + if(!lessorToLessees.containsKey(lessor.toInternalAddress())) return false; + return lessorToLessees.get(lessor.toInternalAddress()).contains(lessee.toInternalAddress()); + } + public boolean removeLessor(Address lessee){ return lesseeToLessor.remove(lessee.toInternalAddress()) != null; } @@ -104,4 +129,55 @@ public String getLesseeToLessorMap(){ return String.format("< %s >", lesseeToLessor.entrySet().stream() .map(kv->kv.getKey().toAddress() + " -> " + kv.getValue().toAddress()).collect(Collectors.joining("|||"))); } + + public void mergeTemporaryRoutingEntries(Address source, List
targets){ + for(Address target : targets){ + targetToSourceRoutes.putIfAbsent(target.toInternalAddress(), new HashSet<>()); + targetToSourceRoutes.get(target.toInternalAddress()).add(source.toInternalAddress()); + + } + } + + public Map> getTemporaryTargetToSourcesRoutes(){ + return targetToSourceRoutes.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, kv->kv.getValue().stream().map(InternalAddress::toAddress).collect(Collectors.toList())));//.map(kv->new Map.Entry<>(kv.getKey().toAddress(), kv.getValue().stream().map(InternalAddress::toAddress).collect(Collectors.toList()))).coll; + } + + public void clearTemporaryRoutingEntries(){ + targetToSourceRoutes.clear(); + } + + public void onFlushDependencyReceive(Address target, List
dependencies){ + targetToFlushedChannels.putIfAbsent(target.toInternalAddress(), new Pair<>(new HashSet<>(), new HashSet<>())); + targetToFlushedChannels.get(target.toInternalAddress()).getKey().addAll(dependencies.stream().map(Address::toInternalAddress).collect(Collectors.toList())); + } + + public void onFlushReceive(Address target, Address flushReceived){ + targetToFlushedChannels.putIfAbsent(target.toInternalAddress(), new Pair<>(new HashSet<>(), new HashSet<>())); + targetToFlushedChannels.get(target.toInternalAddress()).getValue().add(flushReceived.toInternalAddress()); + } + + public boolean ifUpstreamFlushed(Address target){ + Pair, HashSet> flushes = targetToFlushedChannels.get(target.toInternalAddress()); + if(flushes== null) { + System.out.println("Target address: " + target + " has no upstream flushes. tid: " + Thread.currentThread().getName()); + return true; + } + return flushes.getKey().equals(flushes.getValue()); + } + + public void clearFlushDependencyReceived(Address target){ + targetToFlushedChannels.remove(target.toInternalAddress()); + } + + public String getTargetToSourceRoutes() { + return targetToSourceRoutes.entrySet().stream().map(kv-> kv.getKey() + " -> " + Arrays.toString(kv.getValue().toArray())).collect(Collectors.joining("|||")); + } + + public String getTargetToFlushedChannels() { + return targetToFlushedChannels.entrySet().stream() + .map(kv-> kv.getKey() + " -> (" + Arrays.toString(kv.getValue().getKey().toArray()) + " == " + Arrays.toString(kv.getValue().getValue().toArray()) + ")") + .collect(Collectors.joining(" ||| ")); + } } + + diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StateAggregationInfo.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StateAggregationInfo.java new file mode 100644 index 000000000..27a2c74f7 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/StateAggregationInfo.java @@ -0,0 +1,162 @@ +package org.apache.flink.statefun.flink.core.functions; + +import org.apache.flink.statefun.flink.core.functions.scheduler.LesseeSelector; +import org.apache.flink.statefun.flink.core.functions.scheduler.RandomLesseeSelector; +import org.apache.flink.statefun.sdk.Address; +import org.apache.flink.statefun.sdk.InternalAddress; + +import java.util.*; +import java.util.stream.Collectors; + +// Class to hold all the information related to state aggregation +public class StateAggregationInfo { + // private int numUpstreams; + //private int numPartialStatesReceived; + private Address syncSource; + private Set lessees; + private ArrayList
partitionedAddresses; + private LesseeSelector lesseeSelector; + private HashSet distinctPartialStateSources; + private Set expectedPartialStateSources; + private HashSet distinctCriticalMessages; + // TODO: Assigned from sync recv + public Set expectedCriticalMessageSources; + private Boolean autoblocking; + private Boolean pendingRequestServed; + + + public StateAggregationInfo(ReusableContext context) { +// this.numUpstreams = numUpstreams; + this.syncSource = null; + this.lessees = new HashSet<>(); + //this.numPartialStatesReceived = 0; + this.partitionedAddresses = null; + this.lesseeSelector = new RandomLesseeSelector(context.getPartition()); + this.distinctPartialStateSources = new HashSet<>(); + this.expectedPartialStateSources = new HashSet<>(); + this.distinctCriticalMessages = new HashSet<>(); + this.expectedCriticalMessageSources = new HashSet<>(); + this.autoblocking = null; + this.pendingRequestServed = true; + } + + public void resetInfo() { + this.distinctPartialStateSources.clear(); + this.distinctCriticalMessages.clear(); + this.expectedCriticalMessageSources.clear(); + this.expectedPartialStateSources.clear(); + this.lessees.clear(); + this.autoblocking = null; + this.syncSource = null; + } + + public Address getSyncSource() { + return this.syncSource; + } + + public void setSyncSource(Address syncSource) { + this.syncSource = syncSource; + } +// public int getNumUpstreams() { +// return this.numUpstreams; +// } + + public void incrementNumPartialStatesReceived(InternalAddress address) { + this.distinctPartialStateSources.add(address); + //this.numPartialStatesReceived += 1; + } + + public void incrementNumCriticalMessagesReceived(InternalAddress address) { + this.distinctCriticalMessages.add(address); + } + + public boolean areAllPartialStatesReceived() { + //return (this.distinctPartialStateSources.size() == lesseeSelector.getBroadcastAddresses(lessor).size()); + return (this.distinctPartialStateSources.size() == expectedPartialStateSources.size()); + } + + public boolean areAllCriticalMessagesReceived() { + return (this.distinctCriticalMessages.size() == expectedCriticalMessageSources.size()); + } + + public void setExpectedPartialStateSources(Set sources) { + expectedPartialStateSources = sources; + } + + public Set
getExpectedPartialStateSources() { + return expectedPartialStateSources.stream().map(x -> x.address).collect(Collectors.toSet()); + } + + public Set
getExpectedCriticalMessage() { + return expectedCriticalMessageSources.stream().map(x -> x.address).collect(Collectors.toSet()); + } + + public void setExpectedCriticalMessageSources(Set sources) { + expectedCriticalMessageSources = sources; + } + + public void addLessee(Address lessee) { + lessees.add(new InternalAddress(lessee, lessee.type().getInternalType())); + } + + public List
getLessees() { + return lessees.stream().map(ia -> ia.address).collect(Collectors.toList()); + } + + public boolean hasLessee(Address lessee) { + return lessees.contains(new InternalAddress(lessee, lessee.type().getInternalType())); + } + + public void setAutoblocking(Boolean blocking) { + autoblocking = blocking; + } + + public Boolean ifAutoblocking() { + return autoblocking; + } + + public Boolean getPendingRequestServed() { + return pendingRequestServed; + } + + public void setPendingRequestServed(Boolean requestServed) { + pendingRequestServed = requestServed; + } + + // TODO: Use this function at all context forwards. Need to capture the context.forward() call + public void addPartition(Address partition) { + this.partitionedAddresses.add(partition); + } + + public ArrayList
getPartitionedAddresses() { + //return this.partitionedAddresses; + return lesseeSelector.getBroadcastAddresses(syncSource); + } + public HashSet getDistinctPartialStateSources() { + return distinctPartialStateSources; + } + + public void setDistinctPartialStateSources(HashSet distinctPartialStateSources) { + this.distinctPartialStateSources = distinctPartialStateSources; + } + + public HashSet getDistinctCriticalMessages() { + return distinctCriticalMessages; + } + + public void setDistinctCriticalMessages(HashSet distinctCriticalMessages) { + this.distinctCriticalMessages = distinctCriticalMessages; + } + + public Set getExpectedCriticalMessageSources() { + return expectedCriticalMessageSources; + } + + @Override + public String toString() { + return String.format("StateAggregationInfo numPartialStatesReceived %d lessor %s partitionedAddresses %s hash %d", distinctPartialStateSources.size(), + (syncSource == null ? "null" : syncSource.toString()), (partitionedAddresses == null ? "null" : Arrays.toString(partitionedAddresses.toArray())), this.hashCode()); + } + + +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/SyncReplyState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/SyncReplyState.java new file mode 100644 index 000000000..912af78bc --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/SyncReplyState.java @@ -0,0 +1,27 @@ +package org.apache.flink.statefun.flink.core.functions; + +import javafx.util.Pair; +import org.apache.flink.statefun.sdk.Address; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class SyncReplyState implements Serializable { + private HashMap, byte[]> stateMap; + private List
targetList; + + public SyncReplyState(HashMap, byte[]> map, List
list){ + stateMap = map; + targetList = list; + } + + public HashMap, byte[]> getStateMap(){ + return stateMap; + } + + public List
getTargetList(){ + return targetList; + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/procedures/StateAggregation.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/procedures/StateAggregation.java index 9149bcade..206125da2 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/procedures/StateAggregation.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/procedures/StateAggregation.java @@ -2,8 +2,6 @@ import javafx.util.Pair; import org.apache.flink.statefun.flink.core.functions.*; -import org.apache.flink.statefun.flink.core.functions.scheduler.LesseeSelector; -import org.apache.flink.statefun.flink.core.functions.scheduler.RandomLesseeSelector; import org.apache.flink.statefun.flink.core.functions.scheduler.SchedulingStrategy; import org.apache.flink.statefun.flink.core.functions.utils.RuntimeUtils; import org.apache.flink.statefun.flink.core.message.Message; @@ -140,6 +138,7 @@ public void handleOnBlock(FunctionActivation activation, Message message) { System.out.println(String.format("Could not find following instances to send sync request: %s, tid: %s", activation.self(), Thread.currentThread().getName())); } info.setExpectedPartialStateSources(stateOwners.stream().map(x->new InternalAddress(x, x.type().getInternalType())).collect(Collectors.toSet())); + sendStateRequests(stateOwners, message, info.ifAutoblocking()); } else{ @@ -241,6 +240,7 @@ public void handleNonControllerMessage(Message message) { //activation.setReadyToBlock(true); activation.onSyncAllReceive(message); } + activation.setPendingStateRequest(message.source()); System.out.println("handleNonControllerMessage set pending state request activation (autoblocking) " + activation + " as " + message.source()); // If autoblocking then does not check whether mailbox is blocked @@ -296,7 +296,12 @@ public void handleNonControllerMessage(Message message) { info = this.aggregationInfo.get(message.target().toInternalAddress()); info.incrementNumPartialStatesReceived(new InternalAddress(message.source(), message.source().type())); // Received state from a partition - merge the state (from the payload) - HashMap, byte[]> request = (HashMap, byte[]>) message.payload((controller.getContext()).getMessageFactory(), PartialState.class.getClassLoader()); + // HashMap, byte[]> request = (HashMap, byte[]>) message.payload((controller.getContext()).getMessageFactory(), HashMap.class.getClassLoader()); + SyncReplyState replyState = (SyncReplyState) message.payload((controller.getContext()).getMessageFactory(), SyncReplyState.class.getClassLoader()); + HashMap, byte[]> request = replyState.getStateMap(); + List
channels = replyState.getTargetList(); + controller.getRouteTracker().mergeTemporaryRoutingEntries(message.source(), channels); + System.out.println("mergeTemporaryRoutingEntries on receiving SYNC_REPLY source: " + message.source() + " channels " + Arrays.toString(channels.toArray()) + " tid: " + Thread.currentThread().getName()); System.out.println("Receive STATE_AGGREGATE request " + message + " state map " + Arrays.toString(request.entrySet().stream().map(kv->kv.getKey()+"->" +(kv.getValue() == null?"null":kv.getValue().length)).toArray()) + " tid: " + Thread.currentThread().getName()); @@ -365,16 +370,16 @@ public void handleNonControllerMessage(Message message) { controller.getStateManager().removeStateRegistrations(message.target(), message.source()); } } else if (message.getMessageType() == Message.MessageType.NON_FORWARDING) { - System.out.println("Inserting source address on non_forwarding message " + message.source() + " critical message received: " + info.distinctCriticalMessages.size()); + System.out.println("Inserting source address on non_forwarding message " + message.source() + " critical message received: " + info.getDistinctCriticalMessages().size()); info.incrementNumCriticalMessagesReceived(new InternalAddress(message.source(), message.source().type())); } if(activation.getStatus() == FunctionActivation.Status.BLOCKED){ System.out.println("handleNonControllerMessage: activation " + activation + "expectedCriticalMessageSources " + Arrays.toString(info.expectedCriticalMessageSources.toArray()) - + " distinctCriticalMessages: " + Arrays.toString(info.distinctCriticalMessages.toArray()) - + " expectedPartialStateSources " + Arrays.toString(info.expectedPartialStateSources.toArray()) - + " distinctPartialStateSources " + Arrays.toString(info.distinctPartialStateSources.toArray()) + + " distinctCriticalMessages: " + Arrays.toString(info.getDistinctCriticalMessages().toArray()) + + " expectedPartialStateSources " + Arrays.toString(info.getExpectedPartialStateSources().toArray()) + + " distinctPartialStateSources " + Arrays.toString(info.getDistinctPartialStateSources().toArray()) + " areAllCriticalMessagesReceived " + info.areAllCriticalMessagesReceived() + " areAllPartialStatesReceived " + info.areAllPartialStatesReceived() + " string to match " + RuntimeUtils.sourceToPrefix(activation.self()) @@ -384,22 +389,22 @@ public void handleNonControllerMessage(Message message) { if (info.areAllPartialStatesReceived() && info.areAllCriticalMessagesReceived() - && info.pendingRequestServed + && info.getPendingRequestServed() && activation.getStatus() == FunctionActivation.Status.BLOCKED) { if (activation.getStatus() != FunctionActivation.Status.BLOCKED) { System.out.println("Function activation not blocked when executing critical messages " + activation.getStatus() + " tid: " + Thread.currentThread().getName()); } + System.out.println("ExecuteCriticalMessages: activation " + activation + "expectedCriticalMessageSources " + Arrays.toString(info.expectedCriticalMessageSources.toArray()) - + " distinctCriticalMessages: " + Arrays.toString(info.distinctCriticalMessages.toArray()) - + " expectedPartialStateSources " + Arrays.toString(info.expectedPartialStateSources.toArray()) - + " distinctPartialStateSources " + Arrays.toString(info.distinctPartialStateSources.toArray()) + + " distinctCriticalMessages: " + Arrays.toString(info.getDistinctCriticalMessages().toArray()) + + " expectedPartialStateSources " + Arrays.toString(info.getExpectedPartialStateSources().toArray()) + + " distinctPartialStateSources " + Arrays.toString(info.getDistinctPartialStateSources().toArray()) ); // Execute all critical messages, by appending them in the runnable queue ArrayList criticalMessages = message.getHostActivation().executeCriticalMessages(info.getExpectedCriticalMessage()); for (Message cm : criticalMessages) { - controller.getStrategy(cm.target()).enqueue(cm); } System.out.println("Insert critical message " + Arrays.toString(criticalMessages.toArray()) + " tid: " + Thread.currentThread().getName()); @@ -500,7 +505,6 @@ private void sendPartialState(Address self, Address target) { // StateAggregationInfo info = this.aggregationInfo.get(self.toInternalAddress()); //info.setPendingRequestServed(true); - if(((StatefulFunction) controller.getFunction(self)).statefulSubFunction(self) && !self.equals(target)){ for (ManagedState state : states) { if (state instanceof PartitionedMergeableState) { @@ -555,7 +559,14 @@ private void sendPartialState(Address self, Address target) { payload.addStateMap(stateMap); } - Message envelope = controller.getContext().getMessageFactory().from(self, target, stateMap, + + List
outputChannels = controller.getRouteTracker().getAllActiveDownstreamRoutes(self); + System.out.println("sendPartialState from " + self + " to " + target + + " report output channels " + Arrays.toString(outputChannels.toArray()) + + " tid: " + Thread.currentThread().getName()); + SyncReplyState syncReplyState = new SyncReplyState(stateMap, outputChannels); + outputChannels.forEach(x->controller.getRouteTracker().disableRoute(self, x)); + Message envelope = controller.getContext().getMessageFactory().from(self, target, syncReplyState, 0L, 0L, Message.MessageType.SYNC_REPLY); try { controller.getContext().send(envelope); @@ -585,132 +596,4 @@ public String toString() { } } - // Class to hold all the information related to state aggregation - public static class StateAggregationInfo { -// private int numUpstreams; - //private int numPartialStatesReceived; - private Address syncSource; - private Set lessees; - private ArrayList
partitionedAddresses; - private LesseeSelector lesseeSelector; - private HashSet distinctPartialStateSources; - private Set expectedPartialStateSources; - private HashSet distinctCriticalMessages; - // TODO: Assigned from sync recv - public Set expectedCriticalMessageSources; - private Boolean autoblocking; - private Boolean pendingRequestServed; - - StateAggregationInfo(ReusableContext context) { -// this.numUpstreams = numUpstreams; - this.syncSource = null; - this.lessees = new HashSet<>(); - //this.numPartialStatesReceived = 0; - this.partitionedAddresses = null; - this.lesseeSelector = new RandomLesseeSelector(context.getPartition()); - this.distinctPartialStateSources = new HashSet<>(); - this.expectedPartialStateSources = new HashSet<>(); - this.distinctCriticalMessages = new HashSet<>(); - this.expectedCriticalMessageSources = new HashSet<>(); - this.autoblocking = null; - this.pendingRequestServed = true; - } - - public void resetInfo() { - this.distinctPartialStateSources.clear(); - this.distinctCriticalMessages.clear(); - this.expectedCriticalMessageSources.clear(); - this.expectedPartialStateSources.clear(); - this.lessees.clear(); - this.autoblocking = null; - this.syncSource = null; - } - - public Address getSyncSource() { - return this.syncSource; - } - - public void setSyncSource(Address syncSource){ - this.syncSource = syncSource; - } -// public int getNumUpstreams() { -// return this.numUpstreams; -// } - - public void incrementNumPartialStatesReceived(InternalAddress address) { - this.distinctPartialStateSources.add(address); - //this.numPartialStatesReceived += 1; - } - - public void incrementNumCriticalMessagesReceived(InternalAddress address) { - this.distinctCriticalMessages.add(address); - } - - public boolean areAllPartialStatesReceived() { - //return (this.distinctPartialStateSources.size() == lesseeSelector.getBroadcastAddresses(lessor).size()); - return (this.distinctPartialStateSources.size() == expectedPartialStateSources.size()); - } - - public boolean areAllCriticalMessagesReceived() { - return (this.distinctCriticalMessages.size() == expectedCriticalMessageSources.size()); - } - - public void setExpectedPartialStateSources(Set sources){ - expectedPartialStateSources = sources; - } - - public Set
getExpectedPartialStateSources(){ - return expectedPartialStateSources.stream().map(x->x.address).collect(Collectors.toSet()); - } - - public Set
getExpectedCriticalMessage(){ - return expectedCriticalMessageSources.stream().map(x->x.address).collect(Collectors.toSet()); - } - - public void setExpectedCriticalMessageSources(Set sources){ - expectedCriticalMessageSources = sources; - } - - public void addLessee(Address lessee){ - lessees.add(new InternalAddress(lessee, lessee.type().getInternalType())); - } - - public List
getLessees(){ - return lessees.stream().map(ia->ia.address).collect(Collectors.toList()); - } - - public boolean hasLessee(Address lessee){ - return lessees.contains(new InternalAddress(lessee, lessee.type().getInternalType())); - } - - public void setAutoblocking(Boolean blocking){ - autoblocking = blocking; - } - - public Boolean ifAutoblocking(){ - return autoblocking; - } - - public Boolean getPendingRequestServed() {return pendingRequestServed;} - - public void setPendingRequestServed(Boolean requestServed) { - pendingRequestServed = requestServed; - } - - // TODO: Use this function at all context forwards. Need to capture the context.forward() call - public void addPartition(Address partition) { - this.partitionedAddresses.add(partition); - } - - public ArrayList
getPartitionedAddresses() { - //return this.partitionedAddresses; - return lesseeSelector.getBroadcastAddresses(syncSource); - } - - @Override - public String toString() { - return String.format("StateAggregationInfo numPartialStatesReceived %d lessor %s partitionedAddresses %s hash %d", distinctPartialStateSources.size(), - (syncSource == null ? "null" : syncSource.toString()), (partitionedAddresses == null ? "null" : Arrays.toString(partitionedAddresses.toArray())), this.hashCode()); - } - } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/QueueBasedLesseeSelector.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/QueueBasedLesseeSelector.java index 81ec2c898..e2e704e25 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/QueueBasedLesseeSelector.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/QueueBasedLesseeSelector.java @@ -74,7 +74,8 @@ public void collect(Address address, Integer queueSize) { public ArrayList
exploreLessee(Address address) { ArrayList
ret = new ArrayList<>(); for(int i = 0; i < EXPLORE_RANGE; i++){ - int targetOperatorId = ((ThreadLocalRandom.current().nextInt() % (partition.getParallelism() - 1) + (partition.getParallelism() - 1)) % (partition.getParallelism() - 1) + partition.getThisOperatorIndex() + 1) % (partition.getParallelism()); + //int targetOperatorId = ((ThreadLocalRandom.current().nextInt() % (partition.getParallelism() - 1) + (partition.getParallelism() - 1)) % (partition.getParallelism() - 1) + partition.getThisOperatorIndex() + 1) % (partition.getParallelism()); + int targetOperatorId = ThreadLocalRandom.current().nextInt(0, partition.getParallelism()); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); ret.add(new Address(address.type(), String.valueOf(keyGroupId))); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRIdSpanLesseeSelector.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRIdSpanLesseeSelector.java index 9258134df..df78a0ef0 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRIdSpanLesseeSelector.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRIdSpanLesseeSelector.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator; @@ -76,7 +77,8 @@ public ArrayList
exploreTargetBasedLessee(Address target, Address sourc String key = target + " " + source.toString(); for(int i = 0; i < this.EXPLORE_RANGE; i++){ int rangeIndex = (lessorToLastIndex.getOrDefault(key, 0) + i + targetIdList.size()) % targetIdList.size(); - int targetOperatorId = (targetIdList.get(rangeIndex) + destinationOperatorIndex + partition.getParallelism()) % partition.getParallelism(); + //int targetOperatorId = (targetIdList.get(rangeIndex) + destinationOperatorIndex + partition.getParallelism()) % partition.getParallelism(); + int targetOperatorId = (ThreadLocalRandom.current().nextInt(0, partition.getParallelism()) + rangeIndex) % partition.getParallelism(); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); ret.add(new Address(target.type(), String.valueOf(keyGroupId))); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRLesseeSelector.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRLesseeSelector.java index 197ce7071..38b9e717a 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRLesseeSelector.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RRLesseeSelector.java @@ -30,9 +30,9 @@ public RRLesseeSelector(Partition partition) { @Override public Address selectLessee(Address lessorAddress) { - if (lastIndex == this.partition.getThisOperatorIndex()){ - lastIndex = (lastIndex + 1) % this.partition.getParallelism(); - } +// if (lastIndex == this.partition.getThisOperatorIndex()){ +// lastIndex = (lastIndex + 1) % this.partition.getParallelism(); +// } int targetOperatorId = lastIndex; lastIndex = (lastIndex + 1) % this.partition.getParallelism(); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomIdSpanLesseeSelector.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomIdSpanLesseeSelector.java index b74654114..ddadc5a52 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomIdSpanLesseeSelector.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomIdSpanLesseeSelector.java @@ -47,7 +47,8 @@ public RandomIdSpanLesseeSelector(Partition partition, int range, int searchSpan @Override public Address selectLessee(Address lessorAddress) { int step = (int)(ThreadLocalRandom.current().nextDouble() * (MAX_SPAN-1)) + 1; - int targetOperatorId = (step + partition.getThisOperatorIndex() + partition.getParallelism()) % partition.getParallelism(); + //int targetOperatorId = (step + partition.getThisOperatorIndex() + partition.getParallelism()) % partition.getParallelism(); + int targetOperatorId = (ThreadLocalRandom.current().nextInt(0, partition.getParallelism()) + step + partition.getThisOperatorIndex()) % partition.getParallelism(); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); return new Address(lessorAddress.type(), String.valueOf(keyGroupId)); } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomLesseeSelector.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomLesseeSelector.java index a3863653b..aa7faaa73 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomLesseeSelector.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/scheduler/RandomLesseeSelector.java @@ -23,9 +23,9 @@ public RandomLesseeSelector(Partition partition) { this.partition = partition; this.targetIdList = new ArrayList<>(); for (int i = 0; i < partition.getParallelism(); i++){ - if(i != partition.getThisOperatorIndex()){ +// if(i != partition.getThisOperatorIndex()){ this.targetIdList.add(i); - } +// } } LOG.debug("Initialize QueueBasedLesseeSelector operator index {} parallelism {} max parallelism {} keygroup {}", partition.getThisOperatorIndex(), partition.getParallelism(), partition.getMaxParallelism(), @@ -37,9 +37,9 @@ public RandomLesseeSelector(Partition partition, int range) { this.partition = partition; this.targetIdList = new ArrayList<>(); for (int i = 0; i < partition.getParallelism(); i++){ - if(i != partition.getThisOperatorIndex()){ +// if(i != partition.getThisOperatorIndex()){ this.targetIdList.add(i); - } +// } } LOG.debug("Initialize RandomLesseeSelector operator index {} parallelism {} max parallelism {} keygroup {} EXPLORE_RANGE {}", partition.getThisOperatorIndex(), partition.getParallelism(), partition.getMaxParallelism(), @@ -49,7 +49,8 @@ public RandomLesseeSelector(Partition partition, int range) { @Override public Address selectLessee(Address lessorAddress) { - int targetOperatorId = ((ThreadLocalRandom.current().nextInt()%(partition.getParallelism()-1) + (partition.getParallelism()-1))%(partition.getParallelism()-1) + partition.getThisOperatorIndex() + 1)%(partition.getParallelism()); + int targetOperatorId = ThreadLocalRandom.current().nextInt(0, partition.getParallelism()); + //int targetOperatorId = ((ThreadLocalRandom.current().nextInt()%(partition.getParallelism()-1) + (partition.getParallelism()-1))%(partition.getParallelism()-1) + partition.getThisOperatorIndex() + 1)%(partition.getParallelism()); System.out.println("targetOperatorId " + targetOperatorId + " partition.getThisOperatorIndex() " + partition.getThisOperatorIndex()); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); return new Address(lessorAddress.type(), String.valueOf(keyGroupId)); @@ -76,7 +77,7 @@ public Set
selectLessees(Address lessorAddress, int count) { public ArrayList
exploreLessee(Address address) { ArrayList
ret = new ArrayList<>(); for(int i = 0; i < this.EXPLORE_RANGE; i++){ - int targetOperatorIdIndex =(int)(ThreadLocalRandom.current().nextDouble() * (partition.getParallelism()-1)); + int targetOperatorIdIndex =(int)(ThreadLocalRandom.current().nextDouble() * (partition.getParallelism())); int targetOperatorId = this.targetIdList.get(targetOperatorIdIndex); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); ret.add(new Address(address.type(), String.valueOf(keyGroupId))); @@ -88,7 +89,7 @@ public ArrayList
exploreLessee(Address address) { public ArrayList
exploreLesseeWithBase(Address address) { ArrayList
ret = new ArrayList<>(); for(int i = 0; i < this.EXPLORE_RANGE; i++){ - int targetOperatorIdIndex =(int)(ThreadLocalRandom.current().nextDouble() * (partition.getParallelism()-1)); + int targetOperatorIdIndex =(int)(ThreadLocalRandom.current().nextDouble() * (partition.getParallelism())); int targetOperatorId = this.targetIdList.get(targetOperatorIdIndex); int keyGroupId = KeyGroupRangeAssignment.computeKeyGroupForOperatorIndex(partition.getMaxParallelism(), partition.getParallelism(), targetOperatorId); ret.add(new Address(address.type(), String.valueOf(keyGroupId))); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java index d5ecf27d2..05631494f 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/Message.java @@ -47,6 +47,7 @@ public enum MessageType{ SUGAR_PILL, BARRIER, FLUSH, + FLUSH_DEPENDENCY, STATE_SYNC, STATE_SYNC_REPLY, } @@ -103,7 +104,7 @@ public boolean isControlMessage(){ getMessageType() == Message.MessageType.UNSYNC || getMessageType() == MessageType.LESSEE_REGISTRATION || getMessageType() == MessageType.FLUSH || - getMessageType() == MessageType.FLUSH; + getMessageType() == MessageType.FLUSH_DEPENDENCY; }