From 8b6ab2c7be481f459e84d69e9b655c8d4f757f33 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Aur=C3=A9lien=20Grimpard?=
Date: Thu, 21 Mar 2024 14:23:13 +0100
Subject: [PATCH] Fix presence storm

Fix : https://github.com/element-hq/synapse/issues/16843
Patch from : https://github.com/element-hq/synapse/issues/16843#issuecomment-2011836073
---
Notes (review changes relative to the patch posted in the issue comment):
 * the per-device history stays a bounded deque instead of being replaced
   by a plain list on every call
 * `device_states` is materialised once, so a one-shot iterable still works
 * histories of devices that are no longer reported are pruned, and a user
   without devices no longer keeps an entry in `user_device_to_updates`
 * loop variables no longer switch between device states and state strings
 * trailing commas added after the new `user_id` argument/parameter

 synapse/handlers/presence.py | 105 +++++++++++++++++++++++++++++------
 1 file changed, 86 insertions(+), 19 deletions(-)

diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37ee625f717..cd32fc32c2d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -129,6 +129,10 @@
 from synapse.util.async_helpers import Linearizer
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
+from collections import defaultdict, deque
+import time
+
+user_device_to_updates = defaultdict(lambda: defaultdict(lambda: deque(maxlen=10)))
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1113,7 +1117,7 @@ async def bump_presence_active_time(
         prev_state = await self.current_state_for_user(user_id)
         new_fields: Dict[str, Any] = {
             "last_active_ts": now,
-            "state": _combine_device_states(devices.values()),
+            "state": _combine_device_states(user_id, devices.values()),
         }
 
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
@@ -1402,7 +1406,7 @@ async def set_state(
             device_state.last_sync_ts = now
 
         # Based on the state of each user's device calculate the new presence state.
-        presence = _combine_device_states(devices.values())
+        presence = _combine_device_states(user_id, devices.values())
 
         new_fields = {"state": presence}
 
@@ -2028,6 +2032,7 @@ def handle_timeouts(
             syncing_user_devices,
             user_to_devices.get(user_id, {}),
             now,
+            user_id,
         )
         if new_state:
             changes[state.user_id] = new_state
@@ -2041,6 +2046,7 @@ def handle_timeout(
     syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
     user_devices: Dict[Optional[str], UserDevicePresenceState],
     now: int,
+    user_id: str,
 ) -> Optional[UserPresenceState]:
     """Checks the presence of the user to see if any of the timers have elapsed
 
@@ -2102,7 +2108,7 @@ def handle_timeout(
         # If the presence state of the devices changed, then (maybe) update
         # the user's overall presence state.
         if device_changed:
-            new_presence = _combine_device_states(user_devices.values())
+            new_presence = _combine_device_states(user_id, user_devices.values())
             if new_presence != state.state:
                 state = state.copy_and_replace(state=new_presence)
                 changed = True
@@ -2227,35 +2233,96 @@ def handle_update(
 }
 
 
+def _resolve_inconsistencies_for_device(device_updates: deque) -> str:
+    """
+    Resolve potentially inconsistent state for a single device by examining its
+    most recent presence updates.
+
+    Args:
+        device_updates: A deque of (timestamp, presence state) tuples for the device.
+
+    Returns:
+        The resolved presence state for the device: OFFLINE when there are no
+        updates, otherwise the highest-priority state among them.
+    """
+
+    if not device_updates:
+        return PresenceState.OFFLINE
+
+    # When the recent updates disagree (e.g. several tabs sharing one device),
+    # the highest-priority state wins; when they all agree this is that state.
+    unique_states = {state for _, state in device_updates}
+    return max(unique_states, key=lambda x: PRESENCE_BY_PRIORITY[x])
+
+
 def _combine_device_states(
+    user_id: str,
     device_states: Iterable[UserDevicePresenceState],
 ) -> str:
     """
-    Find the device to use presence information from.
-
-    Orders devices by priority, then last_active_ts.
+    Determine the user's presence state by considering the last changes.
+    If changes are not cohesive, return the state with the highest priority
+    among the last ones.
 
     Args:
+        user_id: The ID of the user associated to the devices (in case it's not
+            set in the devices states)
         device_states: An iterable of device presence states
 
     Return:
         The combined presence state.
     """
 
-    # Based on (all) the user's devices calculate the new presence state.
+    # No devices means: OFFLINE
     presence = PresenceState.OFFLINE
-    last_active_ts = -1
-
-    # Find the device to use the presence state of based on the presence priority,
-    # but tie-break with how recently the device has been seen.
-    for device_state in device_states:
-        if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
-            PRESENCE_BY_PRIORITY[presence],
-            last_active_ts,
-        ):
-            presence = device_state.state
-            last_active_ts = device_state.last_active_ts
-
+    highest_priority = 0
+    # Materialise once: `device_states` may be a one-shot iterable.
+    device_states = list(device_states)
+    if not device_states:
+        # Nothing left to track for this user, so drop their history.
+        user_device_to_updates.pop(user_id, None)
+        return presence
+
+    device_to_updates = user_device_to_updates[user_id]
+    current_ts = int(time.time() * 1000)
+    threshold_ms = SYNC_ONLINE_TIMEOUT + 5
+
+    # Forget devices that are gone so that their last known state cannot keep
+    # influencing the user's presence.
+    current_device_ids = {device_state.device_id for device_state in device_states}
+    for device_id in list(device_to_updates):
+        if device_id not in current_device_ids:
+            del device_to_updates[device_id]
+
+    # Record the latest update of each device
+    for device_state in sorted(device_states, key=lambda x: x.last_active_ts):
+        previous_updates = device_to_updates[device_state.device_id]
+        # Keep only the updates that are at most `threshold_ms` old, in a deque
+        # with the same bound so that the history cannot grow without limit.
+        recent_updates = deque(
+            (
+                (ts, st)
+                for ts, st in previous_updates
+                if current_ts - ts <= threshold_ms
+            ),
+            maxlen=previous_updates.maxlen,
+        )
+        recent_updates.append((device_state.last_active_ts, device_state.state))
+        device_to_updates[device_state.device_id] = recent_updates
+
+    # Resolve inconsistencies (e. g. multitab) for each device and collect the
+    # resolved states
+    resolved_states = [
+        _resolve_inconsistencies_for_device(updates)
+        for updates in device_to_updates.values()
+    ]
+
+    # Combine the resolved device states into a single presence state for the user
+    for resolved_state in resolved_states:
+        if PRESENCE_BY_PRIORITY[resolved_state] > highest_priority:
+            presence = resolved_state
+            highest_priority = PRESENCE_BY_PRIORITY[resolved_state]
+
     return presence
 
 