diff --git a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/internal/emitter/storage/EventStoreTest.kt b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/internal/emitter/storage/EventStoreTest.kt index 500b3e4b2..27c649cf4 100755 --- a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/internal/emitter/storage/EventStoreTest.kt +++ b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/internal/emitter/storage/EventStoreTest.kt @@ -27,6 +27,8 @@ import org.junit.Before import org.junit.Test import org.junit.runner.RunWith import java.util.* +import kotlin.time.DurationUnit +import kotlin.time.toDuration @RunWith(AndroidJUnit4::class) class EventStoreTest { @@ -311,6 +313,60 @@ class EventStoreTest { Assert.assertEquals(2, eventStore2.size()) } + @Test + fun testRemoveOldEventsByAge() { + val context = InstrumentationRegistry.getInstrumentation().targetContext + val eventStore = SQLiteEventStore(context, "namespace") + openedEventStores.add(eventStore) + waitUntilDatabaseOpen(eventStore) + + for (i in 1..5) { + val payload = TrackerPayload() + payload.add("eid", i.toString()) + eventStore.insertEvent(payload) + } + + Thread.sleep(2000) + + for (i in 6..10) { + val payload = TrackerPayload() + payload.add("eid", i.toString()) + eventStore.insertEvent(payload) + } + + Assert.assertEquals(10, eventStore.size()) + + eventStore.removeOldEvents(10, 1.toDuration(DurationUnit.SECONDS)) + + Assert.assertEquals(5, eventStore.size()) + val events = eventStore.getEmittableEvents(10) + val eventIds = events.map { it.payload.map["eid"] as String } + Assert.assertEquals(listOf("10", "6", "7", "8", "9"), eventIds.sorted()) + } + + @Test + fun testRemoveOldestEventsByMaxSize() { + val context = InstrumentationRegistry.getInstrumentation().targetContext + val eventStore = SQLiteEventStore(context, "namespace") + openedEventStores.add(eventStore) + waitUntilDatabaseOpen(eventStore) + + for (i in 1..5) { + val trackerPayload = TrackerPayload() + trackerPayload.add("eid", "$i") + eventStore.insertEvent(trackerPayload) + } + + Assert.assertEquals(5, eventStore.size()) + + eventStore.removeOldEvents(3, 10.toDuration(DurationUnit.MINUTES)) + + Assert.assertEquals(3, eventStore.size()) + val events = eventStore.getEmittableEvents(10) + val eventIds = events.map { it.payload.map["eid"] as String } + Assert.assertEquals(listOf("3", "4", "5"), eventIds.sorted()) + } + // Helper Methods @Throws(InterruptedException::class) diff --git a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockEventStore.kt b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockEventStore.kt index d55255a82..07b1a1310 100644 --- a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockEventStore.kt +++ b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockEventStore.kt @@ -19,6 +19,7 @@ import com.snowplowanalytics.snowplow.emitter.EmitterEvent import com.snowplowanalytics.snowplow.payload.TrackerPayload import java.util.ArrayList import java.util.HashMap +import kotlin.time.Duration class MockEventStore : EventStore { var db = HashMap() @@ -81,4 +82,8 @@ class MockEventStore : EventStore { return events } } + + override fun removeOldEvents(maxSize: Long, maxAge: Duration) { + // "Not implemented in the mock event store" + } } diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt index fe54bc4eb..849eb50b5 100755 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt @@ -33,6 +33,7 @@ import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import kotlin.time.Duration /** * Build an emitter object which controls the @@ -359,6 +360,16 @@ class Emitter( } } + /** + * Limit for the maximum number of unsent events to keep in the event store. + */ + var maxEventStoreSize: Long = EmitterDefaults.maxEventStoreSize + + /** + * Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent. + */ + var maxEventStoreAge: Duration = EmitterDefaults.maxEventStoreAge + /** * Creates an emitter object */ @@ -413,6 +424,7 @@ class Emitter( eventStore.add(payload) if (eventStore.size() >= bufferOption.code && isRunning.compareAndSet(false, true)) { try { + removeOldEvents() attemptEmit(networkConnection) } catch (t: Throwable) { isRunning.set(false) @@ -430,6 +442,7 @@ class Emitter( Executor.execute(TAG) { if (isRunning.compareAndSet(false, true)) { try { + removeOldEvents() attemptEmit(networkConnection) } catch (t: Throwable) { isRunning.set(false) @@ -484,6 +497,10 @@ class Emitter( } } + private fun removeOldEvents() { + eventStore.removeOldEvents(maxEventStoreSize, maxEventStoreAge) + } + /** * Attempts to send events in the database to a collector. * diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterConfigurationInterface.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterConfigurationInterface.kt index defabd276..bae69d0b6 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterConfigurationInterface.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterConfigurationInterface.kt @@ -15,6 +15,7 @@ package com.snowplowanalytics.core.emitter import com.snowplowanalytics.snowplow.emitter.BufferOption import com.snowplowanalytics.snowplow.emitter.EventStore import com.snowplowanalytics.snowplow.network.RequestCallback +import kotlin.time.Duration interface EmitterConfigurationInterface { /** @@ -71,4 +72,16 @@ interface EmitterConfigurationInterface { * If disabled, events that failed to be sent will be dropped regardless of other configuration (such as the customRetryForStatusCodes). */ var retryFailedRequests: Boolean + + /** + * Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent. + * Defaults to 30 days. + */ + var maxEventStoreAge: Duration + + /** + * Limit for the maximum number of unsent events to keep in the event store. + * Defaults to 1000. + */ + var maxEventStoreSize: Long } diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt index e86025a53..03e51374e 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.controller.EmitterController import com.snowplowanalytics.snowplow.emitter.BufferOption import com.snowplowanalytics.snowplow.emitter.EventStore import com.snowplowanalytics.snowplow.network.RequestCallback +import kotlin.time.Duration @RestrictTo(RestrictTo.Scope.LIBRARY) class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) : @@ -91,6 +92,20 @@ class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) : emitter.retryFailedRequests = value } + override var maxEventStoreAge: Duration + get() = emitter.maxEventStoreAge + set(value) { + dirtyConfig.maxEventStoreAge = value + emitter.maxEventStoreAge = value + } + + override var maxEventStoreSize: Long + get() = emitter.maxEventStoreSize + set(value) { + dirtyConfig.maxEventStoreSize = value + emitter.maxEventStoreSize = value + } + override val dbCount: Long get() { val eventStore = emitter.eventStore diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt index 04283f0a5..8a5c96b58 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt @@ -18,6 +18,8 @@ import com.snowplowanalytics.snowplow.network.Protocol import java.util.* import java.util.concurrent.TimeUnit +import kotlin.time.DurationUnit +import kotlin.time.toDuration object EmitterDefaults { var httpMethod = HttpMethod.POST @@ -34,4 +36,6 @@ object EmitterDefaults { var serverAnonymisation = false var retryFailedRequests = true var timeUnit = TimeUnit.SECONDS + var maxEventStoreAge = 30.toDuration(DurationUnit.DAYS) + var maxEventStoreSize: Long = 1000 } diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/storage/SQLiteEventStore.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/storage/SQLiteEventStore.kt index 6266cedad..a2fc02a4b 100755 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/storage/SQLiteEventStore.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/storage/SQLiteEventStore.kt @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.emitter.EmitterEvent import com.snowplowanalytics.snowplow.emitter.EventStore import com.snowplowanalytics.snowplow.payload.Payload import com.snowplowanalytics.snowplow.payload.TrackerPayload +import kotlin.time.Duration /** * Helper class for storing, getting and removing @@ -163,6 +164,25 @@ class SQLiteEventStore(context: Context, private val namespace: String) : EventS return retval >= 0 } + override fun removeOldEvents(maxSize: Long, maxAge: Duration) { + if (databaseOpen) { + insertWaitingEventsIfReady() + + database?.execSQL( + """ + DELETE FROM ${EventStoreHelper.TABLE_EVENTS} + WHERE ${EventStoreHelper.COLUMN_ID} NOT IN ( + SELECT ${EventStoreHelper.COLUMN_ID} + FROM ${EventStoreHelper.TABLE_EVENTS} + WHERE ${EventStoreHelper.COLUMN_DATE_CREATED} >= datetime('now','-${maxAge.inWholeSeconds} seconds') + ORDER BY ${EventStoreHelper.COLUMN_DATE_CREATED} DESC, ${EventStoreHelper.COLUMN_ID} DESC + LIMIT $maxSize + ) + """.trimIndent() + ) + } + } + /** * Returns the events that validate a * specific query. diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt index a8fbaf374..1822196d5 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt @@ -244,6 +244,8 @@ class ServiceProvider( emitter.serverAnonymisation = emitterConfiguration.serverAnonymisation emitter.requestHeaders = networkConfiguration.requestHeaders emitter.retryFailedRequests = emitterConfiguration.retryFailedRequests + emitter.maxEventStoreAge = emitterConfiguration.maxEventStoreAge + emitter.maxEventStoreSize = emitterConfiguration.maxEventStoreSize } val emitter = Emitter( diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt index 8fba66395..5578a30ee 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt @@ -18,6 +18,9 @@ import com.snowplowanalytics.snowplow.emitter.BufferOption import com.snowplowanalytics.snowplow.emitter.EventStore import com.snowplowanalytics.snowplow.network.RequestCallback import org.json.JSONObject +import kotlin.time.Duration +import kotlin.time.DurationUnit +import kotlin.time.toDuration /** * Configure how the tracker should send the events to the collector. @@ -92,6 +95,16 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface override var retryFailedRequests: Boolean get() = _retryFailedRequests ?: sourceConfig?.retryFailedRequests ?: EmitterDefaults.retryFailedRequests set(value) { _retryFailedRequests = value } + + private var _maxEventStoreAge: Duration? = null + override var maxEventStoreAge: Duration + get() = _maxEventStoreAge ?: sourceConfig?.maxEventStoreAge ?: EmitterDefaults.maxEventStoreAge + set(value) { _maxEventStoreAge = value } + + private var _maxEventStoreSize: Long? = null + override var maxEventStoreSize: Long + get() = _maxEventStoreSize ?: sourceConfig?.maxEventStoreSize ?: EmitterDefaults.maxEventStoreSize + set(value) { _maxEventStoreSize = value } // Builders @@ -183,6 +196,24 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface return this } + /** + * Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent. + * Defaults to 30 days. + */ + fun maxEventStoreAge(maxEventStoreAge: Duration): EmitterConfiguration { + this.maxEventStoreAge = maxEventStoreAge + return this + } + + /** + * Limit for the maximum number of unsent events to keep in the event store. + * Defaults to 1000. + */ + fun maxEventStoreSize(maxEventStoreSize: Long): EmitterConfiguration { + this.maxEventStoreSize = maxEventStoreSize + return this + } + // Copyable override fun copy(): EmitterConfiguration { return EmitterConfiguration() @@ -196,6 +227,8 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface .customRetryForStatusCodes(customRetryForStatusCodes) .serverAnonymisation(serverAnonymisation) .retryFailedRequests(retryFailedRequests) + .maxEventStoreSize(maxEventStoreSize) + .maxEventStoreAge(maxEventStoreAge) } // JSON Formatter @@ -222,5 +255,7 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface _customRetryForStatusCodes = customRetryForStatusCodes } if (jsonObject.has("retryFailedRequests")) { _retryFailedRequests = jsonObject.getBoolean("retryFailedRequests") } + if (jsonObject.has("maxEventStoreAge")) { _maxEventStoreAge = jsonObject.getDouble("maxEventStoreAge").toDuration(DurationUnit.SECONDS) } + if (jsonObject.has("maxEventStoreSize")) { _maxEventStoreSize = jsonObject.getLong("maxEventStoreSize") } } } diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/emitter/EventStore.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/emitter/EventStore.kt index b7a1e5acd..aa4950d4a 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/emitter/EventStore.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/emitter/EventStore.kt @@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.emitter import com.snowplowanalytics.snowplow.payload.Payload +import kotlin.time.Duration /** * The component that persists and buffers events before sending. @@ -55,4 +56,11 @@ interface EventStore { * @return EmitterEvent objects containing eventIds and event payloads. */ fun getEmittableEvents(queryLimit: Int): List + + /** + * Remove events older than `maxAge` seconds and keep only the latest `maxSize` events. + * @param maxSize the maximum number of events to keep. + * @param maxAge the maximum age of events to keep. + */ + fun removeOldEvents(maxSize: Long, maxAge: Duration) }