Skip to content

Commit

Permalink
#501: simplify code and add step modifying pending_steal bool
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 23, 2024
1 parent 045e1f3 commit 1425b5a
Showing 1 changed file with 22 additions and 28 deletions.
50 changes: 22 additions & 28 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,34 +84,32 @@ def run(self):
else:
self.__logger.info(f"Received some other datatype: {type(item)}")

# If no work available, request a steal from a random rank
# If no work is available, try to request a steal from a random rank
elif not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_work(target_rank):
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
self.pending_steal_request = True
yield self.env.timeout(self.algorithm.steal_time) # double counting steal time here

else:
# this rank is awaiting the fulfillment of a steal request
# and can not proceed until it gets a response
pass

if not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_work(target_rank):
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
yield self.env.timeout(self.algorithm.steal_time)
else:
# this rank is awaiting the fulfillment of a steal request
# and can not proceed until it gets a response
pass
# print(self.rank_id)

def __get_total_work(self):
"""Returns the total work on the rank."""
total_work = 0.
for item in self.algorithm.rank_queues[self.rank_id]:
if isinstance(item, Object):
total_work += item.get_load()
elif isinstance(item, list):
if isinstance(item, list):
for o in item:
total_work += o.get_load()
else:
pass
return total_work

def __has_work(self):
Expand Down Expand Up @@ -229,15 +227,11 @@ def __reset(self):
def __has_stealable_cluster(self, rank):
"""Asserts that a given rank has a stealable cluster."""
stealable = False
# Make sure rank is not empty
if self.has_work(rank):

# Make sure the last item in the queue is a cluster
if isinstance(self.rank_queues[rank.get_id()][-1], list):
rank_queue = self.rank_queues[rank.get_id()]

# Make sure that the rank will not then be empty (prevent stealing back and forth)
if sum(isinstance(elm, list) for elm in self.rank_queues[rank.get_id()]) > 1:
stealable = True
# Make sure rank has at least two clusters in its queue (requiring two clusters prevents passing the last cluster around forever)
if self.has_work(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1:
stealable = True

return stealable

Expand All @@ -256,7 +250,7 @@ def respond_to_steal_request(self, steal_request: StealRequest):

# Perform steal
cluster = self.rank_queues[r_target.get_id()].pop()
print(f"Performing steal of shared block {cluster[0].get_shared_block_id()} (from {r_target.get_id()} to {r_requesting.get_id()})")
self.__logger.info(f" Performing steal of shared block {cluster[0].get_shared_block_id()} (from {r_target.get_id()} to {r_requesting.get_id()})")
self.rank_queues[r_requesting.get_id()].append(cluster)
self.__steal_count += 1

Expand Down

0 comments on commit 1425b5a

Please sign in to comment.