Skip to content

Commit

Permalink
updating of available resources in drones, closes #82
Browse files Browse the repository at this point in the history
  • Loading branch information
eileen-kuehn committed Feb 12, 2020
1 parent ee779a4 commit a2bf316
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions lapis/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ def access_wrapped(name, requested=True):
return access_wrapped("cores", requested=False)
elif "memory" in item:
try:
return self._temp["memory"]
return (1 / 1000 / 1000) * self._temp["memory"]
except KeyError:
return (1 / 1000 / 1000) * access_wrapped("memory", requested=False)
elif "disk" in item:
try:
return self._temp["disk"]
return (1 / 1024) * self._temp["disk"]
except KeyError:
return (1 / 1024) * access_wrapped("disk", requested=False)
return super(WrappedClassAd, self).__getitem__(item)
Expand Down Expand Up @@ -490,15 +490,22 @@ async def _schedule_jobs(self):
continue
else:
matches.append((queue_index, candidate_job, matched_drone))
# TODO: deduct job-resources from matched drone
# and update instead of remove
pre_job_drones.remove(matched_drone)
for key, value in candidate_job._wrapped.resources.items():
matched_drone._temp[key] = (
matched_drone._temp.get(
key,
matched_drone._wrapped.theoretical_available_resources[key],
)
- value
)
pre_job_drones.update(matched_drone)
if not matches:
return
# TODO: optimize for few matches, many matches, all matches
for queue_index, _, _ in reversed(matches):
del self.job_queue[queue_index]
for _, job, drone in matches:
drone.clear_temporary_resources()
await self._execute_job(job=job, drone=drone)
await sampling_required.put(self)
# NOTE: Is this correct? Triggers once instead of for each job
Expand Down

0 comments on commit a2bf316

Please sign in to comment.