Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
AymenFJA authored Nov 3, 2023
2 parents 9fe38da + f8b1836 commit ae63a57
Show file tree
Hide file tree
Showing 20 changed files with 53 additions and 60 deletions.
3 changes: 1 addition & 2 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
[flake8]
# E124: closing bracket does not match visual indentation
# E126: continuation line over-indented for hanging indent
# This one is bad. Sometimes ordering matters, conditional imports
# setting env vars necessary etc.
Expand All @@ -8,7 +7,7 @@
# https://github.com/PyCQA/pycodestyle/issues/386
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E124, E126, E402, E129, W504
ignore = E126, E402, E129, W504
max-line-length = 158
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
Expand Down
36 changes: 12 additions & 24 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,21 @@


class Channel(metaclass=ABCMeta):
"""For certain resources such as campus clusters or supercomputers at
"""Channels are abstractions that enable ExecutionProviders to talk to
resource managers of remote compute facilities.
For certain resources such as campus clusters or supercomputers at
research laboratories, resource requirements may require authentication.
For instance some resources may allow access to their job schedulers from
only their login-nodes which require you to authenticate on through SSH,
GSI-SSH and sometimes even require two factor authentication. Channels are
simple abstractions that enable the ExecutionProvider component to talk to
the resource managers of compute facilities. The simplest Channel,
*LocalChannel*, simply executes commands locally on a shell, while the
*SshChannel* authenticates you to remote systems.
Channels are usually called via the execute_wait function.
For channels that execute remotely, a push_file function allows you to copy over files.
.. code:: python
+------------------
|
cmd, wtime ------->| execute_wait
(ec, stdout, stderr)<-|---+
|
src, dst_dir ------->| push_file
dst_path <--------|----+
|
dst_script_dir <------| script_dir
|
+-------------------
only their login-nodes which require you to authenticate through SSH, or
require two factor authentication.
The simplest Channel, *LocalChannel*, executes commands locally in a
shell, while the *SSHChannel* authenticates you to remote systems.
Channels provide the ability to execute commands remotely, using the
execute_wait method, and manipulate the remote file system using methods
such as push_file, pull_file and makedirs.
Channels should ensure that each launched command runs in a new process
group, so that providers (such as AdHocProvider and LocalProvider) which
Expand Down
4 changes: 2 additions & 2 deletions parsl/configs/ad_hoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
{'username': 'YOUR_USERNAME',
'script_dir': 'YOUR_SCRIPT_DIR',
'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
}
}
}


config = Config(
Expand All @@ -26,7 +26,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,10 @@ def _start_local_interchange_process(self):
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO
},
},
daemon=True,
name="HTEX-Interchange"
)
)
self.interchange_proc.start()
try:
(self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120)
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self,
logdir=".",
logging_level=logging.INFO,
poll_period=10,
) -> None:
) -> None:
"""
Parameters
----------
Expand Down Expand Up @@ -425,7 +425,7 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
self.current_platform['parsl_v']),
"py.v={} parsl.v={}".format(msg['python_v'].rsplit(".", 1)[0],
msg['parsl_v'])
)
)
result_package = {'type': 'result', 'task_id': -1, 'exception': serialize_object(e)}
pkl_package = pickle.dumps(result_package)
self.results_outgoing.send(pkl_package)
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def create_reg_message(self):
'dir': os.getcwd(),
'cpu_count': psutil.cpu_count(logical=False),
'total_memory': psutil.virtual_memory().total,
}
}
b_msg = json.dumps(msg).encode('utf-8')
return b_msg

Expand Down Expand Up @@ -608,7 +608,7 @@ def worker(worker_id, pool_id, pool_size, task_queue, result_queue, worker_queue
logger.exception("Caught exception while trying to pickle the result package")
pkl_package = pickle.dumps({'type': 'result', 'task_id': tid,
'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))
})
})

result_queue.put(pkl_package)
tasks_in_progress.pop(worker_id)
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/taskvine/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _taskvine_factory(should_stop, factory_config):
else:
factory = Factory(batch_type=factory_config.batch_type,
manager_host_port=f"{factory_config._project_address}:{factory_config._project_port}",
)
)
except Exception as e:
raise TaskVineFactoryFailure(f'Cannot create factory with exception {e}')

Expand Down
12 changes: 6 additions & 6 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,29 +194,29 @@ def start(self, run_id: str, run_dir: str) -> int:
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"run_id": run_id
},
},
name="Monitoring-Router-Process",
daemon=True,
)
)
self.router_proc.start()

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,),
kwargs={"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
},
},
name="Monitoring-DBM-Process",
daemon=True,
)
)
self.dbm_proc.start()
self.logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid))

self.filesystem_proc = Process(target=filesystem_receiver,
args=(self.logdir, self.resource_msgs, run_dir),
name="Monitoring-Filesystem-Process",
daemon=True
)
)
self.filesystem_proc.start()
self.logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

Expand Down Expand Up @@ -359,7 +359,7 @@ def __init__(self,
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3 # in seconds
):
):
""" Initializes a monitoring configuration class.
Parameters
Expand Down
2 changes: 1 addition & 1 deletion parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def send_first_last_message(try_id: int,
'first_msg': not is_last,
'last_msg': is_last,
'timestamp': datetime.datetime.now()
})
})
radio.send(msg)
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
'exec_done': 'rgb(0, 200, 0)',
'memo_done': 'rgb(64, 200, 64)',
'fail_retryable': 'rgb(200, 128,128)'
}
}


def task_gantt_plot(df_task, df_status, time_completed=None):
Expand Down Expand Up @@ -50,7 +50,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None):
'Start': last_status['timestamp'],
'Finish': status['timestamp'],
'Resource': last_status['task_status_name']
}
}
parsl_tasks.extend([last_status_bar])
last_status = status

Expand All @@ -60,7 +60,7 @@ def task_gantt_plot(df_task, df_status, time_completed=None):
'Start': last_status['timestamp'],
'Finish': time_completed,
'Resource': last_status['task_status_name']
}
}
parsl_tasks.extend([last_status_bar])

fig = ff.create_gantt(parsl_tasks,
Expand Down Expand Up @@ -205,7 +205,7 @@ def y_axis_setup(value):
"fail_retryable": (8, 'rgb(200, 128,128)'),
"joining": (9, 'rgb(128, 128, 255)'),
"running_ended": (10, 'rgb(64, 64, 255)')
}
}


def workflow_dag_plot(df_tasks, group_by_apps=True):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def worker_efficiency(task, node):
y=[total_workers] * (end - start + 1),
name='Total of workers in whole run',
)
],
],
layout=go.Layout(xaxis=dict(autorange=True,
title='Time (seconds)'),
yaxis=dict(title='Number of workers'),
Expand Down Expand Up @@ -230,7 +230,7 @@ def resource_efficiency(resource, node, label):
y=[total] * (end - start + 1),
name=name2,
)
],
],
layout=go.Layout(xaxis=dict(autorange=True,
title='Time (seconds)'),
yaxis=dict(title=yaxis),
Expand Down
6 changes: 3 additions & 3 deletions parsl/tests/configs/ad_hoc_cluster_htex.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
{'username': 'YOUR_USERNAME',
'script_dir': 'YOUR_SCRIPT_DIR',
'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2']
}
} # type: Dict[str, Dict[str, Any]]
}
} # type: Dict[str, Dict[str, Any]]

config = Config(
executors=[
Expand All @@ -25,7 +25,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/htex_ad_hoc_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
channels=[SSHChannel(hostname=m,
username=user_opts['adhoc']['username'],
script_dir=user_opts['adhoc']['script_dir'],
) for m in user_opts['adhoc']['remote_hostnames']]
) for m in user_opts['adhoc']['remote_hostnames']]
)
)
],
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/configs/local_threads_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
hub_port=55055,
resource_monitoring_interval=3,
)
)
)
8 changes: 6 additions & 2 deletions parsl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def setup_data(tmpd_cwd):


@pytest.fixture(autouse=True, scope='function')
def wait_for_task_completion(pytestconfig):
def assert_no_outstanding_tasks(pytestconfig):
"""If we're in a config-file based mode, wait for task completion between
each test. This will detect early on (by hanging) if particular test
tasks are not finishing, rather than silently falling off the end of
Expand All @@ -254,7 +254,11 @@ def wait_for_task_completion(pytestconfig):
config = pytestconfig.getoption('config')[0]
yield
if config != 'local':
parsl.dfk().wait_for_current_tasks()
logger.info("Checking no outstanding tasks")
for task_record in parsl.dfk().tasks.values():
fut = task_record['app_fu']
assert fut.done(), f"Incomplete task found, task id {task_record['id']}"
logger.info("No outstanding tasks found")


def pytest_make_collect_report(collector):
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/vineex_condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
config = Config(
executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=50055),
provider=CondorProvider(),
)]
)]
)
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/vineex_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
executors=[TaskVineExecutor(label='VineExec',
worker_launch_method='factory',
manager_config=TaskVineManagerConfig(port=50055),
)]
)]
)
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/wqex_condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
provider=CondorProvider(),
# init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh;
# echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log',
)]
)]
)
2 changes: 1 addition & 1 deletion parsl/tests/scaling_tests/wqex_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
provider=LocalProvider(),
# init_command='source /home/yadu/src/wq_parsl/setup_parsl_env.sh;
# echo "Ran at $date" > /home/yadu/src/wq_parsl/parsl/tests/workqueue_tests/ran.log',
)]
)]
)
4 changes: 3 additions & 1 deletion parsl/tests/test_python_apps/test_lifted.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ def test_returns_a_class_instance():

def test_returns_a_class_instance_no_underscores():
# test that _underscore attribute references are not lifted
f = returns_a_class_instance()
with pytest.raises(AttributeError):
returns_a_class_instance()._nosuchattribute.result()
f._nosuchattribute.result()
f.exception() # wait for f to complete before the test ends


def test_returns_a_class():
Expand Down

0 comments on commit ae63a57

Please sign in to comment.