From b2fb1b0417bad3354aa44248609b75896a56330c Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Thu, 8 Aug 2024 14:23:46 -0700 Subject: [PATCH] Improve robustness of SSE source to server EOFs (#711) --- crates/arroyo-connectors/src/sse/operator.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/crates/arroyo-connectors/src/sse/operator.rs b/crates/arroyo-connectors/src/sse/operator.rs index 8e8697ae4..ed691f75c 100644 --- a/crates/arroyo-connectors/src/sse/operator.rs +++ b/crates/arroyo-connectors/src/sse/operator.rs @@ -9,10 +9,10 @@ use arroyo_state::tables::global_keyed_map::GlobalKeyedView; use arroyo_types::{string_to_map, ArrowMessage, SignalMessage, UserError, Watermark}; use async_trait::async_trait; use bincode::{Decode, Encode}; -use eventsource_client::{Client, SSE}; +use eventsource_client::{Client, Error, SSE}; use futures::StreamExt; use std::collections::{HashMap, HashSet}; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use tokio::select; use tokio::time::MissedTickBehavior; use tracing::{debug, info}; @@ -155,6 +155,8 @@ impl SSESourceFunc { let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + let mut last_eof = Instant::now(); + // since there's no way to partition across an event source, only read on the first task if ctx.task_info.task_index == 0 { loop { @@ -182,6 +184,16 @@ impl SSESourceFunc { } } } + Some(Err(Error::Eof)) => { + // Many SSE servers will periodically send an EOF; just reconnect + // and continue on unless we immediately get another + if last_eof.elapsed() < Duration::from_secs(5) { + ctx.report_user_error(UserError::new("Error while reading from EventSource", + "Received repeated EOF from EventSource server")).await; + panic!("Error while reading from EventSource: EOF"); + } + last_eof = Instant::now(); + } Some(Err(e)) => { ctx.control_tx.send( ControlResp::Error {