diff --git a/changelog.d/17933.bugfix b/changelog.d/17933.bugfix new file mode 100644 index 00000000000..8d30ac587eb --- /dev/null +++ b/changelog.d/17933.bugfix @@ -0,0 +1 @@ +Fix long-standing bug where read receipts could get overly delayed being sent over federation. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 18884808815..17cddf18a38 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -140,7 +140,6 @@ Iterable, List, Optional, - Set, Tuple, ) @@ -170,7 +169,13 @@ run_as_background_process, wrap_as_background_process, ) -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection +from synapse.types import ( + JsonDict, + ReadReceipt, + RoomStreamToken, + StrCollection, + get_domain_from_id, +) from synapse.util import Clock from synapse.util.metrics import Measure from synapse.util.retryutils import filter_destinations_by_retry_limiter @@ -297,12 +302,10 @@ class _DestinationWakeupQueue: # being woken up. _MAX_TIME_IN_QUEUE = 30.0 - # The maximum duration in seconds between waking up consecutive destination - # queues. - _MAX_DELAY = 0.1 - sender: "FederationSender" = attr.ib() clock: Clock = attr.ib() + max_delay_s: int = attr.ib() + queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict) processing: bool = attr.ib(default=False) @@ -332,7 +335,7 @@ async def _handle(self) -> None: # We also add an upper bound to the delay, to gracefully handle the # case where the queue only has a few entries in it. current_sleep_seconds = min( - self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue) + self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue) ) while self.queue: @@ -416,19 +419,14 @@ def __init__(self, hs: "HomeServer"): self._is_processing = False self._last_poked_id = -1 - # map from room_id to a set of PerDestinationQueues which we believe are - # awaiting a call to flush_read_receipts_for_room. The presence of an entry - # here for a given room means that we are rate-limiting RR flushes to that room, - # and that there is a pending call to _flush_rrs_for_room in the system. - self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {} + self._external_cache = hs.get_external_cache() - self._rr_txn_interval_per_room_ms = ( - 1000.0 - / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second + rr_txn_interval_per_room_s = ( + 1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second + ) + self._destination_wakeup_queue = _DestinationWakeupQueue( + self, self.clock, max_delay_s=rr_txn_interval_per_room_s ) - - self._external_cache = hs.get_external_cache() - self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) # Regularly wake up destinations that have outstanding PDUs to be caught up self.clock.looping_call_now( @@ -745,37 +743,48 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: # Some background on the rate-limiting going on here. # - # It turns out that if we attempt to send out RRs as soon as we get them from - # a client, then we end up trying to do several hundred Hz of federation - # transactions. (The number of transactions scales as O(N^2) on the size of a - # room, since in a large room we have both more RRs coming in, and more servers - # to send them to.) + # It turns out that if we attempt to send out RRs as soon as we get them + # from a client, then we end up trying to do several hundred Hz of + # federation transactions. (The number of transactions scales as O(N^2) + # on the size of a room, since in a large room we have both more RRs + # coming in, and more servers to send them to.) # - # This leads to a lot of CPU load, and we end up getting behind. The solution - # currently adopted is as follows: + # This leads to a lot of CPU load, and we end up getting behind. The + # solution currently adopted is to differentiate between receipts and + # destinations we should immediately send to, and those we can trickle + # the receipts to. # - # The first receipt in a given room is sent out immediately, at time T0. Any - # further receipts are, in theory, batched up for N seconds, where N is calculated - # based on the number of servers in the room to achieve a transaction frequency - # of around 50Hz. So, for example, if there were 100 servers in the room, then - # N would be 100 / 50Hz = 2 seconds. + # The current logic is to send receipts out immediately if: + # - the room is "small", i.e. there's only N servers to send receipts + # to, and so sending out the receipts immediately doesn't cause too + # much load; or + # - the receipt is for an event that happened recently, as users + # notice if receipts are delayed when they know other users are + # currently reading the room; or + # - the receipt is being sent to the server that sent the event, so + # that users see receipts for their own receipts quickly. # - # Then, after T+N, we flush out any receipts that have accumulated, and restart - # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate, - # we stop the cycle and go back to the start. + # For destinations that we should delay sending the receipt to, we queue + # the receipts up to be sent in the next transaction, but don't trigger + # a new transaction to be sent. We then add the destination to the + # `DestinationWakeupQueue`, which will slowly iterate over each + # destination and trigger a new transaction to be sent. # - # However, in practice, it is often possible to flush out receipts earlier: in - # particular, if we are sending a transaction to a given server anyway (for - # example, because we have a PDU or a RR in another room to send), then we may - # as well send out all of the pending RRs for that server. So it may be that - # by the time we get to T+N, we don't actually have any RRs left to send out. - # Nevertheless we continue to buffer up RRs for the room in question until we - # reach the point that no RRs arrive between timer ticks. + # However, in practice, it is often possible to send out delayed + # receipts earlier: in particular, if we are sending a transaction to a + # given server anyway (for example, because we have a PDU or a RR in + # another room to send), then we may as well send out all of the pending + # RRs for that server. So it may be that by the time we get to waking up + # the destination, we don't actually have any RRs left to send out. # - # For even more background, see https://github.com/matrix-org/synapse/issues/4730. + # For even more background, see + # https://github.com/matrix-org/synapse/issues/4730. room_id = receipt.room_id + # Local read receipts always have 1 event ID. + event_id = receipt.event_ids[0] + # Work out which remote servers should be poked and poke them. domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id @@ -797,49 +806,51 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None: if not domains: return - queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id) + # We now split which domains we want to wake up immediately vs which we + # want to delay waking up. + immediate_domains: StrCollection + delay_domains: StrCollection - # if there is no flush yet scheduled, we will send out these receipts with - # immediate flushes, and schedule the next flush for this room. - if queues_pending_flush is not None: - logger.debug("Queuing receipt for: %r", domains) + if len(domains) < 10: + # For "small" rooms send to all domains immediately + immediate_domains = domains + delay_domains = () else: - logger.debug("Sending receipt to: %r", domains) - self._schedule_rr_flush_for_room(room_id, len(domains)) + metadata = await self.store.get_metadata_for_event( + receipt.room_id, event_id + ) + assert metadata is not None - for domain in domains: - queue = self._get_per_destination_queue(domain) - queue.queue_read_receipt(receipt) + sender_domain = get_domain_from_id(metadata.sender) - # if there is already a RR flush pending for this room, then make sure this - # destination is registered for the flush - if queues_pending_flush is not None: - queues_pending_flush.add(queue) + if self.clock.time_msec() - metadata.received_ts < 60_000: + # We always send receipts for recent messages immediately + immediate_domains = domains + delay_domains = () else: - queue.flush_read_receipts_for_room(room_id) - - def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None: - # that is going to cause approximately len(domains) transactions, so now back - # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM - backoff_ms = self._rr_txn_interval_per_room_ms * n_domains - - logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms) - self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id) - self._queues_awaiting_rr_flush_by_room[room_id] = set() - - def _flush_rrs_for_room(self, room_id: str) -> None: - queues = self._queues_awaiting_rr_flush_by_room.pop(room_id) - logger.debug("Flushing RRs in %s to %s", room_id, queues) - - if not queues: - # no more RRs arrived for this room; we are done. - return + # Otherwise, we delay waking up all destinations except for the + # sender's domain. + immediate_domains = [] + delay_domains = [] + for domain in domains: + if domain == sender_domain: + immediate_domains.append(domain) + else: + delay_domains.append(domain) + + for domain in immediate_domains: + # Add to destination queue and wake the destination up + queue = self._get_per_destination_queue(domain) + queue.queue_read_receipt(receipt) + queue.attempt_new_transaction() - # schedule the next flush - self._schedule_rr_flush_for_room(room_id, len(queues)) + for domain in delay_domains: + # Add to destination queue... + queue = self._get_per_destination_queue(domain) + queue.queue_read_receipt(receipt) - for queue in queues: - queue.flush_read_receipts_for_room(room_id) + # ... and schedule the destination to be woken up. + self._destination_wakeup_queue.add_to_queue(domain) async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index d097e65ea74..b3f65e8237e 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -156,7 +156,6 @@ def __init__( # Each receipt can only have a single receipt per # (room ID, receipt type, user ID, thread ID) tuple. self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] - self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. # NB: may be a long or an int. @@ -258,15 +257,7 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None: } ) - def flush_read_receipts_for_room(self, room_id: str) -> None: - # If there are any pending receipts for this room then force-flush them - # in a new transaction. - for edu in self._pending_receipt_edus: - if room_id in edu: - self._rrs_pending_flush = True - self.attempt_new_transaction() - # No use in checking remaining EDUs if the room was found. - break + self.mark_new_data() def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu @@ -603,12 +594,9 @@ async def _catch_up_transmission_loop(self) -> None: self._destination, last_successful_stream_ordering ) - def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: + def _get_receipt_edus(self, limit: int) -> Iterable[Edu]: if not self._pending_receipt_edus: return - if not force_flush and not self._rrs_pending_flush: - # not yet time for this lot - return # Send at most limit EDUs for receipts. for content in self._pending_receipt_edus[:limit]: @@ -747,7 +735,7 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: ) # Add read receipt EDUs. - pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) + pending_edus.extend(self.queue._get_receipt_edus(limit=5)) edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) # Next, prioritize to-device messages so that existing encryption channels @@ -795,13 +783,6 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: if not self._pdus and not pending_edus: return [], [] - # if we've decided to send a transaction anyway, and we have room, we - # may as well send any pending RRs - if edu_limit: - pending_edus.extend( - self.queue._get_receipt_edus(force_flush=True, limit=edu_limit) - ) - if self._pdus: self._last_stream_ordering = self._pdus[ -1 diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 32c3472e585..707d18de78a 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -322,6 +322,7 @@ def _invalidate_caches_for_event( self._attempt_to_invalidate_cache( "get_unread_event_push_actions_by_room_for_user", (room_id,) ) + self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id)) self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,)) @@ -446,6 +447,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None: self._attempt_to_invalidate_cache("_get_state_group_for_event", None) self._attempt_to_invalidate_cache("get_event_ordering", None) + self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,)) self._attempt_to_invalidate_cache("is_partial_state_event", None) self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 403407068c5..825fd00993a 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -193,6 +193,14 @@ class _EventRow: outlier: bool +@attr.s(slots=True, frozen=True, auto_attribs=True) +class EventMetadata: + """Event metadata returned by `get_metadata_for_event(..)`""" + + sender: str + received_ts: int + + class EventRedactBehaviour(Enum): """ What to do when retrieving a redacted event from the database. @@ -2580,3 +2588,22 @@ async def have_finished_sliding_sync_background_jobs(self) -> bool: _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, ) ) + + @cached(tree=True) + async def get_metadata_for_event( + self, room_id: str, event_id: str + ) -> Optional[EventMetadata]: + row = await self.db_pool.simple_select_one( + table="events", + keyvalues={"room_id": room_id, "event_id": event_id}, + retcols=("sender", "received_ts"), + allow_none=True, + desc="get_metadata_for_event", + ) + if row is None: + return None + + return EventMetadata( + sender=row[0], + received_ts=row[1], + ) diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 6a8887fe744..cd906bbbc78 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -34,6 +34,7 @@ from synapse.rest import admin from synapse.rest.client import login from synapse.server import HomeServer +from synapse.storage.databases.main.events_worker import EventMetadata from synapse.types import JsonDict, ReadReceipt from synapse.util import Clock @@ -55,12 +56,15 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: federation_transport_client=self.federation_transport_client, ) - hs.get_storage_controllers().state.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign] + self.main_store = hs.get_datastores().main + self.state_controller = hs.get_storage_controllers().state + + self.state_controller.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign] return_value={"test", "host2"} ) - hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign] - hs.get_storage_controllers().state.get_current_hosts_in_room + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign] + self.state_controller.get_current_hosts_in_room ) return hs @@ -185,12 +189,15 @@ def test_send_receipts_thread(self) -> None: ], ) - def test_send_receipts_with_backoff(self) -> None: - """Send two receipts in quick succession; the second should be flushed, but - only after 20ms""" + def test_send_receipts_with_backoff_small_room(self) -> None: + """Read receipt in small rooms should not be delayed""" mock_send_transaction = self.federation_transport_client.send_transaction mock_send_transaction.return_value = {} + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign] + return_value={"test", "host2"} + ) + sender = self.hs.get_federation_sender() receipt = ReadReceipt( "room_id", @@ -206,47 +213,104 @@ def test_send_receipts_with_backoff(self) -> None: # expect a call to send_transaction mock_send_transaction.assert_called_once() - json_cb = mock_send_transaction.call_args[0][1] - data = json_cb() - self.assertEqual( - data["edus"], - [ - { - "edu_type": EduTypes.RECEIPT, - "content": { - "room_id": { - "m.read": { - "user_id": { - "event_ids": ["event_id"], - "data": {"ts": 1234}, - } - } - } - }, - } - ], + self._assert_edu_in_call(mock_send_transaction.call_args[0][1]) + + def test_send_receipts_with_backoff_recent_event(self) -> None: + """Read receipt for a recent message should not be delayed""" + mock_send_transaction = self.federation_transport_client.send_transaction + mock_send_transaction.return_value = {} + + # Pretend this is a big room + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign] + return_value={"test"} | {f"host{i}" for i in range(20)} ) + + self.main_store.get_metadata_for_event = AsyncMock( + return_value=EventMetadata( + received_ts=self.clock.time_msec(), + sender="@test:test", + ) + ) + + sender = self.hs.get_federation_sender() + receipt = ReadReceipt( + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, + ) + self.get_success(sender.send_read_receipt(receipt)) + + self.pump() + + # expect a call to send_transaction for each host + self.assertEqual(mock_send_transaction.call_count, 20) + self._assert_edu_in_call(mock_send_transaction.call_args.args[1]) + mock_send_transaction.reset_mock() - # send the second RR + def test_send_receipts_with_backoff_sender(self) -> None: + """Read receipt for a message should not be delayed to the sender, but + is delayed to everyone else""" + mock_send_transaction = self.federation_transport_client.send_transaction + mock_send_transaction.return_value = {} + + # Pretend this is a big room + self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign] + return_value={"test"} | {f"host{i}" for i in range(20)} + ) + + self.main_store.get_metadata_for_event = AsyncMock( + return_value=EventMetadata( + received_ts=self.clock.time_msec() - 5 * 60_000, + sender="@test:host1", + ) + ) + + sender = self.hs.get_federation_sender() receipt = ReadReceipt( "room_id", "m.read", "user_id", - ["other_id"], + ["event_id"], thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) + self.pump() - mock_send_transaction.assert_not_called() - self.reactor.advance(19) - mock_send_transaction.assert_not_called() + # First, expect a call to send_transaction for the sending host + mock_send_transaction.assert_called() - self.reactor.advance(10) - mock_send_transaction.assert_called_once() - json_cb = mock_send_transaction.call_args[0][1] + transaction = mock_send_transaction.call_args_list[0].args[0] + self.assertEqual(transaction.destination, "host1") + self._assert_edu_in_call(mock_send_transaction.call_args_list[0].args[1]) + + # We also expect a call to one of the other hosts, as the first + # destination to wake up. + self.assertEqual(mock_send_transaction.call_count, 2) + self._assert_edu_in_call(mock_send_transaction.call_args.args[1]) + + mock_send_transaction.reset_mock() + + # We now expect to see 18 more transactions to the remaining hosts + # periodically. + for _ in range(18): + self.reactor.advance( + 1.0 + / self.hs.config.ratelimiting.federation_rr_transactions_per_room_per_second + ) + + mock_send_transaction.assert_called_once() + self._assert_edu_in_call(mock_send_transaction.call_args.args[1]) + mock_send_transaction.reset_mock() + + def _assert_edu_in_call(self, json_cb: Callable[[], JsonDict]) -> None: + """Assert that the given `json_cb` from a `send_transaction` has a + receipt in it.""" data = json_cb() self.assertEqual( data["edus"], @@ -257,7 +321,7 @@ def test_send_receipts_with_backoff(self) -> None: "room_id": { "m.read": { "user_id": { - "event_ids": ["other_id"], + "event_ids": ["event_id"], "data": {"ts": 1234}, } }