-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding Sync: Fix outlier re-persisting causing problems with sliding sync tables #17635
Changes from all commits
0489ba7
a17ae2c
8180229
420efb4
df4e187
40fb3cc
7ee85c5
4f341f1
b7a8dcc
e61ebfb
93e60f4
ae115d3
5b6dc37
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -163,24 +163,22 @@ class SlidingSyncMembershipInfo: | |||||
sender: str | ||||||
membership_event_id: str | ||||||
membership: str | ||||||
|
||||||
|
||||||
@attr.s(slots=True, auto_attribs=True) | ||||||
class SlidingSyncMembershipInfoWithEventPos(SlidingSyncMembershipInfo): | ||||||
""" | ||||||
SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership | ||||||
event | ||||||
""" | ||||||
|
||||||
membership_event_stream_ordering: int | ||||||
membership_event_instance_name: str | ||||||
|
||||||
|
||||||
@attr.s(slots=True, auto_attribs=True) | ||||||
class SlidingSyncTableChanges: | ||||||
room_id: str | ||||||
# `stream_ordering` of the most recent event being persisted in the room. This doesn't | ||||||
# need to be perfect, we just need *some* answer that points to a real event in the | ||||||
# room in case we are the first ones inserting into the `sliding_sync_joined_rooms` | ||||||
# table because of the `NON NULL` constraint on `event_stream_ordering`. In reality, | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after | ||||||
# `_update_current_state_txn()` whenever a new event is persisted to update it to the | ||||||
# correct latest value. | ||||||
# | ||||||
# This should be *some* value that points to a real event in the room if we are | ||||||
# still joined to the room and some state is changing (`to_insert` or `to_delete`). | ||||||
joined_room_best_effort_most_recent_stream_ordering: Optional[int] | ||||||
# If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to | ||||||
# fully-insert it which means we also need to include a `bump_stamp` value to use | ||||||
# for the row. This should only be populated when we're trying to fully-insert a | ||||||
|
@@ -401,6 +399,9 @@ async def _calculate_sliding_sync_table_changes( | |||||
`stream_ordering`). | ||||||
delta_state: Deltas that are going to be used to update the | ||||||
`current_state_events` table. Changes to the current state of the room. | ||||||
|
||||||
Returns: | ||||||
SlidingSyncTableChanges | ||||||
""" | ||||||
to_insert = delta_state.to_insert | ||||||
to_delete = delta_state.to_delete | ||||||
|
@@ -410,7 +411,6 @@ async def _calculate_sliding_sync_table_changes( | |||||
if not to_insert and not to_delete: | ||||||
return SlidingSyncTableChanges( | ||||||
room_id=room_id, | ||||||
joined_room_best_effort_most_recent_stream_ordering=None, | ||||||
joined_room_bump_stamp_to_fully_insert=None, | ||||||
joined_room_updates={}, | ||||||
membership_snapshot_shared_insert_values={}, | ||||||
|
@@ -469,24 +469,24 @@ async def _calculate_sliding_sync_table_changes( | |||||
membership_event_id, | ||||||
user_id, | ||||||
) in membership_event_id_to_user_id_map.items(): | ||||||
# We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point | ||||||
membership_event_stream_ordering = membership_event_map[ | ||||||
membership_event_id | ||||||
].internal_metadata.stream_ordering | ||||||
assert membership_event_stream_ordering is not None | ||||||
membership_event_instance_name = membership_event_map[ | ||||||
membership_event_id | ||||||
].internal_metadata.instance_name | ||||||
assert membership_event_instance_name is not None | ||||||
|
||||||
membership_infos_to_insert_membership_snapshots.append( | ||||||
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here | ||||||
# because we're sourcing the event from `events_and_contexts`, we | ||||||
# can't rely on `stream_ordering`/`instance_name` being correct. We | ||||||
# could be working with events that were previously persisted as an | ||||||
# `outlier` with one `stream_ordering` but are now being persisted | ||||||
# again and de-outliered and assigned a different `stream_ordering` | ||||||
# that won't end up being used. Since we call | ||||||
# `_calculate_sliding_sync_table_changes()` before | ||||||
# `_update_outliers_txn()` which fixes this discrepancy (always use | ||||||
# the `stream_ordering` from the first time it was persisted), we're | ||||||
# working with an unreliable `stream_ordering` value that will | ||||||
# possibly be unused and not make it into the `events` table. | ||||||
SlidingSyncMembershipInfo( | ||||||
user_id=user_id, | ||||||
sender=membership_event_map[membership_event_id].sender, | ||||||
membership_event_id=membership_event_id, | ||||||
membership=membership_event_map[membership_event_id].membership, | ||||||
membership_event_stream_ordering=membership_event_stream_ordering, | ||||||
membership_event_instance_name=membership_event_instance_name, | ||||||
) | ||||||
) | ||||||
|
||||||
|
@@ -568,7 +568,6 @@ async def _calculate_sliding_sync_table_changes( | |||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`) | ||||||
# | ||||||
joined_room_updates: SlidingSyncStateInsertValues = {} | ||||||
best_effort_most_recent_stream_ordering: Optional[int] = None | ||||||
bump_stamp_to_fully_insert: Optional[int] = None | ||||||
if not delta_state.no_longer_in_room: | ||||||
current_state_ids_map = {} | ||||||
|
@@ -632,9 +631,7 @@ async def _calculate_sliding_sync_table_changes( | |||||
|
||||||
# Otherwise, we need to find a couple events that we were reset to. | ||||||
if missing_event_ids: | ||||||
remaining_events = await self.store.get_events( | ||||||
current_state_ids_map.values() | ||||||
) | ||||||
remaining_events = await self.store.get_events(missing_event_ids) | ||||||
# There shouldn't be any missing events | ||||||
assert ( | ||||||
remaining_events.keys() == missing_event_ids | ||||||
|
@@ -657,52 +654,9 @@ async def _calculate_sliding_sync_table_changes( | |||||
elif state_key == (EventTypes.Name, ""): | ||||||
joined_room_updates["room_name"] = None | ||||||
|
||||||
# Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to | ||||||
# be perfect, we just need *some* answer that points to a real event in the | ||||||
# room in case we are the first ones inserting into the | ||||||
# `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on | ||||||
# `event_stream_ordering`. In reality, | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after | ||||||
# `_update_current_state_txn()` whenever a new event is persisted to update | ||||||
# it to the correct latest value. | ||||||
# | ||||||
if len(events_and_contexts) > 0: | ||||||
# Since the list is sorted ascending by `stream_ordering`, the last event | ||||||
# should have the highest `stream_ordering`. | ||||||
best_effort_most_recent_stream_ordering = events_and_contexts[-1][ | ||||||
0 | ||||||
].internal_metadata.stream_ordering | ||||||
else: | ||||||
# If there are no `events_and_contexts`, we assume it's one of two scenarios: | ||||||
# 1. If there are new state `to_insert` but no `events_and_contexts`, | ||||||
# then it's a state reset. | ||||||
# 2. Otherwise, it's some partial-state room re-syncing the current state and | ||||||
# going through un-partial process. | ||||||
# | ||||||
# Either way, we assume no new events are being persisted and we can | ||||||
# find the latest already in the database. Since this is a best-effort | ||||||
# value, we don't need to be perfect although I think we're pretty close | ||||||
# here. | ||||||
most_recent_event_pos_results = ( | ||||||
await self.store.get_last_event_pos_in_room( | ||||||
room_id, event_types=None | ||||||
) | ||||||
) | ||||||
assert most_recent_event_pos_results, ( | ||||||
f"We should not be seeing `None` here because we are still in the room ({room_id}) and " | ||||||
+ "it should at-least have a join membership event that's keeping us here." | ||||||
) | ||||||
best_effort_most_recent_stream_ordering = most_recent_event_pos_results[ | ||||||
1 | ||||||
].stream | ||||||
|
||||||
# We should have found a value if we are still in the room | ||||||
assert best_effort_most_recent_stream_ordering is not None | ||||||
|
||||||
return SlidingSyncTableChanges( | ||||||
room_id=room_id, | ||||||
# For `sliding_sync_joined_rooms` | ||||||
joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering, | ||||||
joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert, | ||||||
joined_room_updates=joined_room_updates, | ||||||
# For `sliding_sync_membership_snapshots` | ||||||
|
@@ -1773,31 +1727,53 @@ def _update_current_state_txn( | |||||
# | ||||||
# We only need to update when one of the relevant state values has changed | ||||||
if sliding_sync_table_changes.joined_room_updates: | ||||||
# This should be *some* value that points to a real event in the room if | ||||||
# we are still joined to the room. | ||||||
assert ( | ||||||
sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering | ||||||
is not None | ||||||
sliding_sync_updates_keys = ( | ||||||
sliding_sync_table_changes.joined_room_updates.keys() | ||||||
) | ||||||
sliding_sync_updates_values = ( | ||||||
sliding_sync_table_changes.joined_room_updates.values() | ||||||
) | ||||||
|
||||||
self.db_pool.simple_upsert_txn( | ||||||
txn, | ||||||
table="sliding_sync_joined_rooms", | ||||||
keyvalues={"room_id": room_id}, | ||||||
values=sliding_sync_table_changes.joined_room_updates, | ||||||
insertion_values={ | ||||||
# The reason we're only *inserting* (not *updating*) | ||||||
# `event_stream_ordering` here is because the column has a `NON | ||||||
# NULL` constraint and we need *some* answer. And if the row | ||||||
# already exists, it already has the correct value and it's | ||||||
# better to just rely on | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` | ||||||
# to do the right thing (same for `bump_stamp`). | ||||||
"event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering, | ||||||
# If we're trying to fully-insert a row, we need to provide a | ||||||
# value for `bump_stamp` if it exists for the room. | ||||||
"bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, | ||||||
}, | ||||||
args: List[Any] = [ | ||||||
room_id, | ||||||
room_id, | ||||||
sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, | ||||||
] | ||||||
args.extend(iter(sliding_sync_updates_values)) | ||||||
|
||||||
# XXX: We use a sub-query for `stream_ordering` because it's unreliable to | ||||||
# pre-calculate from `events_and_contexts` at the time when | ||||||
# `_calculate_sliding_sync_table_changes()` is ran. We could be working | ||||||
# with events that were previously persisted as an `outlier` with one | ||||||
# `stream_ordering` but are now being persisted again and de-outliered | ||||||
# and assigned a different `stream_ordering`. Since we call | ||||||
# `_calculate_sliding_sync_table_changes()` before | ||||||
# `_update_outliers_txn()` which fixes this discrepancy (always use the | ||||||
# `stream_ordering` from the first time it was persisted), we're working | ||||||
# with an unreliable `stream_ordering` value that will possibly be | ||||||
# unused and not make it into the `events` table. | ||||||
# | ||||||
# We don't update `event_stream_ordering` `ON CONFLICT` because it's | ||||||
# simpler and we can just rely on | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do | ||||||
# the right thing (same for `bump_stamp`). The only reason we're | ||||||
# inserting `event_stream_ordering` here is because the column has a | ||||||
# `NON NULL` constraint and we need some answer. | ||||||
txn.execute( | ||||||
f""" | ||||||
INSERT INTO sliding_sync_joined_rooms | ||||||
(room_id, event_stream_ordering, bump_stamp, {", ".join(sliding_sync_updates_keys)}) | ||||||
VALUES ( | ||||||
?, | ||||||
(SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be simplified to remove
Suggested change
Up to you |
||||||
?, | ||||||
{", ".join("?" for _ in sliding_sync_updates_values)} | ||||||
) | ||||||
ON CONFLICT (room_id) | ||||||
DO UPDATE SET | ||||||
{", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_updates_keys)} | ||||||
""", | ||||||
args, | ||||||
) | ||||||
|
||||||
# We now update `local_current_membership`. We do this regardless | ||||||
|
@@ -1854,38 +1830,63 @@ def _update_current_state_txn( | |||||
if sliding_sync_table_changes.to_insert_membership_snapshots: | ||||||
# Update the `sliding_sync_membership_snapshots` table | ||||||
# | ||||||
# We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys` | ||||||
# because there are other fields in the `ON CONFLICT` upsert to run (see | ||||||
# inherit case above for more context when this happens). | ||||||
self.db_pool.simple_upsert_many_txn( | ||||||
txn=txn, | ||||||
table="sliding_sync_membership_snapshots", | ||||||
key_names=("room_id", "user_id"), | ||||||
key_values=[ | ||||||
(room_id, membership_info.user_id) | ||||||
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots | ||||||
], | ||||||
value_names=[ | ||||||
"sender", | ||||||
"membership_event_id", | ||||||
"membership", | ||||||
"event_stream_ordering", | ||||||
"event_instance_name", | ||||||
] | ||||||
+ list( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() | ||||||
), | ||||||
value_values=[ | ||||||
sliding_sync_snapshot_keys = ( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() | ||||||
) | ||||||
sliding_sync_snapshot_values = ( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() | ||||||
) | ||||||
# We need to insert/update regardless of whether we have | ||||||
# `sliding_sync_snapshot_keys` because there are other fields in the `ON | ||||||
# CONFLICT` upsert to run (see inherit case (explained in | ||||||
# `_calculate_sliding_sync_table_changes()`) for more context when this | ||||||
# happens). | ||||||
# | ||||||
# XXX: We use a sub-query for `stream_ordering` because it's unreliable to | ||||||
# pre-calculate from `events_and_contexts` at the time when | ||||||
# `_calculate_sliding_sync_table_changes()` is ran. We could be working with | ||||||
# events that were previously persisted as an `outlier` with one | ||||||
# `stream_ordering` but are now being persisted again and de-outliered and | ||||||
# assigned a different `stream_ordering` that won't end up being used. Since | ||||||
# we call `_calculate_sliding_sync_table_changes()` before | ||||||
# `_update_outliers_txn()` which fixes this discrepancy (always use the | ||||||
# `stream_ordering` from the first time it was persisted), we're working | ||||||
# with an unreliable `stream_ordering` value that will possibly be unused | ||||||
# and not make it into the `events` table. | ||||||
txn.execute_batch( | ||||||
f""" | ||||||
INSERT INTO sliding_sync_membership_snapshots | ||||||
(room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name | ||||||
{("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}) | ||||||
VALUES ( | ||||||
?, ?, ?, ?, ?, | ||||||
(SELECT stream_ordering FROM events WHERE event_id = ?), | ||||||
(SELECT instance_name FROM events WHERE event_id = ?) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should have fallen back to
Suggested change
Fixed up in #17636 ⏩ |
||||||
{("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""} | ||||||
) | ||||||
ON CONFLICT (room_id, user_id) | ||||||
DO UPDATE SET | ||||||
sender = EXCLUDED.sender, | ||||||
membership_event_id = EXCLUDED.membership_event_id, | ||||||
membership = EXCLUDED.membership, | ||||||
event_stream_ordering = EXCLUDED.event_stream_ordering | ||||||
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""} | ||||||
""", | ||||||
[ | ||||||
[ | ||||||
room_id, | ||||||
membership_info.user_id, | ||||||
membership_info.sender, | ||||||
membership_info.membership_event_id, | ||||||
membership_info.membership, | ||||||
membership_info.membership_event_stream_ordering, | ||||||
membership_info.membership_event_instance_name, | ||||||
# XXX: We do not use `membership_info.membership_event_stream_ordering` here | ||||||
# because it is an unreliable value. See XXX note above. | ||||||
membership_info.membership_event_id, | ||||||
# XXX: We do not use `membership_info.membership_event_instance_name` here | ||||||
# because it is an unreliable value. See XXX note above. | ||||||
membership_info.membership_event_id, | ||||||
] | ||||||
+ list( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() | ||||||
) | ||||||
+ list(sliding_sync_snapshot_values) | ||||||
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots | ||||||
], | ||||||
) | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found this logic bug because of the new tests. This is a separate problem though that we're fixing.
Now we're actually fetching the
missing_event_ids
if we have any.