diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt index 7b7210c4..fa96249b 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt @@ -14,6 +14,10 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.Executors import kotlin.math.min import kotlin.math.roundToInt +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.channels.Channel class MetricsRequestFactory : RequestFactory() { override fun upload(apiHost: String): HttpURLConnection { @@ -76,14 +80,14 @@ object Telemetry: Subscriber { var host: String = Constants.DEFAULT_API_HOST // 1.0 is 100%, will get set by Segment setting before start() // Values are adjusted by the sampleRate on send - var sampleRate: Double = 1.0 - var flushTimer: Int = 30 * 1000 // 30s + private var sampleRate = AtomicReference(1.0) + private var flushTimer: Int = 30 * 1000 // 30s var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory()) var sendWriteKeyOnError: Boolean = true var sendErrorLogData: Boolean = false var errorHandler: ((Throwable) -> Unit)? = ::logError - var maxQueueSize: Int = 20 - var errorLogSizeMax: Int = 4000 + private var maxQueueSize: Int = 20 + private var errorLogSizeMax: Int = 4000 private const val MAX_QUEUE_BYTES = 28000 var maxQueueBytes: Int = MAX_QUEUE_BYTES @@ -92,10 +96,10 @@ object Telemetry: Subscriber { } private val queue = ConcurrentLinkedQueue() - private var queueBytes = 0 - private var started = false + private var queueBytes = AtomicInteger(0) + private var started = AtomicBoolean(false) private var rateLimitEndTime: Long = 0 - private var flushFirstError = true + private var flushFirstError = AtomicBoolean(true) private val exceptionHandler = CoroutineExceptionHandler { _, t -> errorHandler?.let { it( Exception( @@ -108,23 +112,34 @@ object Telemetry: Subscriber { private var telemetryDispatcher: ExecutorCoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private var telemetryJob: Job? = null + private val flushChannel = Channel(Channel.UNLIMITED) + + // Start a coroutine to process flush requests + init { + telemetryScope.launch(telemetryDispatcher) { + for (event in flushChannel) { + performFlush() + } + } + } + /** * Starts the telemetry if it is enabled and not already started, and the sample rate is greater than 0. * Called automatically when Telemetry.enable is set to true and when configuration data is received from Segment. */ fun start() { - if (!enable || started || sampleRate == 0.0) return - started = true + if (!enable || started.get() || sampleRate.get() == 0.0) return + started.set(true) // Everything queued was sampled at default 100%, downsample adjustment and send will adjust values - if (Math.random() > sampleRate) { + if (Math.random() > sampleRate.get()) { resetQueue() } telemetryJob = telemetryScope.launch(telemetryDispatcher) { while (isActive) { if (!enable) { - started = false + started.set(false) return@launch } try { @@ -148,7 +163,7 @@ object Telemetry: Subscriber { fun reset() { telemetryJob?.cancel() resetQueue() - started = false + started.set(false) rateLimitEndTime = 0 } @@ -162,10 +177,10 @@ object Telemetry: Subscriber { val tags = mutableMapOf() buildTags(tags) - if (!enable || sampleRate == 0.0) return + if (!enable || sampleRate.get() == 0.0) return if (!metric.startsWith(METRICS_BASE_TAG)) return if (tags.isEmpty()) return - if (Math.random() > sampleRate) return + if (Math.random() > sampleRate.get()) return addRemoteMetric(metric, tags) } @@ -181,10 +196,10 @@ object Telemetry: Subscriber { val tags = mutableMapOf() buildTags(tags) - if (!enable || sampleRate == 0.0) return + if (!enable || sampleRate.get() == 0.0) return if (!metric.startsWith(METRICS_BASE_TAG)) return if (tags.isEmpty()) return - if (Math.random() > sampleRate) return + if (Math.random() > sampleRate.get()) return var filteredTags = if(sendWriteKeyOnError) { tags.toMap() @@ -202,42 +217,47 @@ object Telemetry: Subscriber { addRemoteMetric(metric, filteredTags, log=logData) - if(flushFirstError) { - flushFirstError = false + if(flushFirstError.get()) { + flushFirstError.set(false) flush() } } - @Synchronized fun flush() { + if (!enable) return + flushChannel.trySend(Unit) + } + + private fun performFlush() { if (!enable || queue.isEmpty()) return if (rateLimitEndTime > (System.currentTimeMillis() / 1000).toInt()) { return } rateLimitEndTime = 0 - + flushFirstError.set(false) try { send() - queueBytes = 0 } catch (error: Throwable) { errorHandler?.invoke(error) - sampleRate = 0.0 + sampleRate.set(0.0) } } private fun send() { - if (sampleRate == 0.0) return - var queueCount = queue.size - // Reset queue data size counter since all current queue items will be removed - queueBytes = 0 + if (sampleRate.get() == 0.0) return val sendQueue = mutableListOf() - while (queueCount-- > 0 && !queue.isEmpty()) { + // Reset queue data size counter since all current queue items will be removed + queueBytes.set(0) + var queueCount = queue.size + while(queueCount > 0 && !queue.isEmpty()) { + --queueCount val m = queue.poll() if(m != null) { - m.value = (m.value / sampleRate).roundToInt() + m.value = (m.value / sampleRate.get()).roundToInt() sendQueue.add(m) } } + assert(queue.size == 0) try { // Json.encodeToString by default does not include default values // We're using this to leave off the 'log' parameter if unset. @@ -309,9 +329,13 @@ object Telemetry: Subscriber { tags = fullTags ) val newMetricSize = newMetric.toString().toByteArray().size - if (queueBytes + newMetricSize <= maxQueueBytes) { + // Avoid synchronization issue by adding the size before checking. + if (queueBytes.addAndGet(newMetricSize) <= maxQueueBytes) { queue.add(newMetric) - queueBytes += newMetricSize + } else { + if(queueBytes.addAndGet(-newMetricSize) < 0) { + queueBytes.set(0) + } } } @@ -327,7 +351,7 @@ object Telemetry: Subscriber { private suspend fun systemUpdate(system: com.segment.analytics.kotlin.core.System) { system.settings?.let { settings -> settings.metrics["sampleRate"]?.jsonPrimitive?.double?.let { - sampleRate = it + sampleRate.set(it) // We don't want to start telemetry until two conditions are met: // Telemetry.enable is set to true // Settings from the server have adjusted the sampleRate @@ -339,6 +363,6 @@ object Telemetry: Subscriber { private fun resetQueue() { queue.clear() - queueBytes = 0 + queueBytes.set(0) } } \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt index df1ff354..8c0d39b5 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/TelemetryTest.kt @@ -10,13 +10,16 @@ import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import kotlin.random.Random class TelemetryTest { fun TelemetryResetFlushFirstError() { val field: Field = Telemetry::class.java.getDeclaredField("flushFirstError") field.isAccessible = true - field.set(true, true) + val atomicBoolean = field.get(Telemetry) as AtomicBoolean + atomicBoolean.set(true) } fun TelemetryQueueSize(): Int { val queueField: Field = Telemetry::class.java.getDeclaredField("queue") @@ -29,11 +32,27 @@ class TelemetryTest { queueBytesField.isAccessible = true return queueBytesField.get(Telemetry) as Int } - var TelemetryStarted: Boolean + fun TelemetryMaxQueueSize(): Int { + val maxQueueSizeField: Field = Telemetry::class.java.getDeclaredField("maxQueueSize") + maxQueueSizeField.isAccessible = true + return maxQueueSizeField.get(Telemetry) as Int + } + var TelemetrySampleRate: Double + get() { + val sampleRateField: Field = Telemetry::class.java.getDeclaredField("sampleRate") + sampleRateField.isAccessible = true + return (sampleRateField.get(Telemetry) as AtomicReference).get() + } + set(value) { + val sampleRateField: Field = Telemetry::class.java.getDeclaredField("sampleRate") + sampleRateField.isAccessible = true + (sampleRateField.get(Telemetry) as AtomicReference).set(value) + } + var TelemetryStarted: AtomicBoolean get() { val startedField: Field = Telemetry::class.java.getDeclaredField("started") startedField.isAccessible = true - return startedField.get(Telemetry) as Boolean + return startedField.get(Telemetry) as AtomicBoolean } set(value) { val startedField: Field = Telemetry::class.java.getDeclaredField("started") @@ -67,7 +86,7 @@ class TelemetryTest { Telemetry.reset() Telemetry.errorHandler = ::errorHandler errors.clear() - Telemetry.sampleRate = 1.0 + TelemetrySampleRate = 1.0 MockKAnnotations.init(this) mockTelemetryHTTPClient() // Telemetry.enable = true <- this will call start(), so don't do it here @@ -75,14 +94,14 @@ class TelemetryTest { @Test fun `Test telemetry start`() { - Telemetry.sampleRate = 0.0 + TelemetrySampleRate = 0.0 Telemetry.enable = true Telemetry.start() - assertEquals(false, TelemetryStarted) + assertEquals(false, TelemetryStarted.get()) - Telemetry.sampleRate = 1.0 + TelemetrySampleRate = 1.0 Telemetry.start() - assertEquals(true, TelemetryStarted) + assertEquals(true, TelemetryStarted.get()) assertEquals(0,errors.size) } @@ -184,11 +203,11 @@ class TelemetryTest { fun `Test increment and error methods when queue is full`() { Telemetry.enable = true Telemetry.start() - for (i in 1..Telemetry.maxQueueSize + 1) { + for (i in 1..TelemetryMaxQueueSize() + 1) { Telemetry.increment(Telemetry.INVOKE_METRIC) { it["test"] = "test" + i } Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["error"] = "test" + i } } - assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize()) + assertEquals(TelemetryMaxQueueSize(), TelemetryQueueSize()) } @Test @@ -237,6 +256,6 @@ class TelemetryTest { } finally { executor.shutdown() } - assertTrue(TelemetryQueueSize() == Telemetry.maxQueueSize) + assertTrue(TelemetryQueueSize() == TelemetryMaxQueueSize()) } }