Pubsub improvements #38
_ <- Resource.eval(Sync[F].delay {
  subscriber.addListener(
    errorListener(control.phaser, control.errorRef),
    MoreExecutors.directExecutor // TODO: use the non-blocking executor for errors?
Can the TODO be removed?
Yes :)
val events = Chunk.iterator(list.iterator.map(_.message))
val acks = Chunk.iterator(list.iterator.map(_.ackReply))
val earliestTstamp = list.iterator.map(_.tstamp).min
Shouldn't we use one fold for better performance?
I agree these three lines are unsatisfying. It would be nice to find a neat way to traverse the list only once, whilst still building the Chunks in an efficient way, but without compromising on neatness. I haven't found the neat way to do this yet. I'm putting it on my personal to-do list.
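For reference, a minimal sketch of one possible single-pass traversal, using only the Scala standard library. `Received`, `Split`, and `SinglePass.split` are hypothetical names introduced for illustration; they are not the PR's types. The sketch collects into `Vector`s rather than fs2 `Chunk`s, and it returns the earliest timestamp as an `Option`, which differs from the `.min` call above (that call throws on an empty list).

```scala
import java.time.Instant

// Hypothetical stand-in for the element type of `list` in the PR.
final case class Received[M, A](message: M, ackReply: A, tstamp: Instant)

// Hypothetical result holder: both columns plus the earliest timestamp, if any.
final case class Split[M, A](events: Vector[M], acks: Vector[A], earliest: Option[Instant])

object SinglePass {
  // Walks the list exactly once, filling two builders and tracking the minimum timestamp.
  def split[M, A](list: List[Received[M, A]]): Split[M, A] = {
    val events = Vector.newBuilder[M]
    val acks = Vector.newBuilder[A]
    var earliest: Option[Instant] = None
    list.foreach { r =>
      events += r.message
      acks += r.ackReply
      earliest = earliest match {
        case Some(t) if !r.tstamp.isBefore(t) => earliest // current minimum still holds
        case _ => Some(r.tstamp) // first element, or a strictly earlier timestamp
      }
    }
    Split(events.result(), acks.result(), earliest)
  }
}
```

Whether this is neater than the three separate traversals is a judgment call: it trades three one-liners for local mutable state, which is the compromise the comment above is trying to avoid.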
): SourceAndAck[F] = new SourceAndAck[F] {
  def stream(config: EventProcessingConfig, processor: EventProcessor[F]): Stream[F, Nothing] = {
    val str = for {
      acksRef <- Stream.bracket(Ref[F].of(Map.empty[Unique.Token, C]))(nackUnhandled(source.checkpointer, _))
      s2 <- source.stream
      _ <- Stream.bracket(isConnectedRef.set(true))(_ => isConnectedRef.set(false))
I'm wondering if this is correct. For Kinesis, for instance, I think this would be set to true way before KCL is actually initialized.
You are correct about Kinesis. I've searched for a nice way to detect when the Kinesis consumer is healthy, but there is just no way to know.
This works nicely for Kafka and pubsub though.
report lagging if there are unprocessed events $e8
report healthy if events are processed but not yet acked (e.g. a batch-oriented loader) $e9
report healthy after all events have been processed and acked $e10
report disconnected in between two open windows of events $e11
I'm not sure I follow. Isn't it the case that the processing of the next window starts before the processing of the previous one completes?
Oh yeah... I used the wrong word here; it should not say window.
The concept of a window only applies to the higher-level streaming classes, like SourceAndAck and EventProcessor. And you are correct, at the higher level we let windows overlap.
At the lower level, we deal with a Stream[F, Stream[F, LowLevelEvents[C]]], but I don't have a good name for the contents of that outer stream. For Kafka it represents rebalancing (where Kafka consumers pause to share out partitions differently), but for Kinesis/pubsub it doesn't represent anything at all.
I will change the sentence to something like:
report disconnected while source is in between two active streams of events (e.g. during kafka rebalance)
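To make the "in between two active streams" behaviour concrete, here is a small sketch using fs2 and cats-effect 3, assuming the flag is toggled with `Stream.bracket` once per inner stream as in the diff above. `ConnectedFlagSketch`, `observe`, and `program` are hypothetical names, and the outer stream of two inner streams is a toy stand-in for the real source, not the PR's code.

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.Stream

object ConnectedFlagSketch extends IOApp.Simple {

  // Hypothetical helper: appends the flag's current value to a log, under a label.
  def observe(label: String, flag: Ref[IO, Boolean], log: Ref[IO, Vector[String]]): IO[Unit] =
    flag.get.flatMap(c => log.update(_ :+ s"$label connected=$c"))

  // Runs a toy outer stream of two inner streams and returns the observations made.
  def program: IO[Vector[String]] =
    for {
      flag <- Ref[IO].of(false)
      log <- Ref[IO].of(Vector.empty[String])
      // Each outer element is one "active stream of events"; the effect that
      // produces it runs in the gap between two inner streams.
      outer = Stream(1, 2).covary[IO].evalMap { n =>
        observe(s"before inner $n:", flag, log)
          .as(Stream.eval(observe(s"inside inner $n:", flag, log)))
      }
      _ <- (for {
        inner <- outer
        // Flag is set while this inner stream runs and cleared when it ends.
        _ <- Stream.bracket(flag.set(true))(_ => flag.set(false))
        _ <- inner
      } yield ()).compile.drain
      _ <- observe("after all:", flag, log)
      lines <- log.get
    } yield lines

  def run: IO[Unit] = program.flatMap(lines => IO.println(lines.mkString("\n")))
}
```

The intent is that the "before inner" observations see the flag cleared and the "inside inner" observations see it set, which corresponds to reporting disconnected only while no inner stream is active.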