Skip to content

Commit

Permalink
Fix presence storm
Browse files Browse the repository at this point in the history
  • Loading branch information
agrimpard authored Mar 21, 2024
1 parent cf5adc8 commit 8b6ab2c
Showing 1 changed file with 71 additions and 19 deletions.
90 changes: 71 additions & 19 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from collections import defaultdict, deque
import time

user_device_to_updates = defaultdict(lambda: defaultdict(lambda: deque(maxlen=10)))

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -1113,7 +1117,7 @@ async def bump_presence_active_time(
prev_state = await self.current_state_for_user(user_id)
new_fields: Dict[str, Any] = {
"last_active_ts": now,
"state": _combine_device_states(devices.values()),
"state": _combine_device_states(user_id, devices.values()),
}

await self._update_states([prev_state.copy_and_replace(**new_fields)])
Expand Down Expand Up @@ -1402,7 +1406,7 @@ async def set_state(
device_state.last_sync_ts = now

# Based on the state of each user's device calculate the new presence state.
presence = _combine_device_states(devices.values())
presence = _combine_device_states(user_id, devices.values())

new_fields = {"state": presence}

Expand Down Expand Up @@ -2028,6 +2032,7 @@ def handle_timeouts(
syncing_user_devices,
user_to_devices.get(user_id, {}),
now,
user_id
)
if new_state:
changes[state.user_id] = new_state
Expand All @@ -2041,6 +2046,7 @@ def handle_timeout(
syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
user_devices: Dict[Optional[str], UserDevicePresenceState],
now: int,
user_id: str
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Expand Down Expand Up @@ -2102,7 +2108,7 @@ def handle_timeout(
# If the presence state of the devices changed, then (maybe) update
# the user's overall presence state.
if device_changed:
new_presence = _combine_device_states(user_devices.values())
new_presence = _combine_device_states(user_id, user_devices.values())
if new_presence != state.state:
state = state.copy_and_replace(state=new_presence)
changed = True
Expand Down Expand Up @@ -2227,35 +2233,81 @@ def handle_update(
}


def _resolve_inconsistencies_for_device(device_updates: deque) -> str:
"""
Resolve potentially inconsistent state for a single device by examining its most recent presence updates.
Args:
device_updates: A deque of (timestamp, presence state) tuples for the device.
Returns:
The resolved presence state for the device.
"""

# Ensure there are at least two updates to consider inconsistencies
if not device_updates:
return PresenceState.OFFLINE
if len(device_updates) < 2:
return device_updates[-1][1]

# Extract unique states values
unique_states = {state for _, state in device_updates}

# If there are non-consistent states within a polling timeframe, we reduce the inconsistencies
if len(unique_states) > 1:
highest_state = max(unique_states, key=lambda x: PRESENCE_BY_PRIORITY[x])
return highest_state

# If no inconsistencies, return the most recent state
return device_updates[-1][1]


def _combine_device_states(
user_id: str,
device_states: Iterable[UserDevicePresenceState],
) -> str:
"""
Find the device to use presence information from.
Orders devices by priority, then last_active_ts.
Determine the user's presence state by considering the last changes.
If changes are not cohesive, return the state with the highest priority among the last ones.
Args:
user_id: The ID of the user associated to the devices (in case it's not set in the devices states)
device_states: An iterable of device presence states
Return:
The combined presence state.
"""

# Based on (all) the user's devices calculate the new presence state.
# No devices means: OFFLINE
presence = PresenceState.OFFLINE
last_active_ts = -1

# Find the device to use the presence state of based on the presence priority,
# but tie-break with how recently the device has been seen.
for device_state in device_states:
if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
PRESENCE_BY_PRIORITY[presence],
last_active_ts,
):
presence = device_state.state
last_active_ts = device_state.last_active_ts

highest_priority = 0
if not list(device_states):
return presence

device_to_updates = user_device_to_updates[user_id]
current_ts = int(time.time() * 1000)
threshold_ms = SYNC_ONLINE_TIMEOUT + 5

# Populate the map with the latest updates for each device
for state in sorted(device_states, key=lambda x: x.last_active_ts):
current_device_updates = device_to_updates[state.device_id]
# Remove updates that are older than 30000ms from the current timestamp
filtered_updates = [(ts, st) for ts, st in current_device_updates if current_ts - ts <= threshold_ms]
# Update the list directly within device_to_updates
device_to_updates[state.device_id] = filtered_updates
device_to_updates[state.device_id].append((state.last_active_ts, state.state))

# Resolve inconsistencies (e. g. multitab) for each device and collect the resolved states
resolved_states = [
_resolve_inconsistencies_for_device(updates) for updates in device_to_updates.values()
]

# Combine the resolved device states into a single presence state for the user
for state in resolved_states:
if PRESENCE_BY_PRIORITY[state] > highest_priority:
presence = state
highest_priority = PRESENCE_BY_PRIORITY[state]

return presence


Expand Down

0 comments on commit 8b6ab2c

Please sign in to comment.