From 37eb0787386500451ff945f364c2f4106233748b Mon Sep 17 00:00:00 2001 From: Jacob Callahan Date: Fri, 20 Sep 2024 16:19:20 -0400 Subject: [PATCH] Respect the thread_limit setting included in the previous commit Due to timing, I couldn't cleanly include this in the previous commit, but here it is. Now Broker actions, including checkins will reference this new setting when determining how many threads it should use. --- broker/broker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/broker/broker.py b/broker/broker.py index e94e280..f499c9d 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -24,6 +24,7 @@ from broker import exceptions, helpers from broker.hosts import Host from broker.providers import PROVIDER_ACTIONS, PROVIDERS, _provider_imports +from broker.settings import settings # load all the provider class so they are registered for _import in _provider_imports: @@ -86,7 +87,8 @@ def _act(self, provider, method, checkout=False): method_obj = getattr(provider_inst, method) logger.debug(f"On {provider_inst=} executing {method_obj=} with params {self._kwargs=}.") # Overkill for a single action, cleaner than splitting the logic - with ThreadPoolExecutor() as workers: + max_workers = min(count, int(settings.thread_limit)) if settings.thread_limit else None + with ThreadPoolExecutor(max_workers=max_workers) as workers: tasks = [workers.submit(method_obj, **self._kwargs) for _ in range(count)] result = [] for task in as_completed(tasks): @@ -202,8 +204,8 @@ def checkin(self, sequential=False, host=None, in_context=False): if not hosts: logger.debug("Checkin called with no hosts, taking no action") return - - with ThreadPoolExecutor(max_workers=1 if sequential else None) as workers: + max_workers = min(len(hosts), int(settings.thread_limit)) if settings.thread_limit else None + with ThreadPoolExecutor(max_workers=1 if sequential else max_workers) as workers: completed_checkins = as_completed( # reversing over a copy of the list to avoid skipping workers.submit(self._checkin, _host)