Skip to content

Commit

Permalink
Improve robustness of SSE source to server EOFs (#711)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Aug 8, 2024
1 parent 9f18323 commit b2fb1b0
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions crates/arroyo-connectors/src/sse/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use arroyo_state::tables::global_keyed_map::GlobalKeyedView;
use arroyo_types::{string_to_map, ArrowMessage, SignalMessage, UserError, Watermark};
use async_trait::async_trait;
use bincode::{Decode, Encode};
use eventsource_client::{Client, SSE};
use eventsource_client::{Client, Error, SSE};
use futures::StreamExt;
use std::collections::{HashMap, HashSet};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};
use tokio::select;
use tokio::time::MissedTickBehavior;
use tracing::{debug, info};
Expand Down Expand Up @@ -155,6 +155,8 @@ impl SSESourceFunc {
let mut flush_ticker = tokio::time::interval(Duration::from_millis(50));
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut last_eof = Instant::now();

// since there's no way to partition across an event source, only read on the first task
if ctx.task_info.task_index == 0 {
loop {
Expand Down Expand Up @@ -182,6 +184,16 @@ impl SSESourceFunc {
}
}
}
Some(Err(Error::Eof)) => {
// Many SSE servers will periodically send an EOF; just reconnect
// and continue on unless we immediately get another
if last_eof.elapsed() < Duration::from_secs(5) {
ctx.report_user_error(UserError::new("Error while reading from EventSource",
"Received repeated EOF from EventSource server")).await;
panic!("Error while reading from EventSource: EOF");
}
last_eof = Instant::now();
}
Some(Err(e)) => {
ctx.control_tx.send(
ControlResp::Error {
Expand Down

0 comments on commit b2fb1b0

Please sign in to comment.