diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 2f701cc01..aeb8135ba 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -1500,6 +1500,22 @@ def get_host_lane_assignment_option_side_effect(app): appl = get_cmis_application_desired(mock_xcvr_api, host_lane_count, speed) assert task.get_cmis_host_lanes_mask(mock_xcvr_api, appl, host_lane_count, subport) == expected + @pytest.mark.parametrize("lport, freq, grid, expected", [ + (1, 193100, 75, True), + (1, 193100, 100, False), + (1, 193125, 75, False), + (1, 193100, 25, False) + ]) + def test_CmisManagerTask_verify_config_laser_frequency(self, lport, freq, grid, expected): + mock_xcvr_api = MagicMock() + mock_xcvr_api.get_supported_freq_config = MagicMock() + mock_xcvr_api.get_supported_freq_config.return_value = (0x80, 0, 0, 191300, 196100) + + port_mapping = PortMapping() + stop_event = threading.Event() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + result = task.verify_config_laser_frequency(mock_xcvr_api, lport, freq, grid) + assert result == expected def test_CmisManagerTask_post_port_active_apsel_to_db(self): mock_xcvr_api = MagicMock() @@ -1602,6 +1618,7 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): mock_xcvr_api.get_tx_config_power = MagicMock(return_value=0) mock_xcvr_api.get_laser_config_freq = MagicMock(return_value=0) mock_xcvr_api.get_module_type_abbreviation = MagicMock(return_value='QSFP-DD') + mock_xcvr_api.get_supported_freq_config = MagicMock(return_value=(0xA0,0,0,191300,196100)) mock_xcvr_api.get_datapath_init_duration = MagicMock(return_value=60000.0) mock_xcvr_api.get_module_pwr_up_duration = MagicMock(return_value=70000.0) mock_xcvr_api.get_datapath_deinit_duration = MagicMock(return_value=600000.0) diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index e0624edfa..f15eb05ea 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -1157,18 +1157,40 @@ def configure_tx_output_power(self, api, lport, tx_power): self.log_error("{} configured tx power {} > maximum power {} supported".format(lport, tx_power, max_p)) return api.set_tx_power(tx_power) - def configure_laser_frequency(self, api, lport, freq, grid=75): - _, _, _, lowf, highf = api.get_supported_freq_config() + # Verify if the configured frequency is valid or supported + # Now it only checks for grids 75 and 100. This needs to be implemented when more grids are supported. + def verify_config_laser_frequency(self, api, lport, freq, grid=75): + grid_supported, _, _, lowf, highf = api.get_supported_freq_config() if freq < lowf: self.log_error("{} configured freq:{} GHz is lower than the supported freq:{} GHz".format(lport, freq, lowf)) + return False if freq > highf: self.log_error("{} configured freq:{} GHz is higher than the supported freq:{} GHz".format(lport, freq, highf)) - chan = int(round((freq - 193100)/25)) - if chan % 3 != 0: - self.log_error("{} configured freq:{} GHz is NOT in 75GHz grid".format(lport, freq)) + return False + if grid == 75: + if (grid_supported >> 7) & 0x1 != 1: + self.log_error("{} 75GHz is not supported".format(lport)) + return False + chan = int(round((freq - 193100)/25)) + if chan % 3 != 0: + self.log_error("{} configured freq:{} GHz is NOT in 75GHz grid".format(lport, freq)) + return False + elif grid == 100: + if (grid_supported >> 5) & 0x1 != 1: + self.log_error("{} 100GHz is not supported".format(lport)) + return False + else: + self.log_error("{} {}GHz is not supported".format(lport, grid)) + return False + return True + + def configure_laser_frequency(self, api, lport, freq, grid=75): if api.get_tuning_in_progress(): self.log_error("{} Tuning in progress, subport selection may fail!".format(lport)) - return api.set_laser_freq(freq, grid) + try: + return api.set_laser_freq(freq, grid) + except: + return False def post_port_active_apsel_to_db(self, api, lport, host_lanes_mask): try: @@ -1391,11 +1413,15 @@ def task_worker(self): # For ZR module, Datapath needes to be re-initlialized on new channel selection if api.is_coherent_module(): - freq = self.port_dict[lport]['laser_freq'] - # If user requested frequency is NOT the same as configured on the module - # force datapath re-initialization - if 0 != freq and freq != api.get_laser_config_freq(): - need_update = True + freq = self.port_dict[lport]['laser_freq'] + # If user requested frequency is NOT the same as configured on the module + # force datapath re-initialization + if 0 != freq and freq != api.get_laser_config_freq(): + if self.verify_config_laser_frequency(api, lport, freq) == True: + need_update = True + else: + # clear the invalid freq to prevent setting in AP_CONFIGURED + self.port_dict[lport]['laser_freq'] = 0 if not need_update: # No application updates