diff --git a/yaramo/edge.py b/yaramo/edge.py index b2ef316..79f7a28 100644 --- a/yaramo/edge.py +++ b/yaramo/edge.py @@ -66,6 +66,15 @@ def __get_length(self) -> float: self.node_b.geo_node ) return total_length + + def get_direction_based_on_start_node(self, start: "Node") -> "SignalDirection": + from yaramo.signal import SignalDirection + + if self.node_a.uuid == start.uuid: + return SignalDirection.IN + elif self.node_b.uuid == start.uuid: + return SignalDirection.GEGEN + return None def get_direction_based_on_nodes(self, node_a: "Node", node_b: "Node") -> "SignalDirection": """Returns the direction according to whether the order of node_a and node_b is the same as in self @@ -122,6 +131,45 @@ def get_signals_with_direction_in_order(self, direction: "SignalDirection") -> L result.sort(key=lambda x: x.distance_edge, reverse=(direction == SignalDirection.GEGEN)) return result + def get_opposite_node(self, node: "Node") -> "Node": + """Returns the opposite Node of the given Node + + Parameters + ---------- + node : Node + The Node to get the opposite Node of + + Returns + ------- + Node + The opposite Node + """ + + if self.node_a.uuid == node.uuid: + return self.node_b + return self.node_a + + def get_next_geo_node(self, node: "Node") -> "GeoNode": + """Returns the next GeoNode on Edgeof the given Top Node + + Parameters + ---------- + geo_node : Node + The Top Node to get the next GeoNode of + + Returns + ------- + GeoNode + The next GeoNode + """ + if len(self.intermediate_geo_nodes) < 2: + return self.get_opposite_node(node).geo_node + if self.node_a.uuid == node.uuid: + return self.intermediate_geo_nodes[1] + if self.node_b.uuid == node.uuid: + return self.intermediate_geo_nodes[-2] + return None + def to_serializable(self): """See the description in the BaseElement class. diff --git a/yaramo/node.py b/yaramo/node.py index 2c68f1a..37fc29a 100644 --- a/yaramo/node.py +++ b/yaramo/node.py @@ -1,5 +1,6 @@ import math from enum import Enum +from typing import List from yaramo.base_element import BaseElement from yaramo.geo_node import GeoNode @@ -31,12 +32,12 @@ def __init__(self, turnout_side=None, **kwargs): """ super().__init__(**kwargs) - self.connected_on_head = None - self.connected_on_left = None - self.connected_on_right = None + self.connected_edge_on_head = None + self.connected_edge_on_right = None + self.connected_edge_on_left = None self.maximum_speed_on_left = None self.maximum_speed_on_right = None - self.connected_nodes: list["Node"] = [] + self.connected_edges: list["Edge"] = [] self.geo_node: GeoNode = None self.turnout_side: str = turnout_side @@ -52,38 +53,95 @@ def maximum_speed(self, node_a: "Node", node_b: "Node"): else: return None - def set_connection_head(self, node: "Node"): - self.connected_on_head = node - self.connected_nodes.append(node) + @property + def connected_nodes(self): + return [edge.get_opposite_node(self) for edge in self.connected_edges] - def set_connection_left(self, node: "Node"): - self.connected_on_left = node - self.connected_nodes.append(node) + @property + def connected_on_head(self): + if self.connected_edge_on_head is None: + self.calc_anschluss_of_all_edges() + if self.connected_edge_on_head is None: + return None + return self.connected_edge_on_head.get_opposite_node(self) + + @property + def connected_on_left(self): + if self.connected_edge_on_head is None: + self.calc_anschluss_of_all_edges() + if self.connected_edge_on_left is None: + return None + return self.connected_edge_on_left.get_opposite_node(self) + + @property + def connected_on_right(self): + if self.connected_edge_on_head is None: + self.calc_anschluss_of_all_edges() + if self.connected_edge_on_right is None: + return None + return self.connected_edge_on_right.get_opposite_node(self) + + def set_connection_head_edge(self, edge: "Edge"): + self.connected_edge_on_head = edge + self.connected_edges.append(edge) + + def set_connection_left_edge(self, edge: "Edge"): + self.connected_edge_on_left = edge + self.connected_edges.append(edge) + + def set_connection_right_edge(self, edge: "Edge"): + self.connected_edge_on_right = edge + self.connected_edges.append(edge) - def set_connection_right(self, node: "Node"): - self.connected_on_right = node - self.connected_nodes.append(node) + def remove_edge_to_node(self, node: "Node"): + """Removes the edge to the given node and removes the node from the connected_nodes list.""" + edge = self.get_edge_to_node(node) + self.connected_edges.remove(edge) - def get_possible_followers(self, source): - """Returns the Nodes that could follow (head, left, right) when comming from a source Node connected to this Node.""" + def get_edge_to_node(self, node): + """Returns the edge to the given neighbor node.""" + return next(edge for edge in self.connected_edges if edge.get_opposite_node(self) == node) + + def get_possible_followers(self, source: "Edge") -> List["Edge"]: + """Returns the `Edge`s that could follow (head, left, right) when comming from a source `Edge` connected to this `Node`.""" if source is None: - return self.connected_nodes + return self.connected_edges - if len(self.connected_nodes) <= 1: + if len(self.connected_edges) <= 1: return [] - if self.connected_on_head is None: - self.calc_anschluss_of_all_nodes() + if self.connected_edge_on_head is None: + self.calc_anschluss_of_all_edges() - if source.uuid == self.connected_on_head.uuid: - return [self.connected_on_left, self.connected_on_right] - else: - return [self.connected_on_head] + if source == self.connected_edge_on_head: + return [self.connected_edge_on_left, self.connected_edge_on_right] + return [self.connected_edge_on_head] + + + def get_anschluss_for_edge(self, edge: "Edge") -> NodeConnectionDirection: + """Gets the Anschluss (Links, Rechts, Spitze) of the edge on the current node. - def get_anschluss_of_other(self, other: "Node") -> NodeConnectionDirection: - """ Gets the Anschluss (Ende, Links, Rechts, Spitze) of other node. + :return: The node connection + """ + + if self.connected_edge_on_head is None: + self.calc_anschluss_of_all_edges() + + if self.connected_edge_on_head == edge: + return NodeConnectionDirection.Spitze + if self.connected_edge_on_left == edge: + return NodeConnectionDirection.Links + if self.connected_edge_on_right == edge: + return NodeConnectionDirection.Rechts + + return None + + def get_anschluss_of_other(self, other: "Node") -> List[NodeConnectionDirection]: + """Gets the Anschluss (Ende, Links, Rechts, Spitze) of other node. Idea: We assume, the current node is a point and we want to estimate the Anschluss of the other node. + + :return: A node might be connected to the same node via two or more connections. So we return a list of connections. """ if len(self.connected_nodes) != 3: @@ -91,23 +149,31 @@ def get_anschluss_of_other(self, other: "Node") -> NodeConnectionDirection: # TODO allow for different metrics to estimate the anschluss of the other nodes if not all([self.connected_on_left, self.connected_on_right, self.connected_on_head]): - self.calc_anschluss_of_all_nodes() + self.calc_anschluss_of_all_edges() + + result = [] if other.uuid == self.connected_on_head.uuid: - return NodeConnectionDirection.Spitze + result.append(NodeConnectionDirection.Spitze) if other.uuid == self.connected_on_left.uuid: - return NodeConnectionDirection.Links + result.append(NodeConnectionDirection.Links) if other.uuid == self.connected_on_right.uuid: - return NodeConnectionDirection.Rechts - return None + result.append(NodeConnectionDirection.Rechts) + + if len(result) == 0: + return None + + return result - def calc_anschluss_of_all_nodes(self): - """Calculates and sets the 'Anschluss' or connection side of the connected_nodes based on their geo-location.""" + def calc_anschluss_of_all_edges(self): + """Calculates and sets the 'Anschluss' or connection side of the connected_edges based on their geo-location.""" - def get_arc_between_nodes(_node_a: "Node", _node_b: "Node"): - _a = _node_a.geo_node.get_distance_to_other_geo_node(self.geo_node) - _b = self.geo_node.get_distance_to_other_geo_node(_node_b.geo_node) - _c = _node_a.geo_node.get_distance_to_other_geo_node(_node_b.geo_node) + def get_arc_between_edges(_edge_a: "Edge", _edge_b: "Edge"): + _neighbor_to_a = _edge_a.get_next_geo_node(self) + _neighbor_to_b = _edge_b.get_next_geo_node(self) + _a = _neighbor_to_a.get_distance_to_other_geo_node(self.geo_node) + _b = self.geo_node.get_distance_to_other_geo_node(_neighbor_to_b) + _c = _neighbor_to_a.get_distance_to_other_geo_node(_neighbor_to_b) return math.degrees(math.acos((_a * _a + _b * _b - _c * _c) / (2.0 * _a * _b))) @@ -115,41 +181,63 @@ def is_above_line_between_points(head_point: GeoPoint, branching_point: GeoPoint return ((branching_point.x - head_point.x)*(comparison_point.y - head_point.y) - (branching_point.y - head_point.y)*(comparison_point.x - head_point.x)) > 0 current_max_arc = 361 - other_a: "Node" = None - other_b: "Node" = None - for i in range(len(self.connected_nodes)): - for j in range(len(self.connected_nodes)): + other_a: "Edge" = None + other_b: "Edge" = None + for i in range(len(self.connected_edges)): + for j in range(len(self.connected_edges)): if i != j: - cur_arc = get_arc_between_nodes( - self.connected_nodes[i], self.connected_nodes[j] + cur_arc = get_arc_between_edges( + self.connected_edges[i], self.connected_edges[j] ) if cur_arc < current_max_arc: - missing_index = sum(range(len(self.connected_nodes))) - (i + j) - self.connected_on_head = self.connected_nodes[missing_index] - other_a = self.connected_nodes[i] - other_b = self.connected_nodes[j] + missing_index = sum(range(len(self.connected_edges))) - (i + j) + self.connected_edge_on_head = self.connected_edges[missing_index] + other_a = self.connected_edges[i] + other_b = self.connected_edges[j] current_max_arc = cur_arc + _neighbor_to_head = self.connected_edge_on_head.get_next_geo_node(self) + _neighbor_to_a = other_a.get_next_geo_node(self) ## <-+ what happens if other_a == other_b + _neighbor_to_b = other_b.get_next_geo_node(self) ## <-+ # Check on which side of the line between the head connection and this node the other nodes are - side_a = is_above_line_between_points(self.connected_on_head.geo_node.geo_point, self.geo_node.geo_point, other_a.geo_node.geo_point) - side_b = is_above_line_between_points(self.connected_on_head.geo_node.geo_point, self.geo_node.geo_point, other_b.geo_node.geo_point) + side_a = is_above_line_between_points( + _neighbor_to_head.geo_point, + self.geo_node.geo_point, + _neighbor_to_a.geo_point, + ) + side_b = is_above_line_between_points( + _neighbor_to_head.geo_point, + self.geo_node.geo_point, + _neighbor_to_b.geo_point, + ) # If they're on two separate sides we know which is left and right - if(side_a != side_b): - if (side_a): - self.connected_on_left, self.connected_on_right = other_a, other_b + if side_a != side_b: + if side_a: + self.connected_edge_on_left = other_a + self.connected_edge_on_right = other_b else: - self.connected_on_right, self.connected_on_left = other_a, other_b + self.connected_edge_on_left = other_b + self.connected_edge_on_right = other_a # If they're both above or below that line, we make the node that branches further away the left or right node, # depending on the side they're on (left if both above) else: - arc_a = get_arc_between_nodes(self.connected_on_head, other_a) - arc_b = get_arc_between_nodes(self.connected_on_head, other_b) - if(arc_a > arc_b): - self.connected_on_right, self.connected_on_left = (other_a, other_b) if side_a else (other_b, other_a) + arc_a = get_arc_between_edges(self.connected_edge_on_head, other_a) + arc_b = get_arc_between_edges(self.connected_edge_on_head, other_b) + if arc_a > arc_b: + self.connected_edge_on_left = ( + other_b if side_a else other_a + ) + self.connected_edge_on_right = ( + other_a if side_a else other_b + ) else: - self.connected_on_left, self.connected_on_right = (other_a, other_b) if side_a else (other_b, other_a) - + self.connected_edge_on_left = ( + other_a if side_a else other_b + ) + self.connected_edge_on_right = ( + other_b if side_a else other_a + ) def to_serializable(self): """See the description in the BaseElement class. @@ -160,10 +248,14 @@ def to_serializable(self): attributes = self.__dict__ references = { + "connected_edge_on_head": self.connected_edge_on_head.uuid if self.connected_edge_on_head else None, + "connected_edge_on_left": self.connected_edge_on_left.uuid if self.connected_edge_on_left else None, + "connected_edge_on_right": self.connected_edge_on_right.uuid if self.connected_edge_on_right else None, "connected_on_head": self.connected_on_head.uuid if self.connected_on_head else None, "connected_on_left": self.connected_on_left.uuid if self.connected_on_left else None, "connected_on_right": self.connected_on_right.uuid if self.connected_on_right else None, "connected_nodes": [node.uuid for node in self.connected_nodes], + "connected_edges": [edge.uuid for edge in self.connected_edges], "geo_node": self.geo_node.uuid if self.geo_node else None, } objects = {} diff --git a/yaramo/route.py b/yaramo/route.py index ffe3e46..fad775f 100644 --- a/yaramo/route.py +++ b/yaramo/route.py @@ -51,10 +51,9 @@ def get_edges_in_order(self): while previous_edge is not self.end_signal.edge: next_edge = None for edge in self.edges: - if edge.is_node_connected(next_node) and not edge.is_node_connected( - previous_edge.get_other_node(next_node) - ): + if edge.is_node_connected(next_node) and not edge == previous_edge: next_edge = edge + break edges_in_order.append(next_edge) next_node = next_edge.get_other_node(next_node)