Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy of hmittal/launched #611

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,6 @@ public <T> Observable<SinkClient<T>> getSinkClientByJobName(final String jobName
final AtomicReference<String> lastJobIdRef = new AtomicReference<>();
return clientWrapper.getNamedJobsIds(jobName)
.doOnUnsubscribe(() -> lastJobIdRef.set(null))
// .lift(new Observable.Operator<String, String>() {
// @Override
// public Subscriber<? super String> call(Subscriber<? super String> subscriber) {
// subscriber.add(Subscriptions.create(new Action0() {
// @Override
// public void call() {
// lastJobIdRef.set(null);
// }
// }));
// return subscriber;
// }
// })
.filter((String newJobId) -> {
logger.info("Got job cluster's new jobId=" + newJobId);
return newJobIdIsGreater(lastJobIdRef.get(), newJobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,7 @@ public Observable<NamedJobInfo> namedJobInfo(final String jobName) {
.timeout(3 * MASTER_SCHED_INFO_HEARTBEAT_INTERVAL_SECS,
TimeUnit.SECONDS)
.filter(namedJobInfo -> namedJobInfo != null
&& !JobSchedulingInfo.HB_JobId.equals(namedJobInfo.getName()))
;
&& !JobSchedulingInfo.HB_JobId.equals(namedJobInfo.getName()));
}))
.repeatWhen(repeatLogic)
.retryWhen(retryLogic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public interface IJobClustersManager {
// worker related messages

void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request);
void onGetLastLaunchedJobIdSubject(JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request);

void onWorkerEvent(WorkerEvent r);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetJobDetailsRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.GetLastLaunchedJobIdStreamResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.ResubmitWorkerRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterArtifactRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.UpdateJobClusterLabelsRequest;
Expand Down Expand Up @@ -234,6 +236,7 @@ private Receive getInitializedBehavior() {
.match(GetJobClusterRequest.class, this::onJobClusterGet)
.match(ListCompletedJobsInClusterRequest.class, this::onJobListCompleted)
.match(GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject)
.match(GetLastLaunchedJobIdStreamRequest.class, this::onGetLastLaunchedJobIdSubject)
.match(ListArchivedWorkersRequest.class, this::onListArchivedWorkers)
// List Job Cluster related messages
.match(ListJobClustersRequest.class, this::onJobClustersList)
Expand Down Expand Up @@ -291,6 +294,7 @@ private Receive getInitializingBehavior() {
.match(GetJobClusterRequest.class, (x) -> getSender().tell(new GetJobClusterResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf()))
.match(ListCompletedJobsInClusterRequest.class, (x) -> logger.warn(genUnexpectedMsg(x.toString(), state)))
.match(GetLastSubmittedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastSubmittedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf()))
.match(GetLastLaunchedJobIdStreamRequest.class, (x) -> getSender().tell(new GetLastLaunchedJobIdStreamResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), empty()), getSelf()))
.match(ListArchivedWorkersRequest.class, (x) -> getSender().tell(new ListArchivedWorkersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf()))
.match(ListJobClustersRequest.class, (x) -> getSender().tell(new ListJobClustersResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf()))
.match(ListJobsRequest.class, (x) -> getSender().tell(new ListJobsResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), Lists.newArrayList()), getSelf()))
Expand Down Expand Up @@ -522,6 +526,18 @@ public void onGetLastSubmittedJobIdSubject(GetLastSubmittedJobIdStreamRequest r)
sender.tell(new GetLastSubmittedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf());
}
}

@Override
public void onGetLastLaunchedJobIdSubject(GetLastLaunchedJobIdStreamRequest r) {
Optional<JobClusterInfo> jobClusterInfo = jobClusterInfoManager.getJobClusterInfo(r.getClusterName());
ActorRef sender = getSender();
if (jobClusterInfo.isPresent()) {
jobClusterInfo.get().jobClusterActor.forward(r, getContext());
} else {
sender.tell(new GetLastLaunchedJobIdStreamResponse(r.requestId, CLIENT_ERROR_NOT_FOUND, "No such Job cluster " + r.getClusterName(), empty()), getSelf());
}
}

@Override
public void onWorkerEvent(WorkerEvent workerEvent) {
if(logger.isDebugEnabled()) { logger.debug("Entering JobClusterManagerActor:onWorkerEvent {}", workerEvent); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.master.ILeadershipManager;
import io.mantisrx.server.master.LeaderRedirectionFilter;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.server.master.persistence.IMantisPersistenceProvider;
import io.mantisrx.server.master.resourcecluster.ResourceClusters;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class MasterApiAkkaService extends BaseService {
private final Materializer materializer;
private final ExecutorService executorService;
private final CountDownLatch serviceLatch = new CountDownLatch(1);
private final MasterConfiguration masterConfig;

public MasterApiAkkaService(final MasterMonitor masterMonitor,
final MasterDescription masterDescription,
Expand All @@ -105,7 +107,8 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
final IMantisPersistenceProvider mantisStorageProvider,
final LifecycleEventPublisher lifecycleEventPublisher,
final ILeadershipManager leadershipManager,
final AgentClusterOperations agentClusterOperations) {
final AgentClusterOperations agentClusterOperations,
final MasterConfiguration masterConfig) {
super(true);
Preconditions.checkNotNull(masterMonitor, "MasterMonitor");
Preconditions.checkNotNull(masterDescription, "masterDescription");
Expand All @@ -114,6 +117,7 @@ public MasterApiAkkaService(final MasterMonitor masterMonitor,
Preconditions.checkNotNull(mantisStorageProvider, "mantisStorageProvider");
Preconditions.checkNotNull(lifecycleEventPublisher, "lifecycleEventPublisher");
Preconditions.checkNotNull(leadershipManager, "leadershipManager");
this.masterConfig = masterConfig;
this.masterMonitor = masterMonitor;
this.masterDescription = masterDescription;
this.jobClustersManagerActor = jobClustersManagerActor;
Expand Down Expand Up @@ -155,7 +159,7 @@ private MantisMasterRoute configureApiRoutes(final ActorSystem actorSystem, fina
final JobStatusRouteHandler jobStatusRouteHandler = new JobStatusRouteHandlerAkkaImpl(actorSystem, statusEventBrokerActor);
final JobDiscoveryRouteHandler jobDiscoveryRouteHandler = new JobDiscoveryRouteHandlerAkkaImpl(jobClustersManagerActor, idleTimeout);

final JobDiscoveryRoute v0JobDiscoveryRoute = new JobDiscoveryRoute(jobDiscoveryRouteHandler);
final JobDiscoveryRoute v0JobDiscoveryRoute = new JobDiscoveryRoute(masterConfig.getNamedJobsReferToLaunched(), jobDiscoveryRouteHandler);
final JobClusterRoute v0JobClusterRoute = new JobClusterRoute(jobClusterRouteHandler, jobRouteHandler, actorSystem);
final AgentClusterRoute v0AgentClusterRoute = new AgentClusterRoute(agentClusterOperations, actorSystem);
final JobStatusRoute v0JobStatusRoute = new JobStatusRoute(jobStatusRouteHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,104 +54,90 @@ public JobClusterRouteHandlerAkkaImpl(ActorRef jobClusterManagerActor) {

@Override
public CompletionStage<JobClusterManagerProto.CreateJobClusterResponse> create(final JobClusterManagerProto.CreateJobClusterRequest request) {
CompletionStage<JobClusterManagerProto.CreateJobClusterResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.CreateJobClusterResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.UpdateJobClusterResponse> update(JobClusterManagerProto.UpdateJobClusterRequest request) {
CompletionStage<JobClusterManagerProto.UpdateJobClusterResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.UpdateJobClusterResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.DeleteJobClusterResponse> delete(JobClusterManagerProto.DeleteJobClusterRequest request) {
CompletionStage<JobClusterManagerProto.DeleteJobClusterResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.DeleteJobClusterResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.DisableJobClusterResponse> disable(JobClusterManagerProto.DisableJobClusterRequest request) {
CompletionStage<JobClusterManagerProto.DisableJobClusterResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.DisableJobClusterResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.EnableJobClusterResponse> enable(JobClusterManagerProto.EnableJobClusterRequest request) {
CompletionStage<JobClusterManagerProto.EnableJobClusterResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.EnableJobClusterResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.UpdateJobClusterArtifactResponse> updateArtifact(JobClusterManagerProto.UpdateJobClusterArtifactRequest request) {
CompletionStage<JobClusterManagerProto.UpdateJobClusterArtifactResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.UpdateJobClusterArtifactResponse.class::cast);
return response;
}

@Override
public CompletionStage<UpdateSchedulingInfoResponse> updateSchedulingInfo(String clusterName, UpdateSchedulingInfoRequest request) {
CompletionStage<UpdateSchedulingInfoResponse> response =
ask(
jobClustersManagerActor,
new UpdateSchedulingInfo(request.requestId, clusterName, request.getSchedulingInfo(),
request.getVersion()),
timeout)
.thenApply(UpdateSchedulingInfoResponse.class::cast);
return response;
return ask(
jobClustersManagerActor,
new UpdateSchedulingInfo(request.requestId, clusterName, request.getSchedulingInfo(),
request.getVersion()),
timeout)
.thenApply(UpdateSchedulingInfoResponse.class::cast);
}

@Override
public CompletionStage<JobClusterManagerProto.UpdateJobClusterSLAResponse> updateSLA(JobClusterManagerProto.UpdateJobClusterSLARequest request) {
CompletionStage<JobClusterManagerProto.UpdateJobClusterSLAResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.UpdateJobClusterSLAResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse> updateWorkerMigrateStrategy(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest request) {
CompletionStage<JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.UpdateJobClusterLabelsResponse> updateLabels(JobClusterManagerProto.UpdateJobClusterLabelsRequest request) {
CompletionStage<JobClusterManagerProto.UpdateJobClusterLabelsResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.UpdateJobClusterLabelsResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.SubmitJobResponse> submit(JobClusterManagerProto.SubmitJobRequest request) {
CompletionStage<JobClusterManagerProto.SubmitJobResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.SubmitJobResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.GetJobClusterResponse> getJobClusterDetails(JobClusterManagerProto.GetJobClusterRequest request) {
CompletionStage<JobClusterManagerProto.GetJobClusterResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.GetJobClusterResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.ListJobClustersResponse> getAllJobClusters(JobClusterManagerProto.ListJobClustersRequest request) {
allJobClustersGET.increment();
CompletionStage<JobClusterManagerProto.ListJobClustersResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.ListJobClustersResponse.class::cast);
return response;
}

@Override
public CompletionStage<JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse> getLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest request) {
CompletionStage<JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse> response = ask(jobClustersManagerActor, request, timeout)
return ask(jobClustersManagerActor, request, timeout)
.thenApply(JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse.class::cast);
return response; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ CompletionStage<SchedInfoResponse> schedulingInfoStream(final JobClusterManagerP
final boolean sendHeartbeats);
CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastSubmittedJobIdStream(final JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest request,
final boolean sendHeartbeats);

CompletionStage<JobDiscoveryRouteProto.JobClusterInfoResponse> lastLaunchedJobIdStream(final JobClusterManagerProto.GetLastLaunchedJobIdStreamRequest request,
final boolean sendHeartbeats);
}
Loading
Loading