Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing telemetry plugin name #247

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ object Telemetry: Subscriber {

var host: String = Constants.DEFAULT_API_HOST
// 1.0 is 100%, will get set by Segment setting before start()
var sampleRate: Double = 0.1
// Values are adjusted by the sampleRate on send
var sampleRate: Double = 1.0
var flushTimer: Int = 30 * 1000 // 30s
var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory())
var sendWriteKeyOnError: Boolean = true
Expand All @@ -96,6 +97,7 @@ object Telemetry: Subscriber {
private val seenErrors = mutableMapOf<String, Int>()
private var started = false
private var rateLimitEndTime: Long = 0
private var flushFirstError = true
private val exceptionHandler = CoroutineExceptionHandler { _, t ->
errorHandler?.let {
it( Exception(
Expand All @@ -116,7 +118,7 @@ object Telemetry: Subscriber {
if (!enable || started || sampleRate == 0.0) return
started = true

// Assume sampleRate is now set and everything in the queue hasn't had it applied
// Everything queued was sampled at default 100%, downsample adjustment and send will adjust values
if (Math.random() > sampleRate) {
resetQueue()
}
Expand Down Expand Up @@ -187,9 +189,13 @@ object Telemetry: Subscriber {
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (queue.size >= maxQueueSize) return
if (Math.random() > sampleRate) return

var filteredTags = tags.toMap()
if (!sendWriteKeyOnError) filteredTags = tags.filterKeys { it.lowercase() != "writekey" }
var filteredTags = if(sendWriteKeyOnError) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like Kotlin's alternative to ternary operators here. Python's makes me a little nauseous - foo = something if condition else something_else aaaaaugh

tags.toMap()
} else {
tags.filterKeys { it.lowercase() != "writekey" }
}
var logData: String? = null
if (sendErrorLogData) {
logData = if (log.length > errorLogSizeMax) {
Expand All @@ -199,23 +205,11 @@ object Telemetry: Subscriber {
}
}

val errorKey = tags["error"]
if (errorKey != null) {
if (seenErrors.containsKey(errorKey)) {
seenErrors[errorKey] = seenErrors[errorKey]!! + 1
if (Math.random() > sampleRate) return
// Adjust how many we've seen after the first since we know for sure.
addRemoteMetric(metric, filteredTags, log=logData,
value = (seenErrors[errorKey]!! * sampleRate).toInt())
seenErrors[errorKey] = 0
} else {
addRemoteMetric(metric, filteredTags, log=logData)
flush()
seenErrors[errorKey] = 0 // Zero because it's already been sent.
}
}
else {
addRemoteMetric(metric, filteredTags, log=logData)
addRemoteMetric(metric, filteredTags, log=logData)

if(flushFirstError) {
flushFirstError = false
flush()
}
}

Expand Down Expand Up @@ -339,12 +333,12 @@ object Telemetry: Subscriber {
system.settings?.let { settings ->
settings.metrics["sampleRate"]?.jsonPrimitive?.double?.let {
sampleRate = it
// We don't want to start telemetry until two conditions are met:
// Telemetry.enable is set to true
// Settings from the server have adjusted the sampleRate
// start is called in both places
start()
}
// We don't want to start telemetry until two conditions are met:
// Telemetry.enable is set to true
// Settings from the server have adjusted the sampleRate
// start is called in both places
start()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
try {
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
it["message"] = "event-${event.type}"
"plugin" to "${plugin.type}-${plugin.javaClass}"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
}
when (plugin) {
is DestinationPlugin -> {
Expand All @@ -52,7 +56,11 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
reportErrorWithMetrics(null, t,"Caught Exception in plugin",
Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) {
it["error"] = t.toString()
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
it["writekey"] = plugin.analytics.configuration.writeKey
it["message"] ="Exception executing plugin"
}
Expand All @@ -72,7 +80,11 @@ internal class Mediator(internal var plugins: CopyOnWriteArrayList<Plugin> = Cop
"Caught Exception applying closure to plugin: $plugin",
Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) {
it["error"] = t.toString()
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
it["writekey"] = plugin.analytics.configuration.writeKey
it["message"] = "Exception executing plugin"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,22 @@ internal class Timeline {
"Caught Exception while setting up plugin $plugin",
Telemetry.INTEGRATION_ERROR_METRIC, t.stackTraceToString()) {
it["error"] = t.toString()
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
it["writekey"] = analytics.configuration.writeKey
it["message"] = "Exception executing plugin"
}
}
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
it["message"] = "added"
it["plugin"] = "${plugin.type.toString()}-${plugin.javaClass.toString()}"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
}
plugins[plugin.type]?.add(plugin)
with(analytics) {
Expand Down Expand Up @@ -109,7 +117,11 @@ internal class Timeline {
list.remove(plugin)
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
it["message"] = "removed"
it["plugin"] = "${plugin.type.toString()}-${plugin.javaClass.toString()}"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import java.net.HttpURLConnection
import java.util.concurrent.ConcurrentLinkedQueue

class TelemetryTest {
fun TelemetryResetFlushFirstError() {
val field: Field = Telemetry::class.java.getDeclaredField("flushFirstError")
field.isAccessible = true
field.set(true, true)
}
fun TelemetryQueueSize(): Int {
val queueField: Field = Telemetry::class.java.getDeclaredField("queue")
queueField.isAccessible = true
Expand Down Expand Up @@ -163,6 +168,7 @@ class TelemetryTest {
@Test
fun `Test HTTP Exception`() {
mockTelemetryHTTPClient(shouldThrow = true)
TelemetryResetFlushFirstError()
Telemetry.enable = true
Telemetry.start()
Telemetry.error(Telemetry.INVOKE_METRIC,"log") { it["error"] = "test" }
Expand Down
Loading