Skip to content

Commit

Permalink
event stream: send heartbeats to server every 60 seconds
Browse files Browse the repository at this point in the history
It seems like this allows the stream listener to stay open indefinitely.

Signed-off-by: Sumner Evans <[email protected]>
  • Loading branch information
sumnerevans committed Dec 12, 2023
1 parent 9996817 commit aebf645
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
2 changes: 1 addition & 1 deletion linkedin_matrix/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ async def _try_listen(self):
self.client.add_event_listener("fromEntity", self.handle_linkedin_from_entity)
self.listener_event_handlers_created = True
try:
await self.client.start_listener()
await self.client.start_listener(self.li_member_urn)
except Exception as e:
self.log.exception(f"Exception in listener: {e}")
self._prev_connected_bridge_state = None
Expand Down
45 changes: 40 additions & 5 deletions linkedin_messaging/linkedin.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
REALTIME_CONNECT_URL = f"{LINKEDIN_BASE_URL}/realtime/connect"
VERIFY_URL = f"{LINKEDIN_BASE_URL}/checkpoint/challenge/verify"
API_BASE_URL = f"{LINKEDIN_BASE_URL}/voyager/api"
CONNECTIVITY_TRACKING_URL = (
f"{LINKEDIN_BASE_URL}/realtime/realtimeFrontendClientConnectivityTracking"
)


SEED_URL = f"{LINKEDIN_BASE_URL}/login"
"""
Expand Down Expand Up @@ -108,6 +112,8 @@ class LinkedInMessaging:
],
]

_realtime_sesion_id: str = ""

def __init__(self):
self.session = aiohttp.ClientSession()
self.event_listeners = defaultdict(list)
Expand Down Expand Up @@ -524,10 +530,6 @@ async def _listen_to_event_stream(self):
"content-type": "text/event-stream",
**REQUEST_HEADERS,
},
# The event stream normally stays open for about 3 minutes, but this will
# automatically close it more agressively so that we don't get into a weird
# state where it's not receiving any data, but simultaneously isn't closed.
timeout=120,
) as resp:
if resp.status != 200:
raise TooManyRequestsError(f"Failed to connect. Status {resp.status}.")
Expand All @@ -549,6 +551,10 @@ async def _listen_to_event_stream(self):
except Exception:
logging.exception(f"Handler {handler} failed to handle {data}")

if cc := data.get("com.linkedin.realtimefrontend.ClientConnection", {}):
logging.info(f"Got realtime connection ID: {cc.get('id')}")
self._realtime_sesion_id = cc.get("id")

event_payload = data.get("com.linkedin.realtimefrontend.DecoratedEvent", {}).get(
"payload", {}
)
Expand All @@ -559,9 +565,35 @@ async def _listen_to_event_stream(self):

logging.info("Event stream closed")

async def start_listener(self):
async def _send_heartbeat(self, user_urn: URN):
logging.info("Starting heartbeat task")
while True:
await asyncio.sleep(60)
logging.info("Sending heartbeat")

if not self._realtime_sesion_id:
logging.warning("No realtime session ID. Skipping heartbeat.")
continue

await self._post(
CONNECTIVITY_TRACKING_URL,
params={"action": "sendHeartbeat"},
json={
"isFirstHeartbeat": False,
"isLastHeartbeat": False,
"realtimeSessionId": self._realtime_sesion_id,
"mpName": "voyager-web",
"mpVersion": "1.13.8094",
"clientId": "voyager-web",
"actorUrn": str(user_urn),
"contextUrns": [str(user_urn)],
},
)

async def start_listener(self, user_urn: URN):
while True:
try:
self._heartbeat_task = asyncio.create_task(self._send_heartbeat(user_urn))
await self._listen_to_event_stream()
except asyncio.exceptions.TimeoutError as te:
# Special handling for TIMEOUT handler.
Expand All @@ -574,5 +606,8 @@ async def start_listener(self):
except Exception as e:
logging.exception(f"Got exception in listener: {e}")
raise
finally:
if not self._heartbeat_task.done():
self._heartbeat_task.cancel()

# endregion

0 comments on commit aebf645

Please sign in to comment.