diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index a8ef54e..f44096d 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -14,7 +14,7 @@ jobs:
           - '3.10'
     steps:
       - name: Checkout
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
      - name: Cache pip
        uses: actions/cache@v2
        with:
@@ -24,12 +24,14 @@ jobs:
            v1-pip-${{ runner.os }}-${{ matrix.python-version }}
            v1-pip-${{ runner.os }}
            v1-pip-
-      - name: Install Python
-        uses: actions/setup-python@v2
+      - name: Install Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
-      - name: Install dependencies
-        run: |
-          pip install -e .[test]
-      - name: Test
-        run: python -m pytest -vv
+      - name: Update pip
+        run: python -m pip install --upgrade pip
+      - name: Install Hatch
+        run: pip install hatch
+      - name: Run tests
+        run: hatch run test -vv
+
diff --git a/.gitignore b/.gitignore
index fcd8092..619f448 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@ __pycache__/
 venv/
 .vscode/
 .hypothesis/
+build/
 path/to/
 config.toml
 
diff --git a/README.md b/README.md
index 923c406..af83578 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,12 @@ Zabbix-auto-config is an utility that aims to automatically configure hosts, hos
 
 Note: This is only tested with Zabbix 5.0 LTS.
 
+## Requirements
+
+* Python >=3.8
+* pip >=21.3
+* Zabbix >=5.0
+
 # Quick start
 
 This is a crash course in how to quickly get this application up and running in a local test environment:
@@ -45,10 +51,23 @@ EOF
 
 ## Application
 
+### Installation (production)
+
+For production, the recommended approach is to install the project into a virtual environment directly with pip:
+
 ```bash
-python3 -m venv venv
+python -m venv venv
 . venv/bin/activate
 pip install -e .
+```
+
+When installing from source, editable mode (`-e`) is recommended, as it allows pulling in changes from git without reinstalling the project.
+
+### Configuration (mock environment)
+
+A ZAC environment with mock source collectors, host modifiers, and mapping files can be set up with the following commands:
+
+```bash
 cp config.sample.toml config.toml
 sed -i 's/^dryrun = true$/dryrun = false/g' config.toml
 mkdir -p path/to/source_collector_dir/ path/to/host_modifier_dir/ path/to/map_dir/
@@ -104,7 +123,9 @@ bob@example.com:Hostgroup-bob-hosts
 EOF
 ```
 
-Run the application:
+### Running
+
+Installing the application adds the `zac` command to your `PATH`. You can run the application with:
 
 ```bash
 zac
@@ -238,3 +259,51 @@ Zac manages only inventory properties configured as `managed_inventory` in `conf
 2. Remove the "location" property from the host in the source
 3. "location=x" will remain in Zabbix
 
+## Development
+
+We use the project management tool [Hatch](https://hatch.pypa.io/latest/) to develop the project. It manages virtual environment creation, dependency installation, and building and publishing of the project, among other things.
+
+Install Hatch with pipx:
+
+```bash
+pipx install hatch
+```
+
+Install the application with Hatch and enter the virtual environment:
+
+```bash
+hatch shell
+```
+
+The path to the current Hatch environment can always be found with:
+
+```bash
+hatch env find
+```
+
+### Testing
+
+Inside a Hatch environment, tests can be run in two ways.
+
+With Hatch:
+
+```bash
+hatch run test
+```
+
+Or by directly invoking pytest:
+
+```bash
+pytest
+```
+
+The only difference is that Hatch will automatically check dependencies and install/upgrade them if necessary before running the tests.
+
+#### Testing without Hatch
+
+If you just want to run tests without Hatch, you can do so by installing the development dependencies independently:
+
+```bash
+# Set up venv or similar ...
+pip install .[test]
+```
diff --git a/pyproject.toml b/pyproject.toml
index fed528d..c6e67e9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,58 @@
 [build-system]
-requires = ["setuptools"]
-build-backend = "setuptools.build_meta"
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "zabbix-auto-config"
+dynamic = ["version"]
+description = "Zabbix auto config - ZAC"
+readme = "README.md"
+requires-python = ">=3.8"
+license = "MIT"
+keywords = []
+authors = [{ name = "Paal Braathen", email = "paal.braathen@usit.uio.no" }]
+maintainers = [{ name = "Peder Hovdan Andresen", email = "pederhan@uio.no" }]
+classifiers = [
+    "Intended Audience :: System Administrators",
+    "Natural Language :: English",
+    "Operating System :: POSIX :: Linux",
+    "License :: OSI Approved :: MIT License",
+    "Programming Language :: Python",
+    "Programming Language :: Python :: 3",
+    "Programming Language :: Python :: 3.8",
+    "Programming Language :: Python :: 3.9",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+]
+dependencies = [
+    "multiprocessing-logging==0.3.1",
+    "psycopg2>=2.9.5",
+    "pydantic>=2.0.0",
+    "pyzabbix>=1.3.0",
+    "requests>=1.0.0",
+    "tomli>=2.0.0",
+]
+
+[project.optional-dependencies]
+test = ["pytest>=7.4.3", "pytest-timeout>=2.2.0", "hypothesis>=6.62.1"]
+
+[project.urls]
+Source = "https://github.com/unioslo/zabbix-auto-config"
+
+[project.scripts]
+zac = "zabbix_auto_config:main"
+
+[tool.hatch.version]
+path = "zabbix_auto_config/__about__.py"
+
+[tool.hatch.envs.default]
+dependencies = ["zabbix-auto-config[test]"]
+
+[tool.hatch.envs.default.scripts]
+test = "pytest {args}"
+
+[tool.hatch.build.targets.sdist]
+exclude = ["/.github", "/tests", "/path"]
+
+[tool.hatch.build.targets.wheel]
+packages = ["zabbix_auto_config"]
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644
index c965ef5..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,46 +0,0 @@
-[metadata]
-name = zabbix-auto-config
-version = 0.1.0
-description = Zabbix auto config - ZAC
-long_description = file: README.md
-long_description_content_type = text/markdown; charset=UTF-8
-url = https://github.com/unioslo/zabbix-auto-config
-author = University of Oslo, University Center for Information Technology
-author_email = noreply@usit.uio.no
-license = MIT
-license_file = LICENSE
-classifiers =
-    # Trove classifiers
-    # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
-    License :: OSI Approved :: MIT License
-    Programming Language :: Python
-    Programming Language :: Python :: 3
-    Programming Language :: Python :: 3.8
-    Programming Language :: Python :: 3.9
-    Programming Language :: Python :: 3.10
-
-[options]
-python_requires = >=3.8
-install_requires =
-    multiprocessing-logging == 0.3.1
-    psycopg2
-    pydantic >= 2.0.0
-    pyzabbix
-    requests
-    tomli
-    packaging
-include_package_data = True
-packages = find:
-
-[options.packages.find]
-exclude = tests
-
-[options.entry_points]
-console_scripts =
-    zac = zabbix_auto_config:main
-
-[options.extras_require]
-test =
-    pytest
-    hypothesis
-    pytest-timeout
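The packaging metadata from `setup.cfg` now lives in `pyproject.toml`, with the version made dynamic. As a quick sanity check after installing the project (an illustrative snippet, not part of the change; the `group=` selector requires Python 3.10+), the new metadata can be inspected with the standard library:

```python
from importlib.metadata import entry_points, version

# Version is now sourced from zabbix_auto_config/__about__.py via [tool.hatch.version]
print(version("zabbix-auto-config"))  # e.g. "0.1.0"

# The "zac" console script should resolve to zabbix_auto_config:main,
# as declared under [project.scripts].
scripts = entry_points(group="console_scripts")
print([ep.value for ep in scripts if ep.name == "zac"])
```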
diff --git a/setup.py b/setup.py
deleted file mode 100644
index 5b915d0..0000000
--- a/setup.py
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/usr/bin/env python
-import setuptools
-if __name__ == "__main__":
-    setuptools.setup()
diff --git a/tests/test_processing/test_sourcecollectorprocess.py b/tests/test_processing/test_sourcecollectorprocess.py
index f507274..390dc2a 100644
--- a/tests/test_processing/test_sourcecollectorprocess.py
+++ b/tests/test_processing/test_sourcecollectorprocess.py
@@ -6,6 +6,7 @@
 from zabbix_auto_config.processing import SourceCollectorProcess
 
 from zabbix_auto_config.models import Host, SourceCollectorSettings
+from zabbix_auto_config.state import get_manager
 
 
 class SourceCollector:
@@ -22,7 +23,7 @@ def collect(*args, **kwargs) -> List[Host]:
 def test_source_collector_process():
     process = SourceCollectorProcess(
         name="test-source",
-        state=multiprocessing.Manager().dict(),
+        state=get_manager().State(),
         module=SourceCollector,
         config=SourceCollectorSettings(
             module_name="source_collector",
@@ -40,7 +41,7 @@ def test_source_collector_process():
         hosts = process.source_hosts_queue.get()
         assert len(hosts["hosts"]) == 2
         assert hosts["hosts"][0].hostname == "foo.example.com"
-        assert process.state["ok"] is True
+        assert process.state.ok is True
     finally:
         process.stop_event.set()
         process.join(timeout=0.01)
@@ -57,7 +58,7 @@ def collect(*args, **kwargs) -> List[Host]:
 def test_source_collector_disable_on_failure():
     process = SourceCollectorProcess(
         name="test-source",
-        state=multiprocessing.Manager().dict(),
+        state=get_manager().State(),
         module=FaultySourceCollector,
         config=SourceCollectorSettings(
             module_name="faulty_source_collector",
@@ -73,9 +74,9 @@ def test_source_collector_disable_on_failure():
     # Start process and wait until it fails
     try:
         process.start()
-        while process.state["ok"] is True:
+        while process.state.ok is True:
             time.sleep(0.01)
-        assert process.state["ok"] is False
+        assert process.state.ok is False
         assert process.source_hosts_queue.empty() is True
         process.stop_event.set()
     finally:
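The test updates above (and below) all make the same substitution: the shared state is no longer a managed dict but a managed `State` object, so lookups like `state["ok"]` become attribute access. A minimal sketch using the names introduced in this change:

```python
from zabbix_auto_config.state import get_manager

# Before: state = multiprocessing.Manager().dict(); fields accessed by key, e.g. state["ok"]
# After: a State proxy with attributes and helper methods
state = get_manager().State()
state.set_error(ValueError("boom"))

assert state.ok is False
assert state.error_type == "ValueError"
assert state.error_count == 1
```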
diff --git a/tests/test_processing/test_zabbixupdater.py b/tests/test_processing/test_zabbixupdater.py
index 61d0ad4..38f4e71 100644
--- a/tests/test_processing/test_zabbixupdater.py
+++ b/tests/test_processing/test_zabbixupdater.py
@@ -1,4 +1,3 @@
-import multiprocessing
 from pathlib import Path
 import time
 from unittest.mock import patch
@@ -10,6 +9,7 @@
 
 from zabbix_auto_config.models import ZabbixSettings
 from zabbix_auto_config.processing import ZabbixUpdater
+from zabbix_auto_config.state import get_manager
 
 
 def raises_connect_timeout(*args, **kwargs):
@@ -32,7 +32,7 @@ def test_zabbixupdater_connect_timeout(mock_psycopg2_connect):
         ZabbixUpdater(
             name="connect-timeout",
             db_uri="",
-            state=multiprocessing.Manager().dict(),
+            state=get_manager().State(),
             zabbix_config=ZabbixSettings(
                 map_dir="",
                 url="",
@@ -50,7 +50,6 @@ def do_update(self):
         raise requests.exceptions.ReadTimeout("read timeout")
 
 
-
 @pytest.mark.timeout(5)
 def test_zabbixupdater_read_timeout(tmp_path: Path, mock_psycopg2_connect):
     # TODO: use mapping file fixtures from #67
@@ -63,7 +62,7 @@ def test_zabbixupdater_read_timeout(tmp_path: Path, mock_psycopg2_connect):
     process = TimeoutUpdater(
         name="read-timeout",
         db_uri="",
-        state=multiprocessing.Manager().dict(),
+        state=get_manager().State(),
         zabbix_config=ZabbixSettings(
             map_dir=str(map_dir),
             url="",
@@ -77,9 +76,11 @@ def test_zabbixupdater_read_timeout(tmp_path: Path, mock_psycopg2_connect):
     # Start the process and wait for it to be marked as unhealthy
     try:
         process.start()
-        while process.state["ok"] is True:
+        while process.state.ok is True:
             time.sleep(0.1)
-        assert process.state["ok"] is False
+        assert process.state.ok is False
+        assert process.state.error_type == "ReadTimeout"
+        assert process.state.error_count == 1
         process.stop_event.set()
     finally:
         process.join(timeout=0.01)
diff --git a/tests/test_state.py b/tests/test_state.py
new file mode 100644
index 0000000..1eb1f0b
--- /dev/null
+++ b/tests/test_state.py
@@ -0,0 +1,178 @@
+import datetime
+import time
+
+import pytest
+from zabbix_auto_config.exceptions import ZACException
+from zabbix_auto_config.processing import BaseProcess
+
+from zabbix_auto_config.state import State, get_manager, StateProxy
+
+
+def test_manager_state():
+    manager = get_manager()
+    state = manager.State()
+    assert isinstance(state, StateProxy)
+    # Test defaults
+    assert state.ok is True
+    assert state.error is None
+    assert state.error_type is None
+    assert state.error_count == 0
+    assert state.error_time is None
+
+
+@pytest.mark.parametrize("use_manager", [True, False])
+@pytest.mark.parametrize("with_error", [True, False])
+def test_state_set_ok(use_manager: bool, with_error: bool):
+    if use_manager:
+        state = get_manager().State()
+    else:
+        state = State()
+
+    # Give state object some error state values
+    if with_error:
+        state.error = "Error"
+        state.error_type = "ErrorType"
+        state.error_count = 1
+        state.error_time = datetime.datetime(2021, 1, 1, 0, 0, 0).timestamp()
+
+    state.set_ok()
+    assert state.ok is True
+    assert state.error is None
+    assert state.error_type is None
+    assert state.error_time is None
+    if with_error:
+        assert state.error_count == 1
+    else:
+        assert state.error_count == 0
+
+
+# Use a subclass of Exception so that we can test that
+# the error type is set correctly
+# Also needs to be in the global scope to be pickleable
+class TimeoutError(Exception):
+    pass
+
+
+@pytest.mark.parametrize("use_manager", [True, False])
+def test_state_set_error(use_manager: bool):
+    if use_manager:
+        state = get_manager().State()
+    else:
+        state = State()
+
+    # Sanity test of defaults
+    assert state.ok is True
+    assert state.error is None
+    assert state.error_type is None
+    assert state.error_time is None
+    assert state.error_count == 0
+
+    time.sleep(0.01)  # to ensure later timestamps are greater
+    e = TimeoutError("Test error")
+    state.set_error(e)
+    assert state.ok is False
+    assert state.error == "Test error"
+    assert state.error_type == "TimeoutError"
+    assert state.error_time < time.time()
+    assert state.error_count == 1
+
+    # Set the error again to check count and time are updated
+    prev_time = float(state.error_time)
+    state.set_error(e)
+    assert state.error_count == 2
+    assert state.error_time > prev_time
+
+
+class ZACExceptionProcess(BaseProcess):
+    def work(self) -> None:
+        raise ZACException("Test error")
+
+
+@pytest.mark.timeout(10)
+def test_state_in_other_process() -> None:
+    state = get_manager().State()
+    process = ZACExceptionProcess(
+        name="test",
+        state=state,
+    )
+
+    process.start()
+    try:
+        while process.state.ok:
+            time.sleep(0.01)
+        process.stop_event.set()
+    finally:
+        # stop process to prevent errors from accumulating
+        process.join(timeout=0.01)
+
+    assert process.state.ok is False
+    assert process.state.error_type == "ZACException"
+    assert process.state.error_count == 1
+    assert process.state is state
+
+    # Test that multiple state proxies do not refer to the same
+    # underlying State object
+    state2 = get_manager().State()
+    assert state2.ok is True
+    assert state2 is not state
+    # This process will not fail and thus will set its state to OK
+    process2 = BaseProcess(
+        name="test",
+        state=state2,
+    )
+
+    # Start and stop process, then check state
+    try:
+        process2.start()
+        process2.stop_event.set()
+    finally:
+        process2.join(timeout=1)
+    assert process2.state.ok is True
+    assert process2.state.asdict() == state2.asdict()
+    assert process2.state.asdict() != process.state.asdict()
+    assert process2.state is not process.state
+
+
+@pytest.mark.parametrize("use_manager", [True, False])
+def test_state_asdict_ok(use_manager: bool) -> None:
+    if use_manager:
+        state = get_manager().State()
+    else:
+        state = State()
+    state.set_ok()
+    assert state.asdict() == {
+        "ok": True,
+        "error": None,
+        "error_type": None,
+        "error_count": 0,
+        "error_time": None,
+    }
+
+
+class CustomException(Exception):
+    pass
+
+
+@pytest.mark.parametrize("use_manager", [True, False])
+def test_state_asdict_error(use_manager: bool) -> None:
+    if use_manager:
+        state = get_manager().State()
+    else:
+        state = State()
+
+    # Mocking datetime in subprocesses is a bit of a chore, so we just
+    # check that the error_time is a timestamp value within a given range
+    pre = datetime.datetime.now().timestamp()
+    state.set_error(CustomException("Test error"))
+    post = datetime.datetime.now().timestamp()
+    d = state.asdict()
+
+    assert post >= d["error_time"] >= pre
+    d.pop("error_time")
+
+    assert d == {
+        "ok": False,
+        "error": "Test error",
+        "error_type": "CustomException",
+        "error_count": 1,
+    }
diff --git a/zabbix_auto_config/__about__.py b/zabbix_auto_config/__about__.py
new file mode 100644
index 0000000..3dc1f76
--- /dev/null
+++ b/zabbix_auto_config/__about__.py
@@ -0,0 +1 @@
+__version__ = "0.1.0"
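`__about__.py` becomes the single source of the package version: Hatch reads it at build time (`[tool.hatch.version]`), and the package imports it at runtime instead of querying the installed distribution metadata, as the next hunk shows. Roughly:

```python
# Before: resolved at runtime from the installed distribution's metadata
# import importlib.metadata, os
# __version__ = importlib.metadata.version(os.path.basename(os.path.dirname(__file__)))

# After: a plain attribute defined in zabbix_auto_config/__about__.py
from zabbix_auto_config.__about__ import __version__

print(__version__)  # "0.1.0"
```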
diff --git a/zabbix_auto_config/__init__.py b/zabbix_auto_config/__init__.py
index 283b4ed..178365c 100644
--- a/zabbix_auto_config/__init__.py
+++ b/zabbix_auto_config/__init__.py
@@ -13,13 +13,13 @@
 import multiprocessing_logging
 import tomli
 
+from zabbix_auto_config.state import get_manager
+
 from . import exceptions
 from . import models
 from . import processing
 from ._types import SourceCollectorDict, SourceCollectorModule
-
-
-__version__ = importlib.metadata.version(os.path.basename(os.path.dirname(__file__)))
+from .__about__ import __version__
 
 
 def get_source_collectors(config: models.Settings) -> List[SourceCollectorDict]:
@@ -67,7 +67,9 @@ def get_config():
     return config
 
 
-def write_health(health_file, processes, queues, failsafe):
+def write_health(
+    health_file, processes: List[processing.BaseProcess], queues, failsafe
+):
     now = datetime.datetime.now()
     health = {
         "date": now.isoformat(timespec="seconds"),
@@ -81,14 +83,16 @@
     }
 
     for process in processes:
-        health["processes"].append({
-            "name": process.name,
-            "pid": process.pid,
-            "alive": process.is_alive(),
-            "ok": process.state.get("ok")
-        })
+        health["processes"].append(
+            {
+                "name": process.name,
+                "pid": process.pid,
+                "alive": process.is_alive(),
+                **process.state.asdict(),
+            }
+        )
 
-    health["all_ok"] = all([p["ok"] for p in health["processes"]])
+    health["all_ok"] = all(p.state.ok for p in processes)
 
     for queue in queues:
         health["queues"].append({
@@ -98,8 +102,8 @@
     try:
         with open(health_file, "w") as f:
             f.write(json.dumps(health))
-    except:
-        logging.error("Unable to write health file: %s", health_file)
+    except Exception as e:
+        logging.error("Unable to write health file %s: %s", health_file, e)
 
 
 def log_process_status(processes):
@@ -110,7 +114,7 @@
         process_status = "alive" if process.is_alive() else "dead"
         process_statuses.append(f"{process_name} is {process_status}")
 
-    logging.debug("Process status: %s", ', '.join(process_statuses))
+    logging.info("Process status: %s", ", ".join(process_statuses))
 
 
 def main():
@@ -122,31 +126,62 @@
     logging.info("Main start (%d) version %s", os.getpid(), __version__)
 
     stop_event = multiprocessing.Event()
-    state_manager = multiprocessing.Manager()
-    processes = []
+    state_manager = get_manager()
+    processes = []  # type: List[processing.BaseProcess]
 
     source_hosts_queues = []
     source_collectors = get_source_collectors(config)
     for source_collector in source_collectors:
         source_hosts_queue = multiprocessing.Queue(maxsize=1)
-        process = processing.SourceCollectorProcess(source_collector["name"], state_manager.dict(), source_collector["module"], source_collector["config"], source_hosts_queue)
+        process = processing.SourceCollectorProcess(
+            source_collector["name"],
+            state_manager.State(),
+            source_collector["module"],
+            source_collector["config"],
+            source_hosts_queue,
+        )
         source_hosts_queues.append(source_hosts_queue)
         processes.append(process)
 
     try:
-        process = processing.SourceHandlerProcess("source-handler", state_manager.dict(), config.zac.db_uri, source_hosts_queues)
+        process = processing.SourceHandlerProcess(
+            "source-handler",
+            state_manager.State(),
+            config.zac.db_uri,
+            source_hosts_queues,
+        )
         processes.append(process)
 
-        process = processing.SourceMergerProcess("source-merger", state_manager.dict(), config.zac.db_uri, config.zac.host_modifier_dir)
+        process = processing.SourceMergerProcess(
+            "source-merger",
+            state_manager.State(),
+            config.zac.db_uri,
+            config.zac.host_modifier_dir,
+        )
         processes.append(process)
 
-        process = processing.ZabbixHostUpdater("zabbix-host-updater", state_manager.dict(), config.zac.db_uri, config.zabbix)
+        process = processing.ZabbixHostUpdater(
+            "zabbix-host-updater",
+            state_manager.State(),
+            config.zac.db_uri,
+            config.zabbix,
+        )
         processes.append(process)
 
-        process = processing.ZabbixHostgroupUpdater("zabbix-hostgroup-updater", state_manager.dict(), config.zac.db_uri, config.zabbix)
+        process = processing.ZabbixHostgroupUpdater(
+            "zabbix-hostgroup-updater",
+            state_manager.State(),
+            config.zac.db_uri,
+            config.zabbix,
+        )
         processes.append(process)
 
-        process = processing.ZabbixTemplateUpdater("zabbix-template-updater", state_manager.dict(), config.zac.db_uri, config.zabbix)
+        process = processing.ZabbixTemplateUpdater(
+            "zabbix-template-updater",
+            state_manager.State(),
+            config.zac.db_uri,
+            config.zabbix,
+        )
         processes.append(process)
     except exceptions.ZACException as e:
         logging.error("Failed to initialize child processes. Exiting: %s", str(e))
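With `**process.state.asdict()` merged into each entry in `write_health()` above, the health file now records the full error state per process. The shape of one entry is roughly as follows (illustrative values only):

```python
process_entry = {
    "name": "zabbix-host-updater",  # process.name
    "pid": 12345,                   # process.pid
    "alive": True,                  # process.is_alive()
    # Fields below come from State.asdict()
    "ok": False,
    "error": "read timeout",
    "error_type": "ReadTimeout",
    "error_count": 1,
    "error_time": 1700000000.0,
}
```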
diff --git a/zabbix_auto_config/processing.py b/zabbix_auto_config/processing.py
index 9271534..c86aeeb 100644
--- a/zabbix_auto_config/processing.py
+++ b/zabbix_auto_config/processing.py
@@ -26,6 +26,7 @@
 from . import models
 from . import utils
 from .errcount import RollingErrorCounter
+from .state import State
 from ._types import HostModifierDict, SourceCollectorModule, HostModifierModule
 
 if TYPE_CHECKING:
@@ -33,7 +34,7 @@ from psycopg2.extensions import cursor as Cursor
 
 
 class BaseProcess(multiprocessing.Process):
-    def __init__(self, name, state):
+    def __init__(self, name: str, state: State):
         super().__init__()
         self.name = name
         self.state = state
@@ -41,7 +42,7 @@ def __init__(self, name, state):
         self.update_interval = 1
         self.next_update = datetime.datetime.now()
 
-        self.state["ok"] = True
+        self.state.set_ok()
         self.stop_event = multiprocessing.Event()
 
     def run(self):
@@ -63,13 +64,16 @@ def run(self):
 
             try:
                 self.work()
-                self.state["ok"] = True
-            except exceptions.ZACException as e:
-                logging.error("Work exception: %s", str(e))
-                self.state["ok"] = False
-            except requests.exceptions.Timeout as e:
-                logging.error("Timeout exception: %s", str(e))
-                self.state["ok"] = False
+                self.state.set_ok()
+            except Exception as e:
+                # These are the error types we handle ourselves then continue
+                if isinstance(e, requests.exceptions.Timeout):
+                    logging.error("Timeout exception: %s", str(e))
+                elif isinstance(e, exceptions.ZACException):
+                    logging.error("Work exception: %s", str(e))
+                else:
+                    raise e  # all other exceptions are fatal
+                self.state.set_error(e)
 
             if self.update_interval > 1 and self.next_update < datetime.datetime.now():
                 # Only log warning when update_interval is actually changed from default
@@ -152,10 +156,16 @@ def work(self):
             self.error_counter.add(exception=e)
             if self.error_counter.tolerance_exceeded():
                 if self.config.exit_on_error:
-                    logging.critical("Error tolerance exceeded. Terminating application.")
+                    logging.critical(
+                        "Error tolerance exceeded. Terminating application."
+                    )
                     self.stop_event.set()
+                    # TODO: raise exception with message above or just an empty exception?
                 else:
                     self.disable()
+                    raise exceptions.ZACException(
+                        f"Failed to collect from source {self.name!r}: {e}"
+                    ) from e
 
     def disable(self) -> None:
         if self.disabled:
@@ -178,14 +188,15 @@ def disable(self) -> None:
         # Reset the error counter so that previous errors don't count towards
         # the error counter in the next run in case the disable duration is short
         self.error_counter.reset()
+        # TODO: raise specific exception here instead of ZACException
 
     def collect(self) -> None:
         start_time = time.time()
         try:
             hosts = self.module.collect(**self.collector_config)
             assert isinstance(hosts, list), "Collect module did not return a list"
-        except (AssertionError, Exception) as e:
-            raise exceptions.SourceCollectorError(f"Unable to collect from module ({self.config.module_name}): {str(e)}")
+        except Exception as e:
+            raise exceptions.SourceCollectorError(e) from e
 
         valid_hosts = []  # type: List[models.Host]
         for host in hosts:
@@ -616,6 +627,13 @@ def disable_host(self, zabbix_host):
                 logging.info("Disabling host: '%s' (%s)", zabbix_host["host"], zabbix_host["hostid"])
             except pyzabbix.ZabbixAPIException as e:
                 logging.error("Error when disabling host '%s' (%s): %s", zabbix_host["host"], zabbix_host["hostid"], e.args)
+            except IndexError:
+                logging.critical(
+                    "Disabled host group '%s' does not exist in Zabbix. Cannot disable host '%s'",
+                    self.config.hostgroup_disabled,
+                    zabbix_host.get("host"),
+                )
+                self.stop_event.set()
         else:
             logging.info("DRYRUN: Disabling host: '%s' (%s)", zabbix_host["host"], zabbix_host["hostid"])
 
@@ -644,6 +662,13 @@ def enable_host(self, db_host):
                 logging.info("Enabling new host: '%s' (%s)", hostname, result["hostids"][0])
             except pyzabbix.ZabbixAPIException as e:
                 logging.error("Error when enabling/creating host '%s': %s", hostname, e.args)
+            except IndexError:
+                logging.critical(
+                    "Enabled host group '%s' does not exist in Zabbix. Cannot enable host '%s'",
+                    self.config.hostgroup_all,
+                    hostname,
+                )
+                self.stop_event.set()
         else:
             logging.info("DRYRUN: Enabling host: '%s'", hostname)
 
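The new run-loop policy in `BaseProcess.run()` boils down to: timeouts and ZAC errors are logged and recorded in the shared state, anything else is re-raised and kills the process. A condensed, self-contained sketch of that policy (the `run_once` helper is hypothetical, for illustration only):

```python
import logging

import requests

from zabbix_auto_config import exceptions
from zabbix_auto_config.state import State


def run_once(state: State, work) -> None:
    """Apply the error policy used in BaseProcess.run() to one unit of work."""
    try:
        work()
        state.set_ok()
    except Exception as e:
        if isinstance(e, (requests.exceptions.Timeout, exceptions.ZACException)):
            logging.error("Work exception: %s", e)
            state.set_error(e)  # recoverable: record in state and keep running
        else:
            raise  # unexpected errors are fatal to the process
```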
diff --git a/zabbix_auto_config/state.py b/zabbix_auto_config/state.py
new file mode 100644
index 0000000..36e357a
--- /dev/null
+++ b/zabbix_auto_config/state.py
@@ -0,0 +1,74 @@
+import time
+import types
+from dataclasses import asdict
+from multiprocessing.managers import BaseManager, NamespaceProxy
+from typing import Any, Dict, Optional
+
+from pydantic.dataclasses import dataclass
+
+@dataclass
+class State:
+    ok: bool = True
+    """True if process has not encountered an error in its most recent run."""
+    error: Optional[str] = None
+    """The error message if `ok` is False."""
+    error_type: Optional[str] = None
+    """The type of error if `ok` is False."""
+    error_count: int = 0
+    """The number of errors the process has encountered."""
+    error_time: Optional[float] = None
+    """The timestamp of the most recent error."""
+
+    def asdict(self) -> Dict[str, Any]:
+        """Return dict representation of the State object."""
+        return asdict(self)
+
+    def set_ok(self) -> None:
+        """Set the current state to OK, clear error information.
+
+        NOTE
+        ----
+        This does not reset the error count.
+        """
+        self.ok = True
+        self.error = None
+        self.error_type = None
+        self.error_time = None
+
+    def set_error(self, exc: Exception) -> None:
+        self.ok = False
+        self.error = str(exc)
+        self.error_type = type(exc).__name__
+        self.error_time = time.time()
+        self.error_count += 1
+
+
+class Manager(BaseManager):
+    pass
+
+
+class StateProxy(NamespaceProxy):
+    # https://stackoverflow.com/a/63741184 (added return in wrapper function)
+    # As a one-off, we use a static Proxy type, but if we need to do this
+    # to other types as well, it might be worth making a Proxy factory function
+    """A proxy class that gives access to all attributes of a State object."""
+    _exposed_ = tuple(dir(State))
+
+    def __getattr__(self, name):
+        result = super().__getattr__(name)
+        if isinstance(result, types.MethodType):
+
+            def wrapper(*args, **kwargs):
+                return self._callmethod(name, args, kwargs)
+
+            return wrapper
+        return result
+
+
+Manager.register("State", State, proxytype=StateProxy)
+
+
+def get_manager() -> Manager:
+    m = Manager()
+    m.start()
+    return m
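Finally, a minimal usage sketch of the new state module, mirroring how `main()` and the tests wire it up:

```python
from zabbix_auto_config.state import get_manager

manager = get_manager()  # starts a manager process that serves State objects
state = manager.State()  # a StateProxy; safe to pass to child processes

state.set_error(RuntimeError("zabbix down"))  # forwarded to the manager via _callmethod
print(state.ok, state.error_type)             # False RuntimeError
print(state.asdict())                         # plain dict, as written to the health file

state.set_ok()  # clears the error fields but keeps error_count
```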