From aebf645cab437862d268fa8e4052f2d02d76fa91 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 12 Dec 2023 16:06:39 -0700 Subject: [PATCH] event stream: send heartbeats to server every 60 seconds It seems like this allows the stream listener to stay open indefinitely. Signed-off-by: Sumner Evans --- linkedin_matrix/user.py | 2 +- linkedin_messaging/linkedin.py | 45 ++++++++++++++++++++++++++++++---- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/linkedin_matrix/user.py b/linkedin_matrix/user.py index adbaa22..f930643 100644 --- a/linkedin_matrix/user.py +++ b/linkedin_matrix/user.py @@ -557,7 +557,7 @@ async def _try_listen(self): self.client.add_event_listener("fromEntity", self.handle_linkedin_from_entity) self.listener_event_handlers_created = True try: - await self.client.start_listener() + await self.client.start_listener(self.li_member_urn) except Exception as e: self.log.exception(f"Exception in listener: {e}") self._prev_connected_bridge_state = None diff --git a/linkedin_messaging/linkedin.py b/linkedin_messaging/linkedin.py index f9b6d1d..4901328 100644 --- a/linkedin_messaging/linkedin.py +++ b/linkedin_messaging/linkedin.py @@ -56,6 +56,10 @@ REALTIME_CONNECT_URL = f"{LINKEDIN_BASE_URL}/realtime/connect" VERIFY_URL = f"{LINKEDIN_BASE_URL}/checkpoint/challenge/verify" API_BASE_URL = f"{LINKEDIN_BASE_URL}/voyager/api" +CONNECTIVITY_TRACKING_URL = ( + f"{LINKEDIN_BASE_URL}/realtime/realtimeFrontendClientConnectivityTracking" +) + SEED_URL = f"{LINKEDIN_BASE_URL}/login" """ @@ -108,6 +112,8 @@ class LinkedInMessaging: ], ] + _realtime_sesion_id: str = "" + def __init__(self): self.session = aiohttp.ClientSession() self.event_listeners = defaultdict(list) @@ -524,10 +530,6 @@ async def _listen_to_event_stream(self): "content-type": "text/event-stream", **REQUEST_HEADERS, }, - # The event stream normally stays open for about 3 minutes, but this will - # automatically close it more agressively so that we don't get into a weird - # state where it's not receiving any data, but simultaneously isn't closed. - timeout=120, ) as resp: if resp.status != 200: raise TooManyRequestsError(f"Failed to connect. Status {resp.status}.") @@ -549,6 +551,10 @@ async def _listen_to_event_stream(self): except Exception: logging.exception(f"Handler {handler} failed to handle {data}") + if cc := data.get("com.linkedin.realtimefrontend.ClientConnection", {}): + logging.info(f"Got realtime connection ID: {cc.get('id')}") + self._realtime_sesion_id = cc.get("id") + event_payload = data.get("com.linkedin.realtimefrontend.DecoratedEvent", {}).get( "payload", {} ) @@ -559,9 +565,35 @@ async def _listen_to_event_stream(self): logging.info("Event stream closed") - async def start_listener(self): + async def _send_heartbeat(self, user_urn: URN): + logging.info("Starting heartbeat task") + while True: + await asyncio.sleep(60) + logging.info("Sending heartbeat") + + if not self._realtime_sesion_id: + logging.warning("No realtime session ID. Skipping heartbeat.") + continue + + await self._post( + CONNECTIVITY_TRACKING_URL, + params={"action": "sendHeartbeat"}, + json={ + "isFirstHeartbeat": False, + "isLastHeartbeat": False, + "realtimeSessionId": self._realtime_sesion_id, + "mpName": "voyager-web", + "mpVersion": "1.13.8094", + "clientId": "voyager-web", + "actorUrn": str(user_urn), + "contextUrns": [str(user_urn)], + }, + ) + + async def start_listener(self, user_urn: URN): while True: try: + self._heartbeat_task = asyncio.create_task(self._send_heartbeat(user_urn)) await self._listen_to_event_stream() except asyncio.exceptions.TimeoutError as te: # Special handling for TIMEOUT handler. @@ -574,5 +606,8 @@ async def start_listener(self): except Exception as e: logging.exception(f"Got exception in listener: {e}") raise + finally: + if not self._heartbeat_task.done(): + self._heartbeat_task.cancel() # endregion