Skip to content

Commit

Permalink
Fix up logic for delaying sending read receipts over federation. (#17933
Browse files Browse the repository at this point in the history
)

For context of why we delay read receipts, see
matrix-org/synapse#4730.

Element Web often sends read receipts in quick succession, if it reloads
the timeline it'll send one for the last message in the old timeline and
again for the last message in the new timeline. This caused remote users
to see a read receipt for older messages come through quickly, but then
the second read receipt taking a while to arrive for the most recent
message.

There are two things going on in this PR:
1. There was a mismatch between seconds and milliseconds, and so we
ended up delaying for far longer than intended.
2. Changing the logic to reuse the `DestinationWakeupQueue` (used for
presence)

The changes in logic are:
- Treat the first receipt and subsequent receipts in a room in the same
way
- Whitelist certain classes of receipts to never delay being sent, i.e.
receipts in small rooms, receipts for events that were sent within the
last 60s, and sending receipts to the event sender's server.
- The maximum delay a receipt can have before being sent to a server is
30s, and we'll send out receipts to remotes at least at 50Hz (by
default)

The upshot is that this should make receipts feel more snappy over
federation.

This new logic should send roughly between 10%–20% of transactions
immediately on matrix.org.
  • Loading branch information
erikjohnston authored Nov 25, 2024
1 parent 93cc955 commit 3943d2f
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 134 deletions.
1 change: 1 addition & 0 deletions changelog.d/17933.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix long-standing bug where read receipts could get overly delayed being sent over federation.
165 changes: 88 additions & 77 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@
Iterable,
List,
Optional,
Set,
Tuple,
)

Expand Down Expand Up @@ -170,7 +169,13 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
from synapse.types import (
JsonDict,
ReadReceipt,
RoomStreamToken,
StrCollection,
get_domain_from_id,
)
from synapse.util import Clock
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
Expand Down Expand Up @@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
# being woken up.
_MAX_TIME_IN_QUEUE = 30.0

# The maximum duration in seconds between waking up consecutive destination
# queues.
_MAX_DELAY = 0.1

sender: "FederationSender" = attr.ib()
clock: Clock = attr.ib()
max_delay_s: int = attr.ib()

queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
processing: bool = attr.ib(default=False)

Expand Down Expand Up @@ -332,7 +335,7 @@ async def _handle(self) -> None:
# We also add an upper bound to the delay, to gracefully handle the
# case where the queue only has a few entries in it.
current_sleep_seconds = min(
self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
)

while self.queue:
Expand Down Expand Up @@ -416,19 +419,14 @@ def __init__(self, hs: "HomeServer"):
self._is_processing = False
self._last_poked_id = -1

# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._external_cache = hs.get_external_cache()

self._rr_txn_interval_per_room_ms = (
1000.0
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
rr_txn_interval_per_room_s = (
1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)
self._destination_wakeup_queue = _DestinationWakeupQueue(
self, self.clock, max_delay_s=rr_txn_interval_per_room_s
)

self._external_cache = hs.get_external_cache()
self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

# Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now(
Expand Down Expand Up @@ -745,37 +743,48 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:

# Some background on the rate-limiting going on here.
#
# It turns out that if we attempt to send out RRs as soon as we get them from
# a client, then we end up trying to do several hundred Hz of federation
# transactions. (The number of transactions scales as O(N^2) on the size of a
# room, since in a large room we have both more RRs coming in, and more servers
# to send them to.)
# It turns out that if we attempt to send out RRs as soon as we get them
# from a client, then we end up trying to do several hundred Hz of
# federation transactions. (The number of transactions scales as O(N^2)
# on the size of a room, since in a large room we have both more RRs
# coming in, and more servers to send them to.)
#
# This leads to a lot of CPU load, and we end up getting behind. The solution
# currently adopted is as follows:
# This leads to a lot of CPU load, and we end up getting behind. The
# solution currently adopted is to differentiate between receipts and
# destinations we should immediately send to, and those we can trickle
# the receipts to.
#
# The first receipt in a given room is sent out immediately, at time T0. Any
# further receipts are, in theory, batched up for N seconds, where N is calculated
# based on the number of servers in the room to achieve a transaction frequency
# of around 50Hz. So, for example, if there were 100 servers in the room, then
# N would be 100 / 50Hz = 2 seconds.
# The current logic is to send receipts out immediately if:
# - the room is "small", i.e. there's only N servers to send receipts
# to, and so sending out the receipts immediately doesn't cause too
# much load; or
# - the receipt is for an event that happened recently, as users
# notice if receipts are delayed when they know other users are
# currently reading the room; or
# - the receipt is being sent to the server that sent the event, so
# that users see receipts for their own receipts quickly.
#
# Then, after T+N, we flush out any receipts that have accumulated, and restart
# the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
# we stop the cycle and go back to the start.
# For destinations that we should delay sending the receipt to, we queue
# the receipts up to be sent in the next transaction, but don't trigger
# a new transaction to be sent. We then add the destination to the
# `DestinationWakeupQueue`, which will slowly iterate over each
# destination and trigger a new transaction to be sent.
#
# However, in practice, it is often possible to flush out receipts earlier: in
# particular, if we are sending a transaction to a given server anyway (for
# example, because we have a PDU or a RR in another room to send), then we may
# as well send out all of the pending RRs for that server. So it may be that
# by the time we get to T+N, we don't actually have any RRs left to send out.
# Nevertheless we continue to buffer up RRs for the room in question until we
# reach the point that no RRs arrive between timer ticks.
# However, in practice, it is often possible to send out delayed
# receipts earlier: in particular, if we are sending a transaction to a
# given server anyway (for example, because we have a PDU or a RR in
# another room to send), then we may as well send out all of the pending
# RRs for that server. So it may be that by the time we get to waking up
# the destination, we don't actually have any RRs left to send out.
#
# For even more background, see https://github.com/matrix-org/synapse/issues/4730.
# For even more background, see
# https://github.com/matrix-org/synapse/issues/4730.

room_id = receipt.room_id

# Local read receipts always have 1 event ID.
event_id = receipt.event_ids[0]

# Work out which remote servers should be poked and poke them.
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id
Expand All @@ -797,49 +806,51 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
if not domains:
return

queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
# We now split which domains we want to wake up immediately vs which we
# want to delay waking up.
immediate_domains: StrCollection
delay_domains: StrCollection

# if there is no flush yet scheduled, we will send out these receipts with
# immediate flushes, and schedule the next flush for this room.
if queues_pending_flush is not None:
logger.debug("Queuing receipt for: %r", domains)
if len(domains) < 10:
# For "small" rooms send to all domains immediately
immediate_domains = domains
delay_domains = ()
else:
logger.debug("Sending receipt to: %r", domains)
self._schedule_rr_flush_for_room(room_id, len(domains))
metadata = await self.store.get_metadata_for_event(
receipt.room_id, event_id
)
assert metadata is not None

for domain in domains:
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
sender_domain = get_domain_from_id(metadata.sender)

# if there is already a RR flush pending for this room, then make sure this
# destination is registered for the flush
if queues_pending_flush is not None:
queues_pending_flush.add(queue)
if self.clock.time_msec() - metadata.received_ts < 60_000:
# We always send receipts for recent messages immediately
immediate_domains = domains
delay_domains = ()
else:
queue.flush_read_receipts_for_room(room_id)

def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains

logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()

def _flush_rrs_for_room(self, room_id: str) -> None:
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)

if not queues:
# no more RRs arrived for this room; we are done.
return
# Otherwise, we delay waking up all destinations except for the
# sender's domain.
immediate_domains = []
delay_domains = []
for domain in domains:
if domain == sender_domain:
immediate_domains.append(domain)
else:
delay_domains.append(domain)

for domain in immediate_domains:
# Add to destination queue and wake the destination up
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
queue.attempt_new_transaction()

# schedule the next flush
self._schedule_rr_flush_for_room(room_id, len(queues))
for domain in delay_domains:
# Add to destination queue...
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)

for queue in queues:
queue.flush_read_receipts_for_room(room_id)
# ... and schedule the destination to be woken up.
self._destination_wakeup_queue.add_to_queue(domain)

async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
Expand Down
25 changes: 3 additions & 22 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def __init__(
# Each receipt can only have a single receipt per
# (room ID, receipt type, user ID, thread ID) tuple.
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False

# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
Expand Down Expand Up @@ -258,15 +257,7 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
}
)

def flush_read_receipts_for_room(self, room_id: str) -> None:
# If there are any pending receipts for this room then force-flush them
# in a new transaction.
for edu in self._pending_receipt_edus:
if room_id in edu:
self._rrs_pending_flush = True
self.attempt_new_transaction()
# No use in checking remaining EDUs if the room was found.
break
self.mark_new_data()

def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
Expand Down Expand Up @@ -603,12 +594,9 @@ async def _catch_up_transmission_loop(self) -> None:
self._destination, last_successful_stream_ordering
)

def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
if not self._pending_receipt_edus:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return

# Send at most limit EDUs for receipts.
for content in self._pending_receipt_edus[:limit]:
Expand Down Expand Up @@ -747,7 +735,7 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
)

# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
pending_edus.extend(self.queue._get_receipt_edus(limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)

# Next, prioritize to-device messages so that existing encryption channels
Expand Down Expand Up @@ -795,13 +783,6 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
if not self._pdus and not pending_edus:
return [], []

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if edu_limit:
pending_edus.extend(
self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
)

if self._pdus:
self._last_stream_ordering = self._pdus[
-1
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))

self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))

Expand Down Expand Up @@ -446,6 +447,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("_get_state_group_for_event", None)

self._attempt_to_invalidate_cache("get_event_ordering", None)
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))
self._attempt_to_invalidate_cache("is_partial_state_event", None)
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

Expand Down
27 changes: 27 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ class _EventRow:
outlier: bool


@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventMetadata:
"""Event metadata returned by `get_metadata_for_event(..)`"""

sender: str
received_ts: int


class EventRedactBehaviour(Enum):
"""
What to do when retrieving a redacted event from the database.
Expand Down Expand Up @@ -2580,3 +2588,22 @@ async def have_finished_sliding_sync_background_jobs(self) -> bool:
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
)
)

@cached(tree=True)
async def get_metadata_for_event(
self, room_id: str, event_id: str
) -> Optional[EventMetadata]:
row = await self.db_pool.simple_select_one(
table="events",
keyvalues={"room_id": room_id, "event_id": event_id},
retcols=("sender", "received_ts"),
allow_none=True,
desc="get_metadata_for_event",
)
if row is None:
return None

return EventMetadata(
sender=row[0],
received_ts=row[1],
)
Loading

0 comments on commit 3943d2f

Please sign in to comment.