Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

follower do not need to retrieve load data in zk. #23

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -173,7 +173,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private PulsarService pulsar;

// Executor service used to update broker data.
private final ExecutorService executors;
private final ScheduledExecutorService scheduler;

// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
Expand All @@ -198,6 +198,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private final Lock lock = new ReentrantLock();
private final Set<String> knownBrokers = new HashSet<>();
private Map<String, String> bundleBrokerAffinityMap;
// flag to indicate whether the load data is old.
private volatile boolean oldFlag = false;

/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
Expand All @@ -213,7 +215,7 @@ public ModularLoadManagerImpl() {
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
preallocatedBundleToBroker = new ConcurrentHashMap<>();
executors = Executors.newSingleThreadExecutor(
scheduler = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -275,24 +277,32 @@ public void initialize(final PulsarService pulsar) {
// register listeners for domain changes
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
executors.execute(
scheduler.execute(
() -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
});

loadSheddingPipeline.add(createLoadSheddingStrategy());
}

private boolean isLeader() {
return pulsar.getLeaderElectionService() != null
&& pulsar.getLeaderElectionService().isLeader();
}

public void handleDataNotification(Notification t) {
if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
.thenAccept(brokers -> {
reapDeadBrokerPreallocations(brokers);
});
oldFlag = true;

try {
executors.execute(ModularLoadManagerImpl.this::updateAll);
} catch (RejectedExecutionException e) {
// Executor is shutting down
if (isLeader()) {
try {
scheduler.submit(ModularLoadManagerImpl.this::updateAll);
} catch (RejectedExecutionException e) {
// Executor is shutting down
}
}
}
}
Expand Down Expand Up @@ -479,6 +489,7 @@ public void updateAll() {
updateBundleData();
// broker has latest load-report: check if any bundle requires split
checkNamespaceBundleSplit();
oldFlag = false;
}

private synchronized void cleanupDeadBrokersData() {
Expand Down Expand Up @@ -834,13 +845,28 @@ private void updateBundleSplitMetrics(int bundlesSplit) {

/**
* As the leader broker, find a suitable broker for the assignment of the given bundle.
* If leader broker is inactive, decision will be made by the current broker.
*
* @param serviceUnit
* ServiceUnitId for the bundle.
* @return The name of the selected broker, as it appears on metadata store.
*/
@Override
public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
if (!isLeader() && oldFlag) {
log.info("Make decision on follower broker, but the LoadData is old," +
" need to update LoadData before make decision");
updateAll();
// invalidate all cache after some times in case of non-leader broker requesting
// metadata store continuously.
scheduler.schedule(() -> {
bundlesCache.invalidateAll();
brokersData.releaseAllResourcesInCache();
oldFlag = true;
}, Math.min(conf.getLoadBalancerResourceQuotaUpdateIntervalMinutes(),
conf.getLoadBalancerReportUpdateMaxIntervalMinutes()), TimeUnit.MINUTES);
}

// Use brokerCandidateCache as a lock to reduce synchronization.
long startTime = System.nanoTime();

Expand Down Expand Up @@ -1004,7 +1030,6 @@ public void start() throws PulsarServerException {

timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
__ -> new TimeAverageBrokerData()).join();
updateAll();
} catch (Exception e) {
log.error("Unable to acquire lock for broker: [{}]", brokerZnodePath, e);
throw new PulsarServerException(e);
Expand All @@ -1019,7 +1044,7 @@ public void start() throws PulsarServerException {
*/
@Override
public void stop() throws PulsarServerException {
executors.shutdownNow();
scheduler.shutdownNow();

try {
brokersData.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,9 @@ public interface MetadataCache<T> {
* @param path the path of the object in the metadata store
*/
void refresh(String path);

/**
* Force the invalidation of all objects in the metadata cache.
*/
void invalidateAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ public interface LockManager<T> extends AutoCloseable {
*/
CompletableFuture<Void> asyncClose();

/**
* release all resources in cache.
* @return
*/
void releaseAllResourcesInCache();
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,9 @@ public CompletableFuture<Void> asyncClose() {
.map(ResourceLock::release)
.collect(Collectors.toList()));
}

@Override
public void releaseAllResourcesInCache() {
cache.invalidateAll();
}
}