diff --git a/tests/federation/test_federation_out_of_band_membership.py b/tests/federation/test_federation_out_of_band_membership.py index ed78182b6b0..9785792c1a3 100644 --- a/tests/federation/test_federation_out_of_band_membership.py +++ b/tests/federation/test_federation_out_of_band_membership.py @@ -23,11 +23,11 @@ import time import urllib.parse from http import HTTPStatus -from parameterized import parameterized -from typing import Any, Callable, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Optional, Set, Tuple, TypeVar, Union from unittest.mock import Mock import attr +from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor @@ -35,6 +35,9 @@ from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import strip_event +from synapse.federation.federation_base import ( + event_from_pdu_json, +) from synapse.federation.transport.client import SendJoinResponse from synapse.http.matrixfederationclient import ( ByteParser, @@ -116,6 +119,14 @@ class OutOfBandMembershipTests(unittest.FederatingHomeserverTestCase): sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync" + def default_config(self) -> JsonDict: + conf = super().default_config() + # Federation sending is disabled by default in the test environment + # so we need to enable it like this. + conf["federation_sender_instances"] = ["master"] + + return conf + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.federation_http_client = Mock( # spec=MatrixFederationHttpClient @@ -384,6 +395,13 @@ async def put_json( ], ) + if path.startswith("/_matrix/federation/v1/send/") and data is not None: + # Just acknowledge everything hs1 is trying to send hs2 + return { + event_from_pdu_json(pdu, room_version).event_id: {} + for pdu in data.get("pdus", []) + } + raise NotImplementedError( "We have not mocked a response for `put_json(...)` for the following endpoint yet: " + f"{destination}{path} with the following body data: {data}" @@ -440,7 +458,7 @@ def test_can_join_from_out_of_band_invite(self) -> None: [("accept invite", Membership.JOIN), ("reject invite", Membership.LEAVE)] ) def test_can_x_from_out_of_band_invite_after_we_are_already_participating_in_the_room( - self, _test_description, membership_action + self, _test_description: str, membership_action: str ) -> None: """ Test to make sure that we can do either a) join the room (accept the invite) or @@ -462,6 +480,44 @@ def test_can_x_from_out_of_band_invite_after_we_are_already_participating_in_the local_user2_id = self.register_user("user2", "pass") local_user2_tok = self.login(local_user2_id, "pass") + T = TypeVar("T") + + # PDU's that hs1 sent to hs2 + collected_pdus_from_hs1_federation_send: Set[str] = set() + + async def put_json( + destination: str, + path: str, + args: Optional[QueryParams] = None, + data: Optional[JsonDict] = None, + json_data_callback: Optional[Callable[[], JsonDict]] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + backoff_on_404: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + backoff_on_all_error_codes: bool = False, + ) -> Union[JsonDict, T]: + if path.startswith("/_matrix/federation/v1/send/") and data is not None: + logger.info("asdf data: %s", data) + for pdu in data.get("pdus", []): + event = event_from_pdu_json(pdu, room_version) + collected_pdus_from_hs1_federation_send.add(event.event_id) + + # Just acknowledge everything hs1 is trying to send hs2 + return { + event_from_pdu_json(pdu, room_version).event_id: {} + for pdu in data.get("pdus", []) + } + + raise NotImplementedError( + "We have not mocked a response for `put_json(...)` for the following endpoint yet: " + + f"{destination}{path} with the following body data: {data}" + ) + + self.federation_http_client.put_json.side_effect = put_json + # From the remote homeserver, invite user2 on the local homserver user2_invite_membership_event = make_event_from_dict( self.add_hashes_and_signatures_from_other_server( @@ -540,19 +596,57 @@ def test_can_x_from_out_of_band_invite_after_we_are_already_participating_in_the if membership_action == Membership.JOIN: # User2 joins the room - self.helper.join( + join_event = self.helper.join( remote_room_join_result.remote_room_id, local_user2_id, tok=local_user2_tok, ) + expected_pdu_event_id = join_event["event_id"] elif membership_action == Membership.LEAVE: # User2 rejects the invite - self.helper.leave( + leave_event = self.helper.leave( remote_room_join_result.remote_room_id, local_user2_id, tok=local_user2_tok, ) + expected_pdu_event_id = leave_event["event_id"] else: raise NotImplementedError( "This test does not support this membership action yet" ) + + # Sync until the local user2 can see their new membership in the room + with test_timeout( + 3, + "Unable to find user2's new membership event in the room", + ): + while True: + response_body, _ = self.do_sync(sync_body, tok=local_user2_tok) + if membership_action == Membership.JOIN: + if remote_room_id in response_body["rooms"].keys(): + required_state_map = required_state_json_to_state_map( + response_body["rooms"][remote_room_id]["required_state"] + ) + if ( + required_state_map.get((EventTypes.Member, local_user2_id)) + is not None + ): + break + elif membership_action == Membership.LEAVE: + if remote_room_id not in response_body["rooms"].keys(): + break + else: + raise NotImplementedError( + "This test does not support this membership action yet" + ) + + # Prevent tight-looping to allow the `test_timeout` to work + time.sleep(0.1) + + # Make sure that we let hs2 know about the new membership event + self.assertIncludes( + collected_pdus_from_hs1_federation_send, + {expected_pdu_event_id}, + exact=True, + message="Expected to find the event ID of the user2 membership to be sent from hs1 over federation to hs2", + )