diff --git a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py index d89df9f1c..4f4126136 100644 --- a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py +++ b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py @@ -84,34 +84,32 @@ def run(self): else: self.__logger.info(f"Received some other datatype: {type(item)}") - # If no work available, request a steal from a random rank + # If no work is available, try to request a steal from a random rank + elif not self.pending_steal_request: + target_rank_id = random.randrange(0, self.algorithm.num_ranks) + requesting_rank = self.rank + target_rank = self.algorithm.ranks[target_rank_id] + if self.algorithm.has_work(target_rank): + steal_request = StealRequest(requesting_rank, target_rank) + # Place steal request in target's queue + self.algorithm.rank_queues[target_rank_id].appendleft(steal_request) + self.pending_steal_request = True + yield self.env.timeout(self.algorithm.steal_time) # double counting steal time here + else: + # this rank is awaiting the fulfillment of a steal request + # and can not proceed until it gets a response + pass - if not self.pending_steal_request: - target_rank_id = random.randrange(0, self.algorithm.num_ranks) - requesting_rank = self.rank - target_rank = self.algorithm.ranks[target_rank_id] - if self.algorithm.has_work(target_rank): - steal_request = StealRequest(requesting_rank, target_rank) - # Place steal request in target's queue - self.algorithm.rank_queues[target_rank_id].appendleft(steal_request) - yield self.env.timeout(self.algorithm.steal_time) - else: - # this rank is awaiting the fulfillment of a steal request - # and can not proceed until it gets a response - pass + # print(self.rank_id) def __get_total_work(self): """Returns the total work on the rank.""" total_work = 0. for item in self.algorithm.rank_queues[self.rank_id]: - if isinstance(item, Object): - total_work += item.get_load() - elif isinstance(item, list): + if isinstance(item, list): for o in item: total_work += o.get_load() - else: - pass return total_work def __has_work(self): @@ -229,15 +227,11 @@ def __reset(self): def __has_stealable_cluster(self, rank): """Asserts that a given rank has a stealable cluster.""" stealable = False - # Make sure rank is not empty - if self.has_work(rank): - - # Make sure the last item in the queue is a cluster - if isinstance(self.rank_queues[rank.get_id()][-1], list): + rank_queue = self.rank_queues[rank.get_id()] - # Make sure that the rank will not then be empty (prevent stealing back and forth) - if sum(isinstance(elm, list) for elm in self.rank_queues[rank.get_id()]) > 1: - stealable = True + # Make sure rank has at least two clusters in its queue (requiring two clusters prevents passing the last cluster around forever) + if self.has_work(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1: + stealable = True return stealable @@ -256,7 +250,7 @@ def respond_to_steal_request(self, steal_request: StealRequest): # Perform steal cluster = self.rank_queues[r_target.get_id()].pop() - print(f"Performing steal of shared block {cluster[0].get_shared_block_id()} (from {r_target.get_id()} to {r_requesting.get_id()})") + self.__logger.info(f" Performing steal of shared block {cluster[0].get_shared_block_id()} (from {r_target.get_id()} to {r_requesting.get_id()})") self.rank_queues[r_requesting.get_id()].append(cluster) self.__steal_count += 1