diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3af9fa31..24fc3353 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -13,6 +13,7 @@ Added Fixed ======= - UI: fixed issue where non-JSON data was being parsed as JSON data. +- Fixed inconsistencies with link down behaviour. Flows and vlan reservations should now be properly cleared on link down. Changed ======= diff --git a/README.rst b/README.rst index 7c2e6f57..8f32cdf7 100644 --- a/README.rst +++ b/README.rst @@ -61,7 +61,6 @@ Subscribed - ``kytos/flow_manager.flow.removed`` - ``kytos/of_multi_table.enable_table`` - ``kytos/mef_eline.evc_affected_by_link_down`` -- ``kytos/mef_eline.cleanup_evcs_old_path`` - ``kytos/mef_eline.redeployed_link_up`` - ``kytos/mef_eline.redeployed_link_down`` - ``kytos/mef_eline.deployed`` @@ -105,17 +104,6 @@ Event reporting an error with redeploying a circuit with a link down event. "uni_z": evc.uni_z.as_dict() } -kytos/mef_eline.cleanup_evcs_old_path -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Event reporting the old circuit's path after a link down event. - -.. code-block:: python3 - - { - "evcs": evcs_with_failover + check_failover - } - kytos/mef_eline.evcs_affected_by_link_down ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/main.py b/main.py index 47dfbaeb..fb4b12f7 100755 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import time import traceback from collections import defaultdict +from contextlib import ExitStack from copy import deepcopy from threading import Lock from typing import Optional @@ -33,7 +34,7 @@ emit_event, get_vlan_tags_and_masks, map_evc_event_content, merge_flow_dicts, prepare_delete_flow, - send_flow_mods_event) + send_flow_mods_http) # pylint: disable=too-many-public-methods @@ -65,18 +66,19 @@ def setup(self): # dictionary of EVCs created. It acts as a circuit buffer. # Every create/update/delete must be synced to mongodb. - self.circuits = {} + self.circuits = dict[str, EVC]() self._intf_events = defaultdict(dict) self._lock_interfaces = defaultdict(Lock) self.table_group = {"epl": 0, "evpl": 0} self._lock = Lock() + self.multi_evc_lock = Lock() self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL) self.load_all_evcs() self._topology_updated_at = None - def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list: + def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]: """Get circuits sorted by desc service level and asc creation_time. In the future, as more ops are offloaded it should be get from the DB. @@ -823,85 +825,284 @@ def on_link_down(self, event): """Change circuit when link is down or under_mantenance.""" self.handle_link_down(event) + def prepare_swap_to_failover(self, evc: EVC): + """Prepare an evc for switching to failover.""" + install_flows = {} + try: + install_flows = evc.get_failover_flows() + evc.old_path = evc.current_path + evc.current_path = evc.failover_path + evc.failover_path = Path([]) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error( + "Ignore Failover path for " + f"{evc} due to error: {err}" + ) + return install_flows + + def execute_swap_to_failover(self, event_contents, install_flows): + """Process changes needed to commit a swap to failover.""" + emit_event( + self.controller, "failover_link_down", + content=deepcopy(event_contents) + ) + send_flow_mods_http( + install_flows, + "install" + ) + + def prepare_clear_old_path(self, evc: EVC): + """Prepare an evc for clearing the old path.""" + del_flows = {} + try: + del_flows = prepare_delete_flow( + merge_flow_dicts( + evc._prepare_uni_flows(evc.old_path, skip_in=True), + evc._prepare_nni_flows(evc.old_path) + ) + ) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error(f"Fail to remove {evc} old_path: {err}") + return del_flows + + def execute_clear_old_path( + self, + evcs: list[EVC], + event_contents, + delete_flows + ): + """Process changes needed to commit clearing the old path""" + send_flow_mods_http( + delete_flows, + 'delete' + ) + emit_event( + self.controller, + "failover_old_path", + content=event_contents + ) + for evc in evcs: + evc.old_path.make_vlans_available(self.controller) + evc.old_path = Path([]) + + def prepare_undeploy(self, evc: EVC): + """Prepare an evc for undeploying.""" + del_flows = {} + try: + del_flows = prepare_delete_flow( + merge_flow_dicts( + evc._prepare_uni_flows(evc.current_path, skip_in=True), + evc._prepare_nni_flows(evc.current_path), + evc._prepare_nni_flows(evc.failover_path) + ) + ) + # pylint: disable=broad-except + except Exception: + err = traceback.format_exc().replace("\n", ", ") + log.error(f"Fail to undeploy {evc}: {err}") + pass + return del_flows + + def execute_undeploy(self, evcs: list[EVC], remove_flows): + """Process changes needed to commit an undeploy""" + send_flow_mods_http( + remove_flows, + 'delete' + ) + + for evc in evcs: + evc.current_path.make_vlans_available(self.controller) + evc.failover_path.make_vlans_available(self.controller) + evc.current_path = Path([]) + evc.failover_path = Path([]) + evc.deactivate() + emit_event( + self.controller, + "need_redeploy", + content={"evc_id": evc.id} + ) + log.info(f"{evc} scheduled for redeploy") + # pylint: disable=too-many-branches # pylint: disable=too-many-locals def handle_link_down(self, event): """Change circuit when link is down or under_mantenance.""" link = event.content["link"] log.info("Event handle_link_down %s", link) - switch_flows = {} - evcs_with_failover = [] - evcs_normal = [] - check_failover = [] - failover_event_contents = {} - for evc in self.get_evcs_by_svc_level(): - with evc.lock: - if evc.is_affected_by_link(link): - evc.affected_by_link_at = event.timestamp - # if there is no failover path, handles link down the - # tradditional way - if ( + with ExitStack() as exit_stack: + exit_stack.enter_context(self.multi_evc_lock) + swap_to_failover = list[EVC]() + undeploy = list[EVC]() + clear_failover = list[EVC]() + clear_old_path = list[EVC]() + evc_dict = dict[str, EVC]() + evcs_to_update = list[EVC]() + + flow_modifications = defaultdict[str, dict[str, dict[list]]](dict) + event_contents = defaultdict[str, dict[str, dict]](dict) + + for evc in self.get_evcs_by_svc_level(): + evc_dict[evc.id] = evc + with ExitStack() as sub_stack: + sub_stack.enter_context(evc.lock) + if all(( + evc.is_affected_by_link(link), + evc.failover_path, + not evc.is_failover_path_affected_by_link(link) + )): + swap_to_failover.append(evc) + elif all(( + evc.is_affected_by_link(link), not evc.failover_path or evc.is_failover_path_affected_by_link(link) - ): - evcs_normal.append(evc) - continue - try: - dpid_flows = evc.get_failover_flows() - evc.old_path = evc.current_path - evc.current_path = evc.failover_path - evc.failover_path = Path([]) - # pylint: disable=broad-except - except Exception: - err = traceback.format_exc().replace("\n", ", ") - log.error( - "Ignore Failover path for " - f"{evc} due to error: {err}" - ) - evcs_normal.append(evc) + )): + undeploy.append(evc) + elif all(( + not evc.is_affected_by_link(link), + evc.failover_path, + evc.is_failover_path_affected_by_link(link) + )): + clear_failover.append(evc) + else: continue - for dpid, flows in dpid_flows.items(): - switch_flows.setdefault(dpid, []) - switch_flows[dpid].extend(flows) - evcs_with_failover.append(evc) - failover_event_contents[evc.id] = map_evc_event_content( - evc, - flows={k: v.copy() for k, v in switch_flows.items()} + + exit_stack.push(sub_stack.pop_all()) + + # Swap from current path to failover path + + for evc in swap_to_failover: + new_flows = self.prepare_swap_to_failover(evc) + if new_flows: + flow_modifications[evc.id]["swap_to_failover"] =\ + new_flows + event_contents[evc.id]["swap_to_failover"] =\ + map_evc_event_content( + evc, + flows=deepcopy(new_flows) ) - elif evc.is_failover_path_affected_by_link(link): - evc.old_path = evc.failover_path - evc.failover_path = Path([]) - check_failover.append(evc) + clear_old_path.append(evc) + else: + undeploy.append(evc) + + for evc in clear_failover: + evc.old_path = evc.failover_path + evc.failover_path = Path([]) + clear_old_path.append(evc) + + # Clear out old flows + + for evc in clear_old_path: + del_flows = self.prepare_clear_old_path(evc) + if del_flows: + flow_modifications[evc.id]["clear_old_path"] = del_flows + event_contents[evc.id]["clear_old_path"] =\ + map_evc_event_content( + evc, + current_path=evc.current_path.as_dict(), + removed_flows=deepcopy(del_flows) + ) + else: + undeploy.append(evc) + if evc.id in flow_modifications: + if "swap_to_failover" in flow_modifications[evc.id]: + evc.failover_path = evc.current_path + evc.current_path = evc.old_path + else: + evc.failover_path = evc.old_path + evc.old_path = Path([]) + del flow_modifications[evc.id] + del event_contents[evc.id] + + swap_to_failover_flows = {} + swap_to_failover_event_contents = {} + + clear_old_path_flows = {} + clear_old_path_event_contents = {} + + clear_old_path_reservations = list[EVC]() + + for modified_evc_id in flow_modifications: + evc = evc_dict[modified_evc_id] + if "swap_to_failover" in flow_modifications[modified_evc_id]: + swap_to_failover_flows = merge_flow_dicts( + swap_to_failover_flows, + flow_modifications[modified_evc_id]["swap_to_failover"] + ) + swap_to_failover_event_contents[modified_evc_id] =\ + event_contents[modified_evc_id]["swap_to_failover"] + if "clear_old_path" in flow_modifications[modified_evc_id]: + clear_old_path_flows = merge_flow_dicts( + clear_old_path_flows, + flow_modifications[modified_evc_id]["clear_old_path"] + ) + clear_old_path_event_contents[modified_evc_id] =\ + event_contents[modified_evc_id]["clear_old_path"] + clear_old_path_reservations.append(evc) + evcs_to_update.append(evc) + + if swap_to_failover_flows: + self.execute_swap_to_failover( + swap_to_failover_event_contents, + swap_to_failover_flows + ) - if failover_event_contents: - emit_event(self.controller, "failover_link_down", - content=deepcopy(failover_event_contents)) - send_flow_mods_event(self.controller, switch_flows, 'install') + if clear_old_path_flows: + self.execute_clear_old_path( + clear_old_path_reservations, + clear_old_path_event_contents, + clear_old_path_flows + ) - for evc in evcs_normal: - emit_event( - self.controller, - "evc_affected_by_link_down", - content={"link": link} | map_evc_event_content(evc) - ) + undeploy_flows = {} + undeploy_ready = list[EVC]() - evcs_to_update = [] - for evc in evcs_with_failover: - evcs_to_update.append(evc.as_dict()) - log.info( - f"{evc} redeployed with failover due to link down {link.id}" - ) - for evc in check_failover: - evcs_to_update.append(evc.as_dict()) + # Undeploy the evc, schedule a redeploy - self.mongo_controller.update_evcs(evcs_to_update) + for evc in undeploy: + del_flows = self.prepare_undeploy(evc) + if del_flows: + undeploy_flows = merge_flow_dicts( + undeploy_flows, + del_flows + ) + undeploy_ready.append(evc) - emit_event( - self.controller, - "cleanup_evcs_old_path", - content={"evcs": evcs_with_failover + check_failover} - ) + if undeploy_ready: + self.execute_undeploy(undeploy_ready, undeploy_flows) + evcs_to_update.extend(undeploy_ready) + + # Push update to DB + + if evcs_to_update: + self.mongo_controller.update_evcs( + [evc.as_dict() for evc in evcs_to_update] + ) + + @listen_to("kytos/mef_eline.need_redeploy") + def on_evc_need_redeploy(self, event): + """Redeploy evcs that need to be redeployed.""" + self.handle_evc_need_redeploy(event) + + def handle_evc_need_redeploy(self, event): + """Redeploy evcs that need to be redeployed.""" + evc = self.circuits.get(event.content["evc_id"]) + if evc is None: + return + with evc.lock: + if not evc.is_enabled() or evc.is_active(): + return + result = evc.deploy_to_path() + event_name = "error_redeploy_link_down" + if result: + log.info(f"{evc} redeployed") + event_name = "redeployed_link_down" + emit_event(self.controller, event_name, + content=map_evc_event_content(evc)) @listen_to("kytos/mef_eline.evc_affected_by_link_down") def on_evc_affected_by_link_down(self, event): @@ -937,50 +1138,6 @@ def handle_evc_deployed(self, event): return evc.try_setup_failover_path() - @listen_to("kytos/mef_eline.cleanup_evcs_old_path") - def on_cleanup_evcs_old_path(self, event): - """Handle cleanup evcs old path.""" - self.handle_cleanup_evcs_old_path(event) - - def handle_cleanup_evcs_old_path(self, event): - """Handle cleanup evcs old path.""" - evcs = event.content.get("evcs", []) - event_contents: dict[str, dict] = defaultdict(list) - total_flows = {} - for evc in evcs: - if not evc.old_path: - continue - with evc.lock: - removed_flows = {} - try: - nni_flows = prepare_delete_flow( - evc._prepare_nni_flows(evc.old_path) - ) - uni_flows = prepare_delete_flow( - evc._prepare_uni_flows(evc.old_path, skip_in=True) - ) - removed_flows = merge_flow_dicts( - nni_flows, uni_flows - ) - # pylint: disable=broad-except - except Exception: - err = traceback.format_exc().replace("\n", ", ") - log.error(f"Fail to remove {evc} old_path: {err}") - continue - if removed_flows: - total_flows = merge_flow_dicts(total_flows, removed_flows) - content = map_evc_event_content( - evc, - removed_flows=deepcopy(removed_flows), - current_path=evc.current_path.as_dict(), - ) - event_contents[evc.id] = content - evc.old_path = Path([]) - if event_contents: - send_flow_mods_event(self.controller, total_flows, 'delete') - emit_event(self.controller, "failover_old_path", - content=event_contents) - @listen_to("kytos/topology.topology_loaded") def on_topology_loaded(self, event): # pylint: disable=unused-argument """Load EVCs once the topology is available.""" diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 4adbc10a..eb8b3ed3 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -11,7 +11,7 @@ from kytos.core.exceptions import KytosTagError from kytos.core.interface import TAGRange, UNI, Interface from napps.kytos.mef_eline.exceptions import InvalidPath -from napps.kytos.mef_eline.models import EVC +from napps.kytos.mef_eline.models import EVC, Path from napps.kytos.mef_eline.tests.helpers import get_uni_mocked @@ -1900,153 +1900,189 @@ def test_handle_link_up(self): evc_mock.handle_link_up.assert_called_with("abc") @patch("time.sleep", return_value=None) - @patch("napps.kytos.mef_eline.utils.emit_event") + @patch("napps.kytos.mef_eline.main.map_evc_event_content") + @patch("napps.kytos.mef_eline.main.send_flow_mods_http") @patch("napps.kytos.mef_eline.main.emit_event") def test_handle_link_down( - self, emit_main_mock, emit_utils_mock, _ + self, + emit_main_mock: MagicMock, + send_flow_mods_mock: MagicMock, + map_evc_mock: MagicMock, + _: MagicMock ): """Test handle_link_down method.""" uni = create_autospec(UNI) evc1 = MagicMock(id="1", service_level=0, creation_time=1, metadata="mock", _active="true", _enabled="true", uni_a=uni, uni_z=uni) - evc1.name = "name" evc1.is_affected_by_link.return_value = True evc1.handle_link_down.return_value = True - evc1.failover_path = None + evc1.failover_path = Path([]) + evc1.as_dict.return_value = {"id": "1"} + evc2 = MagicMock(id="2", service_level=6, creation_time=1) evc2.is_affected_by_link.return_value = False evc2.is_failover_path_affected_by_link.return_value = True evc2.as_dict.return_value = {"id": "2"} + evc3 = MagicMock(id="3", service_level=5, creation_time=1, metadata="mock", _active="true", _enabled="true", uni_a=uni, uni_z=uni) - evc3.name = "name" evc3.is_affected_by_link.return_value = True evc3.handle_link_down.return_value = True - evc3.failover_path = None + evc3.failover_path = Path([]) + evc3.as_dict.return_value = {"id": "3"} + evc4 = MagicMock(id="4", service_level=4, creation_time=1, metadata="mock", _active="true", _enabled="true", uni_a=uni, uni_z=uni) - evc4.name = "name" evc4.is_affected_by_link.return_value = True evc4.is_failover_path_affected_by_link.return_value = False - evc4.failover_path = ["2"] evc4.get_failover_flows.return_value = { "2": ["flow1", "flow2"], "3": ["flow3", "flow4", "flow5", "flow6"], } evc4.as_dict.return_value = {"id": "4"} + evc5 = MagicMock(id="5", service_level=7, creation_time=1) evc5.is_affected_by_link.return_value = True evc5.is_failover_path_affected_by_link.return_value = False - evc5.failover_path = ["3"] evc5.get_failover_flows.return_value = { "4": ["flow7", "flow8"], "5": ["flow9", "flow10"], } evc5.as_dict.return_value = {"id": "5"} + evc6 = MagicMock(id="6", service_level=8, creation_time=1, metadata="mock", _active="true", _enabled="true", uni_a=uni, uni_z=uni) - evc6.name = "name" evc6.is_affected_by_link.return_value = True evc6.is_failover_path_affected_by_link.return_value = False - evc6.failover_path = ["3"] evc6.get_failover_flows.side_effect = AttributeError("err") + evc6.as_dict.return_value = {"id": "6"} link = MagicMock(id="123") event = KytosEvent(name="test", content={"link": link}) - self.napp.circuits = {"1": evc1, "2": evc2, "3": evc3, "4": evc4, - "5": evc5, "6": evc6} + + self.napp.prepare_clear_old_path = { + evc1: {"1": ["clear_flow1"]}, + evc2: {"2": ["clear_flow2"]}, + evc3: {"3": ["clear_flow3"]}, + evc4: {"4": ["clear_flow4"]}, + evc5: {"5": ["clear_flow5"]}, + evc6: {"6": ["clear_flow6"]}, + }.get + + self.napp.prepare_undeploy = { + evc1: {"1": ["undeploy_flow1"]}, + evc2: {"2": ["undeploy_flow2"]}, + evc3: {"3": ["undeploy_flow3"]}, + evc4: {"4": ["undeploy_flow4"]}, + evc5: {"5": ["undeploy_flow5"]}, + evc6: {"6": ["undeploy_flow6"]}, + }.get + + def side_effect(evc, **_): + return "event_content" + evc.id + + map_evc_mock.side_effect = side_effect + + self.napp.circuits = { + "1": evc1, + "2": evc2, + "3": evc3, + "4": evc4, + "5": evc5, + "6": evc6 + } + self.napp.handle_link_down(event) assert evc5.service_level > evc4.service_level # evc5 batched flows should be sent first - emit_utils_mock.assert_has_calls([ - call( - self.napp.controller, - context="kytos.flow_manager", - name="flows.install", - content={ - "dpid": "4", - "flow_dict": {"flows": ["flow7", "flow8"]}, - 'force': True, - } - ), - call( - self.napp.controller, - context="kytos.flow_manager", - name="flows.install", - content={ - "dpid": "5", - "flow_dict": {"flows": ["flow9", "flow10"]}, - 'force': True, - } - ), - call( - self.napp.controller, - context="kytos.flow_manager", - name="flows.install", - content={ - "dpid": "2", - "flow_dict": {"flows": ["flow1", "flow2"]}, - 'force': True, - } - ), - call( - self.napp.controller, - context="kytos.flow_manager", - name="flows.install", - content={ - "dpid": "3", - "flow_dict": { - "flows": ["flow3", "flow4", "flow5", "flow6"] + send_flow_mods_mock.assert_has_calls( + [ + call( + { + "2": ["flow1", "flow2"], + "3": ["flow3", "flow4", "flow5", "flow6"], + "4": ["flow7", "flow8"], + "5": ["flow9", "flow10"], }, - 'force': True, - } - ) - ]) - event_name = "evc_affected_by_link_down" + "install" + ), + call( + { + "2": ["clear_flow2"], + "4": ["clear_flow4"], + "5": ["clear_flow5"], + }, + "delete" + ), + call( + { + "3": ["undeploy_flow3"], + "1": ["undeploy_flow1"], + "6": ["undeploy_flow6"], + }, + "delete" + ) + ] + ) + + emit_main_mock.assert_has_calls( + [ + call( + self.napp.controller, + "failover_link_down", + content={ + "4": "event_content4", + "5": "event_content5", + } + ), + call( + self.napp.controller, + "failover_old_path", + content={ + "2": "event_content2", + "4": "event_content4", + "5": "event_content5", + } + ), + call( + self.napp.controller, + "need_redeploy", + content={ + "evc_id": "1" + } + ), + call( + self.napp.controller, + "need_redeploy", + content={ + "evc_id": "3" + } + ), + call( + self.napp.controller, + "need_redeploy", + content={ + "evc_id": "6" + } + ), + ], + any_order=True + ) + assert evc3.service_level > evc1.service_level # evc3 should be handled before evc1 - emit_main_mock.assert_has_calls([ - call(self.napp.controller, event_name, content={ - "link": link, - "id": "6", - "evc_id": "6", - "name": "name", - "metadata": "mock", - "active": "true", - "enabled": "true", - "uni_a": uni.as_dict(), - "uni_z": uni.as_dict(), - }), - call(self.napp.controller, event_name, content={ - "link": link, - "evc_id": "3", - "id": "3", - "name": "name", - "metadata": "mock", - "active": "true", - "enabled": "true", - "uni_a": uni.as_dict(), - "uni_z": uni.as_dict(), - }), - call(self.napp.controller, event_name, content={ - "link": link, - "evc_id": "1", - "id": "1", - "name": "name", - "metadata": "mock", - "active": "true", - "enabled": "true", - "uni_a": uni.as_dict(), - "uni_z": uni.as_dict(), - }), - ]) - self.napp.mongo_controller.update_evcs.assert_called_with( - [{"id": "5"}, {"id": "4"}, {"id": "2"}] - ) + + expected_updates = {"1", "2", "3", "4", "5", "6"} + args = self.napp.mongo_controller.update_evcs.call_args.args + for update in args[0]: + expected_updates.remove(update["id"]) + + assert not expected_updates + event_name = "failover_link_down" assert emit_main_mock.call_args_list[0][0][1] == event_name @@ -2112,64 +2148,6 @@ def test_handle_evc_affected_by_link_down(self, emit_event_mock): } ) - def test_cleanup_evcs_old_path(self, monkeypatch): - """Test handle_cleanup_evcs_old_path method.""" - current_path, map_evc_content, emit_event = [ - MagicMock(), MagicMock(), MagicMock() - ] - send_flows, merge_flows, create_flows = [ - MagicMock(), MagicMock(), MagicMock() - ] - monkeypatch.setattr( - "napps.kytos.mef_eline.main.map_evc_event_content", - map_evc_content - ) - monkeypatch.setattr( - "napps.kytos.mef_eline.main.emit_event", - emit_event - ) - monkeypatch.setattr( - "napps.kytos.mef_eline.main.send_flow_mods_event", - send_flows - ) - monkeypatch.setattr( - "napps.kytos.mef_eline.main.merge_flow_dicts", - merge_flows - ) - monkeypatch.setattr( - "napps.kytos.mef_eline.main.prepare_delete_flow", - create_flows - ) - merge_flows.return_value = ['1', '2'] - evc1 = create_autospec(EVC, id="1", old_path=["1"], - current_path=current_path, lock=MagicMock()) - evc2 = create_autospec(EVC, id="2", old_path=["2"], - current_path=current_path, lock=MagicMock()) - evc3 = create_autospec(EVC, id="3", old_path=[], - current_path=[], lock=MagicMock()) - - event = KytosEvent(name="e1", content={"evcs": [evc1, evc2, evc3]}) - assert evc1.old_path - assert evc2.old_path - self.napp.handle_cleanup_evcs_old_path(event) - evc1._prepare_nni_flows.assert_called_with(["1"]) - evc1._prepare_uni_flows.assert_called_with(["1"], skip_in=True) - evc2._prepare_nni_flows.assert_called_with(["2"]) - evc2._prepare_uni_flows.assert_called_with(["2"], skip_in=True) - evc3._prepare_nni_flows.assert_not_called() - evc3._prepare_uni_flows.assert_not_called() - assert create_flows.call_count == 4 - assert emit_event.call_count == 1 - assert emit_event.call_args[0][1] == "failover_old_path" - assert map_evc_content.call_args[1]['removed_flows'] == ['1', '2'] - assert len(emit_event.call_args[1]["content"]) == 2 - assert send_flows.call_count == 1 - assert send_flows.call_args[0][1] == ['1', '2'] - assert send_flows.call_args[0][2] == 'delete' - assert merge_flows.call_count == 4 - assert not evc1.old_path - assert not evc2.old_path - async def test_add_metadata(self): """Test method to add metadata""" self.napp.controller.loop = asyncio.get_running_loop() diff --git a/utils.py b/utils.py index 32db4cbc..ab88fa80 100644 --- a/utils.py +++ b/utils.py @@ -1,10 +1,13 @@ """Utility functions.""" from typing import Union +import httpx + from kytos.core.common import EntityStatus from kytos.core.events import KytosEvent from kytos.core.interface import UNI, Interface, TAGRange -from napps.kytos.mef_eline.exceptions import DisabledSwitch +from napps.kytos.mef_eline import settings +from napps.kytos.mef_eline.exceptions import DisabledSwitch, FlowModException def map_evc_event_content(evc, **kwargs) -> dict: @@ -28,7 +31,7 @@ def emit_event(controller, name, context="kytos/mef_eline", content=None, def merge_flow_dicts( - dst: dict[str, list], *srcs: list[dict[str, list]] + dst: dict[str, list], *srcs: dict[str, list] ) -> dict[str, list]: """Merge srcs dict flows into dst.""" for src in srcs: @@ -179,6 +182,40 @@ def send_flow_mods_event( ) +def send_flow_mods_http( + flow_dict: dict[str, list], + action: str, force=True +): + """ + Send a flow_mod list to a specific switch. + + Args: + dpid(str): The target of flows (i.e. Switch.id). + flow_mods(dict): Python dictionary with flow_mods. + command(str): By default is 'flows'. To remove a flow is 'remove'. + force(bool): True to send via consistency check in case of errors. + by_switch(bool): True to send to 'flows_by_switch' request instead. + """ + endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}" + + formatted_dict = { + dpid: {"flows": flows} + for (dpid, flows) in flow_dict.items() + } + + try: + if action == "install": + res = httpx.post(endpoint, json=formatted_dict, timeout=30) + elif action == "delete": + res = httpx.request( + "DELETE", endpoint, json=formatted_dict, timeout=30 + ) + except httpx.RequestError as err: + raise FlowModException(str(err)) from err + if res.is_server_error or res.status_code >= 400: + raise FlowModException(res.text) + + def prepare_delete_flow(evc_flows: dict[str, list[dict]]): """Create flow mods suited for flow deletion.""" dpid_flows: dict[str, list[dict]] = {}