Skip to content

Commit

Permalink
refactor: fha calculations (#1414)
Browse files Browse the repository at this point in the history
* refactor: fha calculation functions

* test: update and add tests for refactored functions

* refactor: fix linting errors

* refactor: fix more linting errors

* refactor: incorporate Sourcery AI suggestions
  • Loading branch information
weibullguy authored Oct 12, 2024
1 parent 090c0da commit f503600
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 155 deletions.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ dependencies = [

[tool.hatch.envs.test.scripts]
install = "make PREFIX=$VIRTUAL_ENV install"
devinstall = "pip install -U pip && pip install -e ."
dotest = "pytest {args}"
clean-test = "rm -f .coverage* cobertura.xml && rm -fr .pytest_cache"
run-unit = "pytest -m unit --no-cov --cache-clear tests"
run-integration = "pytest -m integration --no-cov --cache-clear tests"
Expand Down Expand Up @@ -176,7 +178,7 @@ format = [
"run-docformatter",
"run-isort",
]
lint = "ruff check {args}"
lint = "pylint -j0 --rcfile=./pyproject.toml {args}"
maintain = [
"run-mccabe",
"run-radon-hal",
Expand Down
205 changes: 96 additions & 109 deletions src/ramstk/analyses/fha.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
# ramstk.analyses.fha.py is part of the RAMSTK Project
#
# All rights reserved.
# Copyright 2019 Doyle Rowland doyle.rowland <AT> reliaqual <DOT> com
# Copyright since 2007 Doyle Rowland doyle.rowland <AT> reliaqual <DOT> com
"""Functional Hazards Analysis (FHA) Module."""

# Standard Library Imports
import re
from typing import Any, Dict, List

# Third Party Imports
# noinspection PyPackageRequirements
from sympy import symbols, sympify # type: ignore
from sympy import SympifyError, symbols, sympify

# RAMSTK Package Imports
from ramstk.exceptions import OutOfRangeError
Expand All @@ -31,6 +31,26 @@
"High": 5,
"Major": 6,
}
VALID_VARIABLES = {
"hr",
"pi1",
"pi2",
"pi3",
"pi4",
"pi5",
"uf1",
"uf2",
"uf3",
"ui1",
"ui2",
"ui3",
"res1",
"res2",
"res3",
"res4",
"res5",
"0",
}


def calculate_hri(probability: str, severity: str) -> int:
Expand All @@ -50,14 +70,15 @@ def calculate_hri(probability: str, severity: str) -> int:
except KeyError as _error:
raise OutOfRangeError(
(
f"calculate_hri() was passed an unknown hazard "
f"probability ({probability}) or severity ({severity}) "
f"description."
f"Invalid hazard input: probability ({probability}) or severity "
f"({severity}) not recognized. Expected ranges: probability 1-5, "
f"severity I-IV."
)
) from _error


def calculate_user_defined(fha: Dict[str, Any]) -> Dict[str, Any]:
# pylint: disable=too-many-locals
"""Calculate the user-defined hazards analysis.
:param fha: the user-defined functional hazards assessment dict. The
Expand All @@ -79,82 +100,36 @@ def calculate_user_defined(fha: Dict[str, Any]) -> Dict[str, Any]:
"uf1 uf2 uf3 ui1 ui2 ui3 res1 res2 res3 res4 res5"
)

# pylint: disable=eval-used
fha["res1"] = sympify(fha["equation1"]).evalf(
subs={
uf1: fha["uf1"],
uf2: fha["uf2"],
uf3: fha["uf3"],
ui1: fha["ui1"],
ui2: fha["ui2"],
ui3: fha["ui3"],
res1: fha["res1"],
res2: fha["res2"],
res3: fha["res3"],
res4: fha["res4"],
res5: fha["res5"],
}
)
fha["res2"] = sympify(fha["equation2"]).evalf(
subs={
uf1: fha["uf1"],
uf2: fha["uf2"],
uf3: fha["uf3"],
ui1: fha["ui1"],
ui2: fha["ui2"],
ui3: fha["ui3"],
res1: fha["res1"],
res2: fha["res2"],
res3: fha["res3"],
res4: fha["res4"],
res5: fha["res5"],
}
)
fha["res3"] = sympify(fha["equation3"]).evalf(
subs={
uf1: fha["uf1"],
uf2: fha["uf2"],
uf3: fha["uf3"],
ui1: fha["ui1"],
ui2: fha["ui2"],
ui3: fha["ui3"],
res1: fha["res1"],
res2: fha["res2"],
res3: fha["res3"],
res4: fha["res4"],
res5: fha["res5"],
}
)
fha["res4"] = sympify(fha["equation4"]).evalf(
subs={
uf1: fha["uf1"],
uf2: fha["uf2"],
uf3: fha["uf3"],
ui1: fha["ui1"],
ui2: fha["ui2"],
ui3: fha["ui3"],
res1: fha["res1"],
res2: fha["res2"],
res3: fha["res3"],
res4: fha["res4"],
res5: fha["res5"],
}
)
fha["res5"] = sympify(fha["equation5"]).evalf(
subs={
uf1: fha["uf1"],
uf2: fha["uf2"],
uf3: fha["uf3"],
ui1: fha["ui1"],
ui2: fha["ui2"],
ui3: fha["ui3"],
res1: fha["res1"],
res2: fha["res2"],
res3: fha["res3"],
res4: fha["res4"],
res5: fha["res5"],
}
)
for _idx in range(1, 6):
_equation_key = f"equation{_idx}"
_equation = fha.get(_equation_key, "0.0")

# If the equation is empty, replace it with "0.0".
if not _equation.strip():
fha[_equation_key] = "0.0"
else:
# Validate the equation if it's not empty.
_do_validate_equation(_equation)

# Safely evaluate the equation using sympify
try:
fha[f"res{_idx}"] = sympify(_equation).evalf(
subs={
uf1: fha["uf1"],
uf2: fha["uf2"],
uf3: fha["uf3"],
ui1: fha["ui1"],
ui2: fha["ui2"],
ui3: fha["ui3"],
res1: fha["res1"],
res2: fha["res2"],
res3: fha["res3"],
res4: fha["res4"],
res5: fha["res5"],
}
)
except SympifyError as exc:
raise ValueError(f"Invalid syntax in equation{_idx}: {_equation}") from exc

return fha

Expand All @@ -167,13 +142,8 @@ def set_user_defined_floats(fha: Dict[str, Any], floats: List[float]) -> Dict[st
:return: fha; the functional hazard assessment dict with updated float values.
:rtype: dict
"""
_key = ""
for _idx in [0, 1, 2]:
try:
_key = list(fha.keys())[_idx]
fha[_key] = float(floats[_idx])
except IndexError:
fha[_key] = 0.0
for _idx in range(3):
fha[f"uf{_idx + 1}"] = float(floats[_idx]) if _idx < len(floats) else 0.0

return fha

Expand All @@ -186,13 +156,8 @@ def set_user_defined_ints(fha: Dict[str, Any], ints: List[int]) -> Dict[str, Any
:return: fha; the functional hazard assessment dict with updated integer values.
:rtype: dict
"""
_key = ""
for _idx in [3, 4, 5]:
try:
_key = list(fha.keys())[_idx]
fha[_key] = int(ints[_idx - 3])
except IndexError:
fha[_key] = 0
for _idx in range(3):
fha[f"ui{_idx + 1}"] = int(ints[_idx]) if _idx < len(ints) else 0

return fha

Expand All @@ -210,14 +175,22 @@ def set_user_defined_functions(
:return: fha; the functional hazard assessment dict with updated functions.
:rtype: dict
"""
_key = ""
for _idx in [6, 7, 8, 9, 10]:
for _idx in range(5):
try:
_key = list(fha.keys())[_idx]
fha[_key] = (
"0.0" if not str(functions[_idx - 6]) else str(functions[_idx - 6])
)
_key = list(fha.keys())[_idx + 6]
_equation = str(functions[_idx]).strip()

# If the function is an empty string, replace it with "0.0".
if not _equation:
fha[_key] = "0.0"
else:
# Validate non-empty equations
_do_validate_equation(_equation)
fha[_key] = _equation

except IndexError:
# If functions list doesn't contain enough elements, set the remaining
# to "0.0".
fha[_key] = "0.0"

return fha
Expand All @@ -236,12 +209,26 @@ def set_user_defined_results(
:return: fha; the functional hazard assessment dict with updated results.
:rtype: dict
"""
_key = ""
for _idx in [11, 12, 13, 14, 15]:
try:
_key = list(fha.keys())[_idx]
fha[_key] = results[_idx - 11]
except IndexError:
fha[_key] = 0
for _idx in range(5):
fha[f"res{_idx + 1}"] = float(results[_idx]) if _idx < len(results) else 0.0

return fha


def _do_validate_equation(equation: str) -> None:
"""Validate that the equation contains only valid variables.
:param equation: The equation to validate.
:type equation: str
:raises ValueError: If the equation contains invalid variables.
"""
# Find all the variable names in the equation (alphanumeric strings).
_variables = set(re.findall(r"\b\w+\b", equation))

# Check if there are any variables not in the allowed set.
_invalid_vars = _variables - VALID_VARIABLES

if _invalid_vars:
raise ValueError(
f"Invalid variables found in equation: {', '.join(_invalid_vars)}"
)
Loading

0 comments on commit f503600

Please sign in to comment.