Skip to content

Commit

Permalink
More logical handling of curves
Browse files Browse the repository at this point in the history
  • Loading branch information
angusmcb committed Jan 17, 2025
1 parent 845c898 commit ec9a393
Showing 1 changed file with 60 additions and 67 deletions.
127 changes: 60 additions & 67 deletions wntrqgis/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,35 +107,34 @@ def __init__(self, flow_units: FlowUnit | wntr.epanet.util.FlowUnits, headloss_f
def to_si(
self,
value: float | ArrayLike | dict,
field: ModelField | ResultField | wntr.epanet.HydParam,
field: ModelField | ResultField | wntr.epanet.HydParam | wntr.epanet.QualParam,
layer: ModelLayer | ResultLayer | None = None,
):
if isinstance(field, wntr.epanet.HydParam):
return wntr.epanet.util.to_si(self._flow_units, value, param=field, darcy_weisbach=self._darcy_weisbach)
if field.python_type not in [int, float] or field.python_type is bool:
return value
try:
conversion_param = self._get_wntr_conversion_param(field, layer)
except ValueError:
return value
if isinstance(field, (wntr.epanet.HydParam, wntr.epanet.QualParam)):
conversion_param = field
else:
try:
conversion_param = self._get_wntr_conversion_param(field, layer)
except ValueError:
return value

return wntr.epanet.util.to_si(
self._flow_units, value, param=conversion_param, darcy_weisbach=self._darcy_weisbach
)

def from_si(
self,
value: float | ArrayLike | dict,
field: ModelField | ResultField | wntr.epanet.HydParam,
field: ModelField | ResultField | wntr.epanet.HydParam | wntr.epanet.QualParam,
layer: ModelLayer | ResultLayer | None = None,
):
if isinstance(field, wntr.epanet.HydParam):
return wntr.epanet.util.from_si(self._flow_units, value, param=field, darcy_weisbach=self._darcy_weisbach)
if field.python_type not in [int, float] or field.python_type is bool:
return value
try:
conversion_param = self._get_wntr_conversion_param(field, layer)
except ValueError:
return value
if isinstance(field, (wntr.epanet.HydParam, wntr.epanet.QualParam)):
conversion_param = field
else:
try:
conversion_param = self._get_wntr_conversion_param(field, layer)
except ValueError:
return value

return wntr.epanet.util.from_si(
self._flow_units, value, param=conversion_param, darcy_weisbach=self._darcy_weisbach
Expand Down Expand Up @@ -191,50 +190,6 @@ def _get_wntr_conversion_param(
msg = f"no param found for {field}"
raise ValueError(msg)

def curve_points_to_si(
self, points: list[tuple[float, float]], curve_type: _CurveType
) -> list[tuple[float, float]]:
return self._convert_curve_points(points, curve_type, wntr.epanet.util.to_si)

def curve_points_from_si(
self, points: list[tuple[float, float]], curve_type: _CurveType
) -> list[tuple[float, float]]:
return self._convert_curve_points(points, curve_type, wntr.epanet.util.from_si)

def _convert_curve_points(self, points, curve_type: _CurveType, conversion_function) -> list[tuple[float, float]]:
flow_units = self._flow_units
converted_points: list[tuple[float, float]] = []
QualParam = wntr.epanet.QualParam # noqa
HydParam = wntr.epanet.HydParam # noqa

if curve_type is _CurveType.VOLUME:
for point in points:
x = conversion_function(flow_units, point[0], HydParam.Length)
y = conversion_function(flow_units, point[1], HydParam.Volume)
converted_points.append((x, y))
elif curve_type is _CurveType.HEAD:
for point in points:
x = conversion_function(flow_units, point[0], HydParam.Flow)
y = conversion_function(flow_units, point[1], HydParam.HydraulicHead)
converted_points.append((x, y))
elif curve_type is _CurveType.EFFICIENCY:
for point in points:
x = conversion_function(flow_units, point[0], HydParam.Flow)
y = point[1]
converted_points.append((x, y))
elif curve_type is _CurveType.HEADLOSS:
for point in points:
x = conversion_function(flow_units, point[0], HydParam.Flow)
y = conversion_function(flow_units, point[1], HydParam.HeadLoss)
converted_points.append((x, y))
else:
raise KeyError
# for point in points:
# x = point[0]
# y = point[1]
# converted_points.append((x, y))
return converted_points


@needs_wntr_pandas
def to_qgis(
Expand Down Expand Up @@ -548,7 +503,7 @@ def _convert_result_df(self, df: pd.DataFrame, field: ResultField) -> pd.DataFra

return converted_df

def _get_qgs_field_type(self, python_type):
def _get_qgs_field_type(self, python_type: type[str | float | bool | int | list]) -> QMetaType | QVariant:
if issubclass(python_type, str):
return QMetaType.QString if USE_QMETATYPE else QVariant.String
if issubclass(python_type, float):
Expand Down Expand Up @@ -660,13 +615,13 @@ def __init__(self, wn: wntr.network.WaterNetworkModel, unit_conversion: _UnitCon
self._next_curve_name = 1
self._unit_conversion = unit_conversion

def add_curve_to_wn(self, curve_string, curve_type: _CurveType):
def add_curve_to_wn(self, curve_string: str, curve_type: _CurveType) -> str | None:
if not curve_string:
return None

name = str(self._next_curve_name)
curve_points = ast.literal_eval(curve_string)
curve_points = self._unit_conversion.curve_points_to_si(curve_points, curve_type)
curve_points: list = ast.literal_eval(curve_string)
curve_points = self._convert_curve_points(curve_points, curve_type, self._unit_conversion.to_si)
self._wn.add_curve(name=name, curve_type=curve_type.value, xy_tuples_list=curve_points)
self._next_curve_name += 1
return name
Expand All @@ -676,7 +631,45 @@ def get_curve_string_from_name(self, curve_name: Any) -> str | None:
return None

curve: wntr.network.elements.Curve = self._wn.get_curve(curve_name)
return repr(self._unit_conversion.curve_points_from_si(curve.points, _CurveType(curve.curve_type)))

converted_points = self._convert_curve_points(
curve.points, _CurveType(curve.curve_type), self._unit_conversion.from_si
)
return repr(converted_points)

def _convert_curve_points(
self, points: list, curve_type: _CurveType, conversion_function
) -> list[tuple[float, float]]:
converted_points: list[tuple[float, float]] = []
HydParam = wntr.epanet.HydParam # noqa: N806

if curve_type is _CurveType.VOLUME:
for point in points:
x = conversion_function(point[0], HydParam.Length)
y = conversion_function(point[1], HydParam.Volume)
converted_points.append((x, y))
elif curve_type is _CurveType.HEAD:
for point in points:
x = conversion_function(point[0], HydParam.Flow)
y = conversion_function(point[1], HydParam.HydraulicHead)
converted_points.append((x, y))
elif curve_type is _CurveType.EFFICIENCY:
for point in points:
x = conversion_function(point[0], HydParam.Flow)
y = point[1]
converted_points.append((x, y))
elif curve_type is _CurveType.HEADLOSS:
for point in points:
x = conversion_function(point[0], HydParam.Flow)
y = conversion_function(point[1], HydParam.HeadLoss)
converted_points.append((x, y))
else:
raise KeyError
# for point in points:
# x = point[0]
# y = point[1]
# converted_points.append((x, y))
return converted_points


@needs_wntr_pandas
Expand Down

0 comments on commit ec9a393

Please sign in to comment.