From 46e1f5f5c7f391b4fd044ef818c5b306e39b4928 Mon Sep 17 00:00:00 2001 From: Tasos Papaioannou Date: Wed, 18 Sep 2024 22:58:31 -0400 Subject: [PATCH] Debug PRT --- broker/broker.py | 17 +++++++++-------- broker/providers/__init__.py | 31 ++++++++++++++++++++++--------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/broker/broker.py b/broker/broker.py index af9598e3..390910bd 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -17,7 +17,7 @@ This module (or parent directory) should be used as the main entry point for the Broker API. """ -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed, wait from contextlib import contextmanager from logzero import logger @@ -60,22 +60,23 @@ def __get__(self, instance, owner): if not instance: return self.func - def mp_split(*args, **kwargs): + def execute(*args, **kwargs): count = instance._kwargs.get("_count", None) if count is None: return self.func(instance, *args, **kwargs) results = [] max_workers_count = self.MAX_WORKERS or count - with self.EXECUTOR(max_workers=max_workers_count) as workers: - completed_futures = as_completed( - workers.submit(self.func, instance, *args, **kwargs) for _ in range(count) - ) - for f in completed_futures: + with self.EXECUTOR(max_workers=max_workers_count) as executor: + futures = [ + executor.submit(self.func, instance, *args, **kwargs) for _ in range(count) + ] + wait(futures) + for f in futures: results.extend(f.result()) return results - return mp_split + return execute class Broker: diff --git a/broker/providers/__init__.py b/broker/providers/__init__.py index 5f3ad5de..0c8581df 100644 --- a/broker/providers/__init__.py +++ b/broker/providers/__init__.py @@ -65,18 +65,22 @@ def __new__(cls, name, bases, attrs): for attr, obj in attrs.items(): if attr == "provider_help": # register the help options based on the function arguments - for name, param in inspect.signature(obj).parameters.items(): - if name not in ("self", "kwargs"): - # {name: (cls, is_flag)} - PROVIDER_HELP[name] = ( + for _name, param in inspect.signature(obj).parameters.items(): + if _name not in ("self", "kwargs"): + # {_name: (cls, is_flag)} + PROVIDER_HELP[_name] = ( new_cls, isinstance(param.default, bool), ) - logger.debug(f"Registered help option {name} for provider {name}") + logger.debug(f"Registered help option {_name} for provider {_name}") elif hasattr(obj, "_as_action"): for action in obj._as_action: PROVIDER_ACTIONS[action] = (new_cls, attr) logger.debug(f"Registered action {action} for provider {name}") + # register provider settings validators + if validators := attrs.get("_validators"): + logger.debug(f"Adding {len(validators)} validators for {name}") + settings.validators.extend(validators) return new_cls @@ -124,14 +128,19 @@ def _validate_settings(self, instance_name=None): :param instance_name: A string matching an instance name """ section_name = self.__class__.__name__.upper() + logger.info(f"tpapaioa _validate_settings {section_name=}") # if the provider has instances, load the instance settings if self._fresh_settings.get(section_name).get("instances"): fresh_settings = self._fresh_settings.get(section_name).copy() + logger.info(f"tpapaioa _validate_settings {fresh_settings=}") + instance_name = instance_name or getattr(self, "instance", None) + logger.info(f"tpapaioa _validate_settings {instance_name=}") + # iterate through the instances and find the one that matches the instance_name # if no instance matches, use the default instance for candidate in fresh_settings.instances: - logger.debug("Checking %s against %s", instance_name, candidate) + logger.info("Checking %s against %s", instance_name, candidate) if instance_name in candidate: instance = candidate break @@ -139,15 +148,19 @@ def _validate_settings(self, instance_name=None): instance = candidate self.instance, *_ = instance # store the instance name on the provider fresh_settings.update(inst_vals := instance.values()[0]) + logger.info(f"tpapaioa _validate_settings {fresh_settings=}") + settings[section_name] = fresh_settings + + logger.info(f"tpapaioa _validate_settings {settings[section_name]=}") + logger.info(f"tpapaioa _validate_settings {inst_vals=}") if not inst_vals.get("override_envars"): # if a provider instance doesn't want to override envars, load them + logger.info("tpapaioa _validate_settings loaders") settings.execute_loaders(loaders=[dynaconf.loaders.env_loader]) - new_validators = [v for v in self._validators if v not in settings.validators] - logger.debug(f"Adding new validators: {[v.names[0] for v in new_validators]}") - settings.validators.extend(new_validators) # use selective validation to only validate the instance settings try: + logger.info("tpapaioa _validate_settings calling validators") settings.validators.validate(only=section_name) except dynaconf.ValidationError as err: raise exceptions.ConfigurationError(err) from err