Skip to content

Commit

Permalink
Debug PRT
Browse files Browse the repository at this point in the history
  • Loading branch information
tpapaioa committed Sep 19, 2024
1 parent 6b88e8e commit 46e1f5f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
17 changes: 9 additions & 8 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
This module (or parent directory) should be used as the main entry point for the Broker API.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from contextlib import contextmanager

from logzero import logger
Expand Down Expand Up @@ -60,22 +60,23 @@ def __get__(self, instance, owner):
if not instance:
return self.func

def mp_split(*args, **kwargs):
def execute(*args, **kwargs):
count = instance._kwargs.get("_count", None)
if count is None:
return self.func(instance, *args, **kwargs)

results = []
max_workers_count = self.MAX_WORKERS or count
with self.EXECUTOR(max_workers=max_workers_count) as workers:
completed_futures = as_completed(
workers.submit(self.func, instance, *args, **kwargs) for _ in range(count)
)
for f in completed_futures:
with self.EXECUTOR(max_workers=max_workers_count) as executor:
futures = [
executor.submit(self.func, instance, *args, **kwargs) for _ in range(count)
]
wait(futures)
for f in futures:
results.extend(f.result())
return results

return mp_split
return execute


class Broker:
Expand Down
31 changes: 22 additions & 9 deletions broker/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,22 @@ def __new__(cls, name, bases, attrs):
for attr, obj in attrs.items():
if attr == "provider_help":
# register the help options based on the function arguments
for name, param in inspect.signature(obj).parameters.items():
if name not in ("self", "kwargs"):
# {name: (cls, is_flag)}
PROVIDER_HELP[name] = (
for _name, param in inspect.signature(obj).parameters.items():
if _name not in ("self", "kwargs"):
# {_name: (cls, is_flag)}
PROVIDER_HELP[_name] = (
new_cls,
isinstance(param.default, bool),
)
logger.debug(f"Registered help option {name} for provider {name}")
logger.debug(f"Registered help option {_name} for provider {_name}")
elif hasattr(obj, "_as_action"):
for action in obj._as_action:
PROVIDER_ACTIONS[action] = (new_cls, attr)
logger.debug(f"Registered action {action} for provider {name}")
# register provider settings validators
if validators := attrs.get("_validators"):
logger.debug(f"Adding {len(validators)} validators for {name}")
settings.validators.extend(validators)
return new_cls


Expand Down Expand Up @@ -124,30 +128,39 @@ def _validate_settings(self, instance_name=None):
:param instance_name: A string matching an instance name
"""
section_name = self.__class__.__name__.upper()
logger.info(f"tpapaioa _validate_settings {section_name=}")
# if the provider has instances, load the instance settings
if self._fresh_settings.get(section_name).get("instances"):
fresh_settings = self._fresh_settings.get(section_name).copy()
logger.info(f"tpapaioa _validate_settings {fresh_settings=}")

instance_name = instance_name or getattr(self, "instance", None)
logger.info(f"tpapaioa _validate_settings {instance_name=}")

# iterate through the instances and find the one that matches the instance_name
# if no instance matches, use the default instance
for candidate in fresh_settings.instances:
logger.debug("Checking %s against %s", instance_name, candidate)
logger.info("Checking %s against %s", instance_name, candidate)
if instance_name in candidate:
instance = candidate
break
elif candidate.values()[0].get("default") or len(fresh_settings.instances) == 1:
instance = candidate
self.instance, *_ = instance # store the instance name on the provider
fresh_settings.update(inst_vals := instance.values()[0])
logger.info(f"tpapaioa _validate_settings {fresh_settings=}")

settings[section_name] = fresh_settings

logger.info(f"tpapaioa _validate_settings {settings[section_name]=}")
logger.info(f"tpapaioa _validate_settings {inst_vals=}")
if not inst_vals.get("override_envars"):
# if a provider instance doesn't want to override envars, load them
logger.info("tpapaioa _validate_settings loaders")
settings.execute_loaders(loaders=[dynaconf.loaders.env_loader])
new_validators = [v for v in self._validators if v not in settings.validators]
logger.debug(f"Adding new validators: {[v.names[0] for v in new_validators]}")
settings.validators.extend(new_validators)
# use selective validation to only validate the instance settings
try:
logger.info("tpapaioa _validate_settings calling validators")
settings.validators.validate(only=section_name)
except dynaconf.ValidationError as err:
raise exceptions.ConfigurationError(err) from err
Expand Down

0 comments on commit 46e1f5f

Please sign in to comment.