From f5ca3a4c9453f37c9832b889a05ff4f7caea1c7b Mon Sep 17 00:00:00 2001
From: Anthony Fok
Date: Thu, 5 May 2022 00:57:45 -0600
Subject: [PATCH] Parallelize consequences-v3.10.0.py calculations

Use the Python multiprocessing package to take advantage of multiple
CPU cores by processing multiple realizations simultaneously.

This would reduce the total run time of, for example,

    bash scripts/run_OQStandard.sh SCM5p8_Montreal_conv -h -r -d -o

from 23 hours down to 6 hours on a c5a.24xlarge EC2 instance.

Fixes #57
---
 scripts/consequences-v3.10.0.py | 26 ++++++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/scripts/consequences-v3.10.0.py b/scripts/consequences-v3.10.0.py
index ac12fc6..b683e95 100644
--- a/scripts/consequences-v3.10.0.py
+++ b/scripts/consequences-v3.10.0.py
@@ -18,6 +18,8 @@
 
 import csv
 import sys
+import uuid
+from multiprocessing import Pool, RLock, freeze_support
 
 import numpy as np
 import pandas as pd
@@ -124,6 +126,17 @@ def read_interruption_time(xlsx):
     }
 
 
+# @globalize allows multiprocessing with nested functions.
+# Resolves "AttributeError: Can't pickle local object 'calculate_consequences.<locals>.process_realization'"
+# Credit: https://gist.github.com/EdwinChan/3c13d3a746bb3ec5082f
+def globalize(func):
+    def result(*args, **kwargs):
+        return func(*args, **kwargs)
+    result.__name__ = result.__qualname__ = uuid.uuid4().hex
+    setattr(sys.modules[result.__module__], result.__name__, result)
+    return result
+
+
 def calculate_consequences(job_id='-1'):
     calc_id = datastore.get_last_calc_id() if job_id == '-1' else int(job_id)
     dstore = datastore.read(calc_id)
@@ -166,7 +179,8 @@ def calculate_consequences(job_id='-1'):
     casualties_night = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0}
     casualties_transit = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0}
 
-    for rlzi in range(num_rlzs):
+    @globalize
+    def process_realization(rlzi):
         print("Processing realization {} of {}".format(rlzi+1, num_rlzs))
         filename = "consequences-rlz-" + str(rlzi).zfill(3) + "_" + str(calc_id) + ".csv"
         with open(filename, 'w') as f:
@@ -188,7 +202,7 @@ def calculate_consequences(job_id='-1'):
                              "sc_BusDispl30", "sc_BusDispl90", "sc_BusDispl180", "sc_BusDispl360",
                              "debris_brick_wood_tons", "debris_concrete_steel_tons"])
 
-            for asset in tqdm(assetcol):
+            for asset in tqdm(assetcol, desc=str(rlzi+1).rjust(3), position=rlzi, mininterval=1):
                 asset_ref = asset['id'].decode()
                 asset_occ, asset_typ, code_level = taxonomies[asset['taxonomy']].split('-')
                 if calculation_mode == 'scenario_damage':
@@ -312,6 +326,14 @@ def calculate_consequences(job_id='-1'):
                         "{0:,.1f}".format(debris_concrete_steel),
                     ])
 
+    # Set up parallel processing for realizations
+    # Credit: https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
+    freeze_support()  # for Windows support
+    L = list(range(num_rlzs))
+    tqdm.set_lock(RLock())
+    p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),))
+    p.map(process_realization, L)
+
 
 if __name__ == "__main__":
     calculate_consequences(sys.argv[1])
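
Note: for reference, below is a minimal, self-contained sketch of the pattern this patch applies: the globalize workaround that registers a nested function under a module-level name so Pool.map can pickle it, plus tqdm's shared-lock setup so each worker gets its own progress-bar row. The workload (fake_realization_work) and the NUM_RLZS / NUM_ASSETS constants are illustrative placeholders, not part of the patched script.

# Minimal sketch of the parallelization pattern used in this patch,
# with a dummy per-realization workload standing in for the real
# consequence calculations.
import sys
import uuid
from multiprocessing import Pool, RLock, freeze_support
from time import sleep

from tqdm import tqdm

NUM_RLZS = 4     # stand-in for num_rlzs read from the datastore
NUM_ASSETS = 50  # stand-in for the size of the asset collection


def globalize(func):
    # Register a nested function under a unique module-level name so
    # multiprocessing can pickle it by reference (same trick as the patch).
    def result(*args, **kwargs):
        return func(*args, **kwargs)
    result.__name__ = result.__qualname__ = uuid.uuid4().hex
    setattr(sys.modules[result.__module__], result.__name__, result)
    return result


def run_all_realizations():
    @globalize
    def fake_realization_work(rlzi):
        # position=rlzi gives each realization its own progress-bar row.
        for _ in tqdm(range(NUM_ASSETS), desc=str(rlzi + 1).rjust(3),
                      position=rlzi, mininterval=1):
            sleep(0.01)  # stand-in for per-asset consequence processing
        return rlzi

    tqdm.set_lock(RLock())
    # Hand the tqdm lock to every worker so concurrent bars do not collide.
    with Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
        return pool.map(fake_realization_work, range(NUM_RLZS))


if __name__ == "__main__":
    freeze_support()  # only needed for frozen Windows executables
    run_all_realizations()

One caveat: the uuid-registration trick assumes workers inherit the parent's module state, i.e. the fork start method (the Linux default, which matches the EC2 target above); under the spawn start method the registered name would not exist in the worker process. Using the pool as a context manager also ensures it is closed once the map completes.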