Skip to content

Commit

Permalink
Refactoring unhelpful queue size checks
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelGHSeg committed Nov 25, 2024
1 parent b25c4aa commit 939a09e
Showing 1 changed file with 3 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ object Telemetry: Subscriber {

private val queue = ConcurrentLinkedQueue<RemoteMetric>()
private var queueBytes = 0
private var queueSizeExceeded = false
private val seenErrors = mutableMapOf<String, Int>()
private var started = false
private var rateLimitEndTime: Long = 0
private var flushFirstError = true
Expand Down Expand Up @@ -150,7 +148,6 @@ object Telemetry: Subscriber {
fun reset() {
telemetryJob?.cancel()
resetQueue()
seenErrors.clear()
started = false
rateLimitEndTime = 0
}
Expand All @@ -169,7 +166,6 @@ object Telemetry: Subscriber {
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (Math.random() > sampleRate) return
if (queue.size >= maxQueueSize) return

addRemoteMetric(metric, tags)
}
Expand All @@ -188,7 +184,6 @@ object Telemetry: Subscriber {
if (!enable || sampleRate == 0.0) return
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (queue.size >= maxQueueSize) return
if (Math.random() > sampleRate) return

var filteredTags = if(sendWriteKeyOnError) {
Expand Down Expand Up @@ -235,7 +230,6 @@ object Telemetry: Subscriber {
var queueCount = queue.size
// Reset queue data size counter since all current queue items will be removed
queueBytes = 0
queueSizeExceeded = false
val sendQueue = mutableListOf<RemoteMetric>()
while (queueCount-- > 0 && !queue.isEmpty()) {
val m = queue.poll()
Expand Down Expand Up @@ -303,6 +297,9 @@ object Telemetry: Subscriber {
found.value += value
return
}
if (queue.size >= maxQueueSize) {
return
}

val newMetric = RemoteMetric(
type = METRIC_TYPE,
Expand All @@ -315,8 +312,6 @@ object Telemetry: Subscriber {
if (queueBytes + newMetricSize <= maxQueueBytes) {
queue.add(newMetric)
queueBytes += newMetricSize
} else {
queueSizeExceeded = true
}
}

Expand Down Expand Up @@ -345,6 +340,5 @@ object Telemetry: Subscriber {
private fun resetQueue() {
queue.clear()
queueBytes = 0
queueSizeExceeded = false
}
}

0 comments on commit 939a09e

Please sign in to comment.