Parallelize consequences-v3.10.0.py calculations
Use Python multiprocessing package to take advantage of multiple CPU cores
for processing multiple realizations simultaneously.

This would reduce the total run time of, for example,

    bash scripts/run_OQStandard.sh SCM5p8_Montreal_conv -h -r -d -o

from 23 hours down to 6 hours on a c5a.24xlarge EC2 instance.

Fixes OpenDRR#57
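
In outline, the change follows the standard multiprocessing + tqdm pattern of one worker process and one progress bar per realization. A minimal sketch of that pattern, for context only: the realization count and the per-asset loop body below are placeholders, not the script's real values.

    from multiprocessing import Pool, RLock, freeze_support
    from tqdm import tqdm

    def process_realization(rlzi):
        # Each worker handles one realization and drives its own progress
        # bar, pinned to a fixed terminal row via position=rlzi.
        for asset in tqdm(range(1000), desc=str(rlzi + 1).rjust(3),
                          position=rlzi, mininterval=1):
            pass  # per-asset consequence calculations would go here

    if __name__ == "__main__":
        freeze_support()        # harmless on Linux; needed for frozen Windows builds
        num_rlzs = 8            # placeholder; the script reads this from the OpenQuake datastore
        tqdm.set_lock(RLock())  # shared lock so concurrent bars do not garble the terminal
        with Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
            pool.map(process_realization, range(num_rlzs))

With the default 'fork' start method on Linux, each realization then runs in its own process, which is what brings the wall-clock time down on a many-core instance.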
anthonyfok committed May 5, 2022
1 parent 8b35ed2 commit f5ca3a4
Showing 1 changed file with 24 additions and 2 deletions.
scripts/consequences-v3.10.0.py: 24 additions & 2 deletions
@@ -18,6 +18,8 @@

import csv
import sys
import uuid
from multiprocessing import Pool, RLock, freeze_support

import numpy as np
import pandas as pd
@@ -124,6 +126,17 @@ def read_interruption_time(xlsx):
}


# @globalize allows multiprocessing with nested functions.
# Resolves "AttributeError: Can't pickle local object 'calculate_consequences.<locals>.process_realization'"
# Credit: https://gist.github.com/EdwinChan/3c13d3a746bb3ec5082f
def globalize(func):
def result(*args, **kwargs):
return func(*args, **kwargs)
result.__name__ = result.__qualname__ = uuid.uuid4().hex
setattr(sys.modules[result.__module__], result.__name__, result)
return result
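
The issue this works around: Pool.map pickles the worker function by its module-qualified name, and a function defined inside another function cannot be found that way. A small sketch of the failure and the fix, using the globalize helper defined above and assuming the Linux 'fork' start method this script runs under; the names outer/inner are illustrative only.

    from multiprocessing import Pool

    def outer():
        def inner(x):
            return x * 2
        # p.map(inner, ...) would raise:
        #   AttributeError: Can't pickle local object 'outer.<locals>.inner'
        # globalize() registers a uniquely named copy of inner at module
        # level, so pickle can locate it by name; do this before the pool
        # forks its workers.
        inner_global = globalize(inner)
        with Pool(2) as p:
            return p.map(inner_global, [1, 2, 3])

    print(outer())  # [2, 4, 6]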


def calculate_consequences(job_id='-1'):
calc_id = datastore.get_last_calc_id() if job_id == '-1' else int(job_id)
dstore = datastore.read(calc_id)
@@ -166,7 +179,8 @@ def calculate_consequences(job_id='-1'):
casualties_night = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0}
casualties_transit = {"Severity 1": 0, "Severity 2": 0, "Severity 3": 0, "Severity 4": 0}

for rlzi in range(num_rlzs):
@globalize
def process_realization(rlzi):
print("Processing realization {} of {}".format(rlzi+1, num_rlzs))
filename = "consequences-rlz-" + str(rlzi).zfill(3) + "_" + str(calc_id) + ".csv"
with open(filename, 'w') as f:
@@ -188,7 +202,7 @@ def calculate_consequences(job_id='-1'):
"sc_BusDispl30", "sc_BusDispl90", "sc_BusDispl180", "sc_BusDispl360",
"debris_brick_wood_tons", "debris_concrete_steel_tons"])

for asset in tqdm(assetcol):
for asset in tqdm(assetcol, desc=str(rlzi+1).rjust(3), position=rlzi, mininterval=1):
asset_ref = asset['id'].decode()
asset_occ, asset_typ, code_level = taxonomies[asset['taxonomy']].split('-')
if calculation_mode == 'scenario_damage':
@@ -312,6 +326,14 @@ def calculate_consequences(job_id='-1'):
"{0:,.1f}".format(debris_concrete_steel),
])

# Set up parallel processing for realizations
# Credit: https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
freeze_support() # for Windows support
L = list(range(num_rlzs))
tqdm.set_lock(RLock())
p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),))
p.map(process_realization, L)


if __name__ == "__main__":
calculate_consequences(sys.argv[1])
