Skip to content

Commit

Permalink
added check for pending get requests after put data
Browse files Browse the repository at this point in the history
  • Loading branch information
arunjose696 committed Mar 7, 2023
1 parent 7045e94 commit f4717d9
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 3 deletions.
3 changes: 2 additions & 1 deletion unidist/core/backends/mpi/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def __del__(self):
def __getstate__(self):
"""Remove a reference to garbage collector for correct `pickle` serialization."""
attributes = self.__dict__.copy()
del attributes["_gc"]
if hasattr(self, "_gc"):
del attributes["_gc"]
return attributes

def base_data_id(self):
Expand Down
3 changes: 2 additions & 1 deletion unidist/core/backends/mpi/core/worker/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

"""Worker MPI process task processing functionality."""
import asyncio
import sys
from functools import wraps, partial

try:
Expand Down Expand Up @@ -126,6 +125,8 @@ async def worker_loop():
# Check pending actor requests also.
task_store.check_pending_actor_tasks()

RequestStore.get_instance().check_pending_get_requests(request["id"])

elif operation_type == common.Operation.PUT_OWNER:
request = communication.recv_simple_operation(mpi_state.comm, source_rank)
object_store.put_data_owner(request["id"], request["owner"])
Expand Down
5 changes: 4 additions & 1 deletion unidist/core/backends/mpi/core/worker/task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def unwrap_local_data_id(self, arg):
self.request_worker_data(owner_rank, arg)
return arg, True
else:
raise ValueError("DataID is missing!")
raise ValueError("DataID is missing! {}".format(arg))
else:
return arg, False

Expand Down Expand Up @@ -357,6 +357,9 @@ def process_task_request(self, request):
w_logger.debug("Is pending - {}".format(is_pending))

if is_pending or is_kw_pending:
current_rank = communication.MPIState.get_instance().rank
for output_id in output_ids:
ObjectStore.get_instance().put_data_owner(output_id, current_rank)
request["args"] = args
request["kwargs"] = kwargs
return request
Expand Down

0 comments on commit f4717d9

Please sign in to comment.