Skip to content

Commit

Permalink
debug logs
Browse files Browse the repository at this point in the history
Signed-off-by: arun696 <[email protected]>
  • Loading branch information
arunjose696 committed Feb 27, 2023
1 parent d58d4df commit b619245
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
1 change: 1 addition & 0 deletions unidist/core/backends/mpi/core/controller/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ def submit(task, *args, num_returns=1, **kwargs):

# dest_rank = RoundRobin.get_instance().schedule_rank()
dest_rank = choose_destination_rank(collected_data_ids)
print(dest_rank, collected_data_ids)
push_data_owners(dest_rank, collected_data_ids)
output_ids = object_store.generate_output_data_id(
dest_rank, garbage_collector, num_returns
Expand Down
10 changes: 7 additions & 3 deletions unidist/core/backends/mpi/core/controller/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ def choose_destination_rank(data_ids):
"""
Choose destination rank considering which worker has the maximum share of data.
Get the data shares in each worker process
Get the data shares in each worker process and choose rank with the most data share.
If data_ids are empty choses the rank using the round robin approach.
Parameters
----------
Expand All @@ -294,13 +295,16 @@ def choose_destination_rank(data_ids):
int
A rank number.
"""
if not data_ids:
chosen_rank = RoundRobin.get_instance().schedule_rank()
return chosen_rank
data_share = defaultdict(lambda: 0)
for data_id in data_ids:
data_share[object_store.get_data_owner(data_id)] += object_store.get_data_size(
data_id
)
rank_with_max_data = max(data_share, key=data_share.get)
return rank_with_max_data
chosen_rank = max(data_share, key=data_share.get)
return chosen_rank


def collect_all_data_id_from_args(value, collected_data_ids=[]):
Expand Down
3 changes: 2 additions & 1 deletion unidist/core/backends/mpi/core/worker/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Worker MPI process task processing functionality."""
import asyncio
import sys
from functools import wraps, partial

try:
Expand All @@ -29,7 +30,7 @@
# When building documentation we do not have MPI initialized so
# we use the condition to set "worker_0.log" in order to build it succesfully.
log_file = "worker_{}.log".format(mpi_state.rank if mpi_state is not None else 0)
w_logger = common.get_logger("worker", log_file)
w_logger = common.get_logger("worker", sys.stdout, True)

# Actors map {handle : actor}
actor_map = {}
Expand Down

0 comments on commit b619245

Please sign in to comment.