Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix judger tags #60

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 6 additions & 31 deletions coordinator/Services/JudgerCoordinatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,7 @@ await judger.Socket.SendMessage(new MultipleNewJobServerMsg() {

static readonly TimeSpan DISPATH_TIMEOUT = TimeSpan.FromMinutes(30);

protected async Task<Job?> GetLastUndispatchedJobFromDatabase(RurikawaDb db, List<string>? tags, bool allowUntagged) {
IQueryable<Job> res = QueuedCriteria(db.Jobs);
protected IQueryable<Job> ApplyTagCriteria(IQueryable<Job> res, RurikawaDb db, List<string>? tags, bool allowUntagged) {
if (tags != null) {
// join the test suites for suite tags
var query = res.Join(db.TestSuites, j => j.TestSuite, t => t.Id, (job, suite) => new { job, suite });
Expand All @@ -467,12 +466,12 @@ await judger.Socket.SendMessage(new MultipleNewJobServerMsg() {
// but still return a job
res = query.Select(x => x.job);
}
return await res
.OrderBy(j => j.Id).FirstOrDefaultAsync();
return res
;
}

protected async Task<List<Job>> GetUndispatchedJobsFromDatabase(RurikawaDb db, int count) {
var res = await QueuedCriteria(db.Jobs)
protected async Task<List<Job>> GetUndispatchedJobsFromDatabase(RurikawaDb db, int count, List<string>? tags, bool allowUntagged) {
var res = await ApplyTagCriteria(QueuedCriteria(db.Jobs), db, tags, allowUntagged)
.OrderBy(j => j.Id)
.Take(count)
.ToListAsync();
Expand All @@ -499,30 +498,6 @@ public static IQueryable<Job> QueuedCriteria(IQueryable<Job> jobs) {
);
}

/// <summary>
/// Dispatch ONE job from database.
/// </summary>
/// <param name="judger">The judger to dispatch from</param>
/// <returns></returns>
protected async ValueTask<bool> TryDispatchJobFromDatabase(Judger judger) {
using var scope = scopeProvider.CreateScope();
var db = GetDb(scope);
using var tx = await db.Database.BeginTransactionAsync(System.Data.IsolationLevel.Serializable);
var job = await GetLastUndispatchedJobFromDatabase(
db, judger.DbJudgerEntry.Tags, judger.DbJudgerEntry.AcceptUntaggedJobs);
if (job == null) return false;

try {
var res = await DispatchJob(judger, job);
await db.SaveChangesAsync();
await tx.CommitAsync();
return res;
} catch {
await tx.RollbackAsync();
return false;
}
}

/// <summary>
/// Dispatch MANY job from database. This method should be favored over
/// <c>TryDispatchJobFromDatabase(Judger)</c>
Expand All @@ -538,7 +513,7 @@ protected async ValueTask<int> TryDispatchJobFromDatabase(
using var scope = scopeProvider.CreateScope();
var db = GetDb(scope);
using var tx = await db.Database.BeginTransactionAsync(System.Data.IsolationLevel.Serializable);
var jobs = await GetUndispatchedJobsFromDatabase(db, count);
var jobs = await GetUndispatchedJobsFromDatabase(db, count, judger.DbJudgerEntry.Tags, judger.DbJudgerEntry.AcceptUntaggedJobs);

try {
var res = await DispatchJobs(judger, jobs, replyTo);
Expand Down
5 changes: 5 additions & 0 deletions judger/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,11 @@ async fn poll_jobs(
let request_for_new_task =
client_config.cfg().max_concurrent_tasks as u32 - active_task_count;

if request_for_new_task == 0 {
tracing::debug!("No new job is needed.");
continue;
}

tracing::debug!(
"Polling jobs from server. Asking for {} new jobs.",
request_for_new_task
Expand Down
11 changes: 9 additions & 2 deletions judger/src/tester/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ impl Image {
// Freeze `path` as a tar archive.
Some(hyper::Body::wrap_stream(tar_stream)),
)
.map_err(|e| BuildError::Internal(e.to_string()))
.map_err(|e| {
BuildError::Internal(format!("Internal error when building image: {:?}", e))
})
.try_for_each(|info| async {
if let Some(e) = info.error {
return Err(BuildError::BuildError {
Expand All @@ -428,7 +430,12 @@ impl Image {

archiving
.await
.map_err(|e| BuildError::Internal(e.to_string()))?
.map_err(|e| {
BuildError::Internal(format!(
"Internal error when joining file transfer task: {}",
e
))
})?
.map_err(|e: io::Error| BuildError::FileTransferError(e.to_string()))?;

Ok(())
Expand Down