-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding Sync: Fix outlier re-persisting causing problems with sliding sync tables #17635
Merged
erikjohnston
merged 13 commits into
develop
from
madlittlemods/fix-outlier-re-persisting-bugs-with-sliding-sync-tables
Aug 30, 2024
Merged
Changes from 8 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
0489ba7
Don't pre-calculate `stream_ordering` for `sliding_sync_joined_rooms`
MadLittleMods a17ae2c
Don't pre-calculate `stream_ordering` for `sliding_sync_membership_sn…
MadLittleMods 8180229
Also do not trust instance_name
MadLittleMods 420efb4
Try clarify which stream_ordering is used
MadLittleMods df4e187
Separate data structures so someone doesn't accidentally use bad data
MadLittleMods 40fb3cc
Fix lints
MadLittleMods 7ee85c5
Add changelog
MadLittleMods 4f341f1
Update comments
MadLittleMods b7a8dcc
Add xxx to match other spots
MadLittleMods e61ebfb
Add test
MadLittleMods 93e60f4
Fix logic bug in finding missing events (separate)
MadLittleMods ae115d3
Merge branch 'develop' into madlittlemods/fix-outlier-re-persisting-b…
MadLittleMods 5b6dc37
Add comment why parameterize
MadLittleMods File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -163,24 +163,22 @@ class SlidingSyncMembershipInfo: | |||||
sender: str | ||||||
membership_event_id: str | ||||||
membership: str | ||||||
|
||||||
|
||||||
@attr.s(slots=True, auto_attribs=True) | ||||||
class SlidingSyncMembershipInfoWithEventPos(SlidingSyncMembershipInfo): | ||||||
""" | ||||||
SlidingSyncMembershipInfo + `stream_ordering`/`instance_name` of the membership | ||||||
event | ||||||
""" | ||||||
|
||||||
membership_event_stream_ordering: int | ||||||
membership_event_instance_name: str | ||||||
|
||||||
|
||||||
@attr.s(slots=True, auto_attribs=True) | ||||||
class SlidingSyncTableChanges: | ||||||
room_id: str | ||||||
# `stream_ordering` of the most recent event being persisted in the room. This doesn't | ||||||
# need to be perfect, we just need *some* answer that points to a real event in the | ||||||
# room in case we are the first ones inserting into the `sliding_sync_joined_rooms` | ||||||
# table because of the `NON NULL` constraint on `event_stream_ordering`. In reality, | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after | ||||||
# `_update_current_state_txn()` whenever a new event is persisted to update it to the | ||||||
# correct latest value. | ||||||
# | ||||||
# This should be *some* value that points to a real event in the room if we are | ||||||
# still joined to the room and some state is changing (`to_insert` or `to_delete`). | ||||||
joined_room_best_effort_most_recent_stream_ordering: Optional[int] | ||||||
# If the row doesn't exist in the `sliding_sync_joined_rooms` table, we need to | ||||||
# fully-insert it which means we also need to include a `bump_stamp` value to use | ||||||
# for the row. This should only be populated when we're trying to fully-insert a | ||||||
|
@@ -401,6 +399,9 @@ async def _calculate_sliding_sync_table_changes( | |||||
`stream_ordering`). | ||||||
delta_state: Deltas that are going to be used to update the | ||||||
`current_state_events` table. Changes to the current state of the room. | ||||||
|
||||||
Returns: | ||||||
SlidingSyncTableChanges | ||||||
""" | ||||||
to_insert = delta_state.to_insert | ||||||
to_delete = delta_state.to_delete | ||||||
|
@@ -410,7 +411,6 @@ async def _calculate_sliding_sync_table_changes( | |||||
if not to_insert and not to_delete: | ||||||
return SlidingSyncTableChanges( | ||||||
room_id=room_id, | ||||||
joined_room_best_effort_most_recent_stream_ordering=None, | ||||||
joined_room_bump_stamp_to_fully_insert=None, | ||||||
joined_room_updates={}, | ||||||
membership_snapshot_shared_insert_values={}, | ||||||
|
@@ -469,24 +469,24 @@ async def _calculate_sliding_sync_table_changes( | |||||
membership_event_id, | ||||||
user_id, | ||||||
) in membership_event_id_to_user_id_map.items(): | ||||||
# We should only be seeing events with `stream_ordering`/`instance_name` assigned by this point | ||||||
membership_event_stream_ordering = membership_event_map[ | ||||||
membership_event_id | ||||||
].internal_metadata.stream_ordering | ||||||
assert membership_event_stream_ordering is not None | ||||||
membership_event_instance_name = membership_event_map[ | ||||||
membership_event_id | ||||||
].internal_metadata.instance_name | ||||||
assert membership_event_instance_name is not None | ||||||
|
||||||
membership_infos_to_insert_membership_snapshots.append( | ||||||
# XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here | ||||||
# because we're sourcing the event from `events_and_contexts`, we | ||||||
# can't rely on `stream_ordering`/`instance_name` being correct. We | ||||||
# could be working with events that were previously persisted as an | ||||||
# `outlier` with one `stream_ordering` but are now being persisted | ||||||
# again and de-outliered and assigned a different `stream_ordering` | ||||||
# that won't end up being used. Since we call | ||||||
# `_calculate_sliding_sync_table_changes()` before | ||||||
# `_update_outliers_txn()` which fixes this discrepancy (always use | ||||||
# the `stream_ordering` from the first time it was persisted), we're | ||||||
# working with an unreliable `stream_ordering` value that will | ||||||
# possibly be unused and not make it into the `events` table. | ||||||
SlidingSyncMembershipInfo( | ||||||
user_id=user_id, | ||||||
sender=membership_event_map[membership_event_id].sender, | ||||||
membership_event_id=membership_event_id, | ||||||
membership=membership_event_map[membership_event_id].membership, | ||||||
membership_event_stream_ordering=membership_event_stream_ordering, | ||||||
membership_event_instance_name=membership_event_instance_name, | ||||||
) | ||||||
) | ||||||
|
||||||
|
@@ -568,7 +568,6 @@ async def _calculate_sliding_sync_table_changes( | |||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()`) | ||||||
# | ||||||
joined_room_updates: SlidingSyncStateInsertValues = {} | ||||||
best_effort_most_recent_stream_ordering: Optional[int] = None | ||||||
bump_stamp_to_fully_insert: Optional[int] = None | ||||||
if not delta_state.no_longer_in_room: | ||||||
current_state_ids_map = {} | ||||||
|
@@ -657,52 +656,9 @@ async def _calculate_sliding_sync_table_changes( | |||||
elif state_key == (EventTypes.Name, ""): | ||||||
joined_room_updates["room_name"] = None | ||||||
|
||||||
# Figure out `best_effort_most_recent_stream_ordering`. This doesn't need to | ||||||
# be perfect, we just need *some* answer that points to a real event in the | ||||||
# room in case we are the first ones inserting into the | ||||||
# `sliding_sync_joined_rooms` table because of the `NON NULL` constraint on | ||||||
# `event_stream_ordering`. In reality, | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` is run after | ||||||
# `_update_current_state_txn()` whenever a new event is persisted to update | ||||||
# it to the correct latest value. | ||||||
# | ||||||
if len(events_and_contexts) > 0: | ||||||
# Since the list is sorted ascending by `stream_ordering`, the last event | ||||||
# should have the highest `stream_ordering`. | ||||||
best_effort_most_recent_stream_ordering = events_and_contexts[-1][ | ||||||
0 | ||||||
].internal_metadata.stream_ordering | ||||||
else: | ||||||
# If there are no `events_and_contexts`, we assume it's one of two scenarios: | ||||||
# 1. If there are new state `to_insert` but no `events_and_contexts`, | ||||||
# then it's a state reset. | ||||||
# 2. Otherwise, it's some partial-state room re-syncing the current state and | ||||||
# going through un-partial process. | ||||||
# | ||||||
# Either way, we assume no new events are being persisted and we can | ||||||
# find the latest already in the database. Since this is a best-effort | ||||||
# value, we don't need to be perfect although I think we're pretty close | ||||||
# here. | ||||||
most_recent_event_pos_results = ( | ||||||
await self.store.get_last_event_pos_in_room( | ||||||
room_id, event_types=None | ||||||
) | ||||||
) | ||||||
assert most_recent_event_pos_results, ( | ||||||
f"We should not be seeing `None` here because we are still in the room ({room_id}) and " | ||||||
+ "it should at-least have a join membership event that's keeping us here." | ||||||
) | ||||||
best_effort_most_recent_stream_ordering = most_recent_event_pos_results[ | ||||||
1 | ||||||
].stream | ||||||
|
||||||
# We should have found a value if we are still in the room | ||||||
assert best_effort_most_recent_stream_ordering is not None | ||||||
|
||||||
return SlidingSyncTableChanges( | ||||||
room_id=room_id, | ||||||
# For `sliding_sync_joined_rooms` | ||||||
joined_room_best_effort_most_recent_stream_ordering=best_effort_most_recent_stream_ordering, | ||||||
joined_room_bump_stamp_to_fully_insert=bump_stamp_to_fully_insert, | ||||||
joined_room_updates=joined_room_updates, | ||||||
# For `sliding_sync_membership_snapshots` | ||||||
|
@@ -1773,31 +1729,53 @@ def _update_current_state_txn( | |||||
# | ||||||
# We only need to update when one of the relevant state values has changed | ||||||
if sliding_sync_table_changes.joined_room_updates: | ||||||
# This should be *some* value that points to a real event in the room if | ||||||
# we are still joined to the room. | ||||||
assert ( | ||||||
sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering | ||||||
is not None | ||||||
sliding_sync_updates_keys = ( | ||||||
sliding_sync_table_changes.joined_room_updates.keys() | ||||||
) | ||||||
sliding_sync_updates_values = ( | ||||||
sliding_sync_table_changes.joined_room_updates.values() | ||||||
) | ||||||
|
||||||
self.db_pool.simple_upsert_txn( | ||||||
txn, | ||||||
table="sliding_sync_joined_rooms", | ||||||
keyvalues={"room_id": room_id}, | ||||||
values=sliding_sync_table_changes.joined_room_updates, | ||||||
insertion_values={ | ||||||
# The reason we're only *inserting* (not *updating*) | ||||||
# `event_stream_ordering` here is because the column has a `NON | ||||||
# NULL` constraint and we need *some* answer. And if the row | ||||||
# already exists, it already has the correct value and it's | ||||||
# better to just rely on | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` | ||||||
# to do the right thing (same for `bump_stamp`). | ||||||
"event_stream_ordering": sliding_sync_table_changes.joined_room_best_effort_most_recent_stream_ordering, | ||||||
# If we're trying to fully-insert a row, we need to provide a | ||||||
# value for `bump_stamp` if it exists for the room. | ||||||
"bump_stamp": sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, | ||||||
}, | ||||||
args: List[Any] = [ | ||||||
room_id, | ||||||
room_id, | ||||||
sliding_sync_table_changes.joined_room_bump_stamp_to_fully_insert, | ||||||
] | ||||||
args.extend(iter(sliding_sync_updates_values)) | ||||||
|
||||||
# We use a sub-query for `stream_ordering` because it's unreliable to | ||||||
# pre-calculate from `events_and_contexts` at the time when | ||||||
# `_calculate_sliding_sync_table_changes()` is ran. We could be working | ||||||
# with events that were previously persisted as an `outlier` with one | ||||||
# `stream_ordering` but are now being persisted again and de-outliered | ||||||
# and assigned a different `stream_ordering`. Since we call | ||||||
# `_calculate_sliding_sync_table_changes()` before | ||||||
# `_update_outliers_txn()` which fixes this discrepancy (always use the | ||||||
# `stream_ordering` from the first time it was persisted), we're working | ||||||
# with an unreliable `stream_ordering` value that will possibly be | ||||||
# unused and not make it into the `events` table. | ||||||
# | ||||||
# We don't update `event_stream_ordering` `ON CONFLICT` because it's | ||||||
# simpler and we can just rely on | ||||||
# `_update_sliding_sync_tables_with_new_persisted_events_txn()` to do | ||||||
# the right thing (same for `bump_stamp`). The only reason we're | ||||||
# inserting `event_stream_ordering` here is because the column has a | ||||||
# `NON NULL` constraint and we need some answer. | ||||||
txn.execute( | ||||||
f""" | ||||||
INSERT INTO sliding_sync_joined_rooms | ||||||
(room_id, event_stream_ordering, bump_stamp, {", ".join(sliding_sync_updates_keys)}) | ||||||
VALUES ( | ||||||
?, | ||||||
(SELECT stream_ordering FROM events WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT 1), | ||||||
?, | ||||||
{", ".join("?" for _ in sliding_sync_updates_values)} | ||||||
) | ||||||
ON CONFLICT (room_id) | ||||||
DO UPDATE SET | ||||||
{", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_updates_keys)} | ||||||
""", | ||||||
args, | ||||||
) | ||||||
|
||||||
# We now update `local_current_membership`. We do this regardless | ||||||
|
@@ -1854,38 +1832,63 @@ def _update_current_state_txn( | |||||
if sliding_sync_table_changes.to_insert_membership_snapshots: | ||||||
# Update the `sliding_sync_membership_snapshots` table | ||||||
# | ||||||
# We need to insert/update regardless of whether we have `sliding_sync_snapshot_keys` | ||||||
# because there are other fields in the `ON CONFLICT` upsert to run (see | ||||||
# inherit case above for more context when this happens). | ||||||
self.db_pool.simple_upsert_many_txn( | ||||||
txn=txn, | ||||||
table="sliding_sync_membership_snapshots", | ||||||
key_names=("room_id", "user_id"), | ||||||
key_values=[ | ||||||
(room_id, membership_info.user_id) | ||||||
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots | ||||||
], | ||||||
value_names=[ | ||||||
"sender", | ||||||
"membership_event_id", | ||||||
"membership", | ||||||
"event_stream_ordering", | ||||||
"event_instance_name", | ||||||
] | ||||||
+ list( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() | ||||||
), | ||||||
value_values=[ | ||||||
sliding_sync_snapshot_keys = ( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.keys() | ||||||
) | ||||||
sliding_sync_snapshot_values = ( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() | ||||||
) | ||||||
# We need to insert/update regardless of whether we have | ||||||
# `sliding_sync_snapshot_keys` because there are other fields in the `ON | ||||||
# CONFLICT` upsert to run (see inherit case (explained in | ||||||
# `_calculate_sliding_sync_table_changes()`) for more context when this | ||||||
# happens). | ||||||
# | ||||||
# XXX: We use a sub-query for `stream_ordering` because it's unreliable to | ||||||
# pre-calculate from `events_and_contexts` at the time when | ||||||
# `_calculate_sliding_sync_table_changes()` is ran. We could be working with | ||||||
# events that were previously persisted as an `outlier` with one | ||||||
# `stream_ordering` but are now being persisted again and de-outliered and | ||||||
# assigned a different `stream_ordering` that won't end up being used. Since | ||||||
# we call `_calculate_sliding_sync_table_changes()` before | ||||||
# `_update_outliers_txn()` which fixes this discrepancy (always use the | ||||||
# `stream_ordering` from the first time it was persisted), we're working | ||||||
# with an unreliable `stream_ordering` value that will possibly be unused | ||||||
# and not make it into the `events` table. | ||||||
txn.execute_batch( | ||||||
f""" | ||||||
INSERT INTO sliding_sync_membership_snapshots | ||||||
(room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name | ||||||
{("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}) | ||||||
VALUES ( | ||||||
?, ?, ?, ?, ?, | ||||||
(SELECT stream_ordering FROM events WHERE event_id = ?), | ||||||
(SELECT instance_name FROM events WHERE event_id = ?) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should have fallen back to
Suggested change
Fixed up in #17636 ⏩ |
||||||
{("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""} | ||||||
) | ||||||
ON CONFLICT (room_id, user_id) | ||||||
DO UPDATE SET | ||||||
sender = EXCLUDED.sender, | ||||||
membership_event_id = EXCLUDED.membership_event_id, | ||||||
membership = EXCLUDED.membership, | ||||||
event_stream_ordering = EXCLUDED.event_stream_ordering | ||||||
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""} | ||||||
""", | ||||||
[ | ||||||
[ | ||||||
room_id, | ||||||
membership_info.user_id, | ||||||
membership_info.sender, | ||||||
membership_info.membership_event_id, | ||||||
membership_info.membership, | ||||||
membership_info.membership_event_stream_ordering, | ||||||
membership_info.membership_event_instance_name, | ||||||
# XXX: We do not use `membership_info.membership_event_stream_ordering` here | ||||||
# because it is an unreliable value. See XXX note above. | ||||||
membership_info.membership_event_id, | ||||||
# XXX: We do not use `membership_info.membership_event_instance_name` here | ||||||
# because it is an unreliable value. See XXX note above. | ||||||
membership_info.membership_event_id, | ||||||
] | ||||||
+ list( | ||||||
sliding_sync_table_changes.membership_snapshot_shared_insert_values.values() | ||||||
) | ||||||
+ list(sliding_sync_snapshot_values) | ||||||
for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots | ||||||
], | ||||||
) | ||||||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be simplified to remove
ORDER BY stream_ordering DESC
. This just gives a more correct answer but we will end up with the correct answer anyway when_update_sliding_sync_tables_with_new_persisted_events_txn()
runs right after this.Up to you