
Amendments 1
istreeter committed Jan 2, 2025
1 parent 5ace0b7 commit 1416faf
Showing 3 changed files with 10 additions and 6 deletions.
@@ -21,7 +21,7 @@ import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
import com.google.api.gax.grpc.ChannelPoolSettings
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings
- import com.google.pubsub.v1.{PullRequest, PullResponse}
+ import com.google.pubsub.v1.{PullRequest, PullResponse, ReceivedMessage}
import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStub}
import org.threeten.bp.{Duration => ThreetenDuration}

@@ -84,7 +84,6 @@ object PubsubSource {
.fixedRateStartImmediately(config.debounceRequests, dampen = true)
.parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
.unNone
- .repeat
.prefetchN(parallelPullCount)
.concurrently(extendDeadlines(config, stub, refStates))
.onFinalize(nackRefStatesForShutdown(config, stub, refStates))
@@ -107,8 +106,6 @@
if (response.getReceivedMessagesCount > 0) {
val records = response.getReceivedMessagesList.asScala.toVector
val chunk = Chunk.from(records.map(_.getMessage.getData.asReadOnlyByteBuffer()))
- val (tstampSeconds, tstampNanos) =
- records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
val ackIds = records.map(_.getAckId)
Sync[F].uncancelable { _ =>
for {
@@ -120,13 +117,19 @@
token <- Unique[F].unique
currentDeadline = timeReceived.plusMillis(config.durationPerAckExtension.toMillis)
_ <- refStates.update(_ + (token -> PubsubBatchState(currentDeadline, ackIds)))
- } yield Some(LowLevelEvents(chunk, Vector(token), Some(Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong))))
+ } yield Some(LowLevelEvents(chunk, Vector(token), Some(earliestTimestampOfRecords(records))))
}
} else {
none.pure[F]
}
}

+ private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = {
+ val (tstampSeconds, tstampNanos) =
+ records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
+ Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)
+ }

/**
* "Nacks" any message that was pulled from pubsub but never consumed by the app.
*
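A note on the new earliestTimestampOfRecords helper added above: it relies on Scala's default Ordering for tuples, so .min over (seconds, nanos) pairs compares seconds first and nanos only on ties, yielding the publish time of the oldest message in the pulled batch. A minimal, self-contained sketch of the same idea, using made-up Timestamp values purely for illustration:

import java.time.Instant
import com.google.protobuf.Timestamp

// Hypothetical publish times; the real code reads them from ReceivedMessage.getMessage.getPublishTime
val publishTimes: Vector[Timestamp] = Vector(
  Timestamp.newBuilder().setSeconds(1700000005L).setNanos(0).build(),
  Timestamp.newBuilder().setSeconds(1700000001L).setNanos(999).build(),
  Timestamp.newBuilder().setSeconds(1700000001L).setNanos(42).build()
)

// Tuple Ordering compares seconds first, then nanos, so .min is the earliest instant
val (tstampSeconds, tstampNanos) = publishTimes.map(t => (t.getSeconds, t.getNanos)).min
val earliest: Instant = Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)
// earliest == Instant.ofEpochSecond(1700000001L, 42L)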
@@ -41,7 +41,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent
* Name by which to identify Snowplow in the GRPC headers
* @param maxMessagesPerPull
* How many pubsub messages to pull from the server in a single request.
- * @param debouceRequests
+ * @param debounceRequests
* Adds an artifical delay between consecutive requests to pubsub for more messages. Under some
* circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
* the same messages multiple times.
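For context on the debounceRequests setting documented above: the source stream paces its pull requests with fs2's Stream.fixedRateStartImmediately, which emits one tick immediately and then one tick per period, indefinitely; dampen = true drops backlogged ticks rather than letting them accumulate when a pull is slow. Because the tick stream never terminates, the explicit .repeat removed in this commit is presumably redundant. A rough sketch of the pattern, where pullOnce is a hypothetical stand-in for a single pull request:

import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.Stream

object DebounceSketch extends IOApp.Simple {

  // Hypothetical stand-in for one pull request against Pub/Sub
  def pullOnce(i: Long): IO[Option[String]] = IO.pure(Some(s"batch-$i"))

  def run: IO[Unit] =
    Stream
      .fixedRateStartImmediately[IO](100.millis, dampen = true) // infinite stream of ticks
      .zipWithIndex
      .parEvalMapUnordered(3) { case (_, i) => pullOnce(i) }
      .unNone
      .take(10) // the real pipeline runs until shutdown; bounded here so the sketch terminates
      .evalMap(batch => IO.println(batch))
      .compile
      .drain
}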
@@ -34,6 +34,7 @@ private[pubsub] object Utils {
.setAckDeadlineSeconds(duration.toSeconds.toInt)
.build
val io = for {
+ _ <- Logger[F].debug(s"Modifying ack deadline for ${ackIds.length} messages by ${duration.toSeconds} seconds")
apiFuture <- Sync[F].delay(stub.modifyAckDeadlineCallable.futureCall(request))
_ <- FutureInterop.fromFuture_(apiFuture)
} yield ()
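The Utils hunk above builds a ModifyAckDeadlineRequest, logs it, and then bridges the returned ApiFuture back into F via FutureInterop.fromFuture_. The sketch below shows the general shape of those two pieces; the async bridge is only an assumption about what such a helper typically does, not the project's actual implementation. Note also that a deadline of zero seconds is how a Pub/Sub nack is typically expressed, which is what the nack-on-shutdown logic in PubsubSource relies on:

import scala.jdk.CollectionConverters._
import cats.effect.Async
import cats.syntax.all._
import com.google.api.core.{ApiFuture, ApiFutureCallback, ApiFutures}
import com.google.common.util.concurrent.MoreExecutors
import com.google.pubsub.v1.ModifyAckDeadlineRequest

object AckDeadlineSketch {

  // Same shape as the request in the hunk above; a deadline of 0 seconds expresses a nack
  def nackRequest(subscription: String, ackIds: List[String]): ModifyAckDeadlineRequest =
    ModifyAckDeadlineRequest.newBuilder()
      .setSubscription(subscription)
      .addAllAckIds(ackIds.asJava)
      .setAckDeadlineSeconds(0)
      .build()

  // Assumed stand-in for FutureInterop.fromFuture_: lift a Google ApiFuture into F, discarding the result
  def fromApiFuture_[F[_]: Async, A](future: ApiFuture[A]): F[Unit] =
    Async[F]
      .async_[A] { cb =>
        ApiFutures.addCallback(
          future,
          new ApiFutureCallback[A] {
            def onSuccess(a: A): Unit         = cb(Right(a))
            def onFailure(t: Throwable): Unit = cb(Left(t))
          },
          MoreExecutors.directExecutor()
        )
      }
      .void
}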
