Skip to content

Commit

Permalink
Add configurable limit for the maximum age and number of events in th…
Browse files Browse the repository at this point in the history
…e event store and remove old events before sending (close #660)

PR #661
  • Loading branch information
matus-tomlein committed Jan 25, 2024
1 parent a472a2e commit e56da18
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import java.util.*
import kotlin.time.DurationUnit
import kotlin.time.toDuration

@RunWith(AndroidJUnit4::class)
class EventStoreTest {
Expand Down Expand Up @@ -311,6 +313,60 @@ class EventStoreTest {
Assert.assertEquals(2, eventStore2.size())
}

@Test
fun testRemoveOldEventsByAge() {
val context = InstrumentationRegistry.getInstrumentation().targetContext
val eventStore = SQLiteEventStore(context, "namespace")
openedEventStores.add(eventStore)
waitUntilDatabaseOpen(eventStore)

for (i in 1..5) {
val payload = TrackerPayload()
payload.add("eid", i.toString())
eventStore.insertEvent(payload)
}

Thread.sleep(2000)

for (i in 6..10) {
val payload = TrackerPayload()
payload.add("eid", i.toString())
eventStore.insertEvent(payload)
}

Assert.assertEquals(10, eventStore.size())

eventStore.removeOldEvents(10, 1.toDuration(DurationUnit.SECONDS))

Assert.assertEquals(5, eventStore.size())
val events = eventStore.getEmittableEvents(10)
val eventIds = events.map { it.payload.map["eid"] as String }
Assert.assertEquals(listOf("10", "6", "7", "8", "9"), eventIds.sorted())
}

@Test
fun testRemoveOldestEventsByMaxSize() {
val context = InstrumentationRegistry.getInstrumentation().targetContext
val eventStore = SQLiteEventStore(context, "namespace")
openedEventStores.add(eventStore)
waitUntilDatabaseOpen(eventStore)

for (i in 1..5) {
val trackerPayload = TrackerPayload()
trackerPayload.add("eid", "$i")
eventStore.insertEvent(trackerPayload)
}

Assert.assertEquals(5, eventStore.size())

eventStore.removeOldEvents(3, 10.toDuration(DurationUnit.MINUTES))

Assert.assertEquals(3, eventStore.size())
val events = eventStore.getEmittableEvents(10)
val eventIds = events.map { it.payload.map["eid"] as String }
Assert.assertEquals(listOf("3", "4", "5"), eventIds.sorted())
}

// Helper Methods

@Throws(InterruptedException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.snowplowanalytics.snowplow.emitter.EmitterEvent
import com.snowplowanalytics.snowplow.payload.TrackerPayload
import java.util.ArrayList
import java.util.HashMap
import kotlin.time.Duration

class MockEventStore : EventStore {
var db = HashMap<Long, Payload?>()
Expand Down Expand Up @@ -81,4 +82,8 @@ class MockEventStore : EventStore {
return events
}
}

override fun removeOldEvents(maxSize: Long, maxAge: Duration) {
// "Not implemented in the mock event store"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration

/**
* Build an emitter object which controls the
Expand Down Expand Up @@ -359,6 +360,16 @@ class Emitter(
}
}

/**
* Limit for the maximum number of unsent events to keep in the event store.
*/
var maxEventStoreSize: Long = EmitterDefaults.maxEventStoreSize

/**
* Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
*/
var maxEventStoreAge: Duration = EmitterDefaults.maxEventStoreAge

/**
* Creates an emitter object
*/
Expand Down Expand Up @@ -413,6 +424,7 @@ class Emitter(
eventStore.add(payload)
if (eventStore.size() >= bufferOption.code && isRunning.compareAndSet(false, true)) {
try {
removeOldEvents()
attemptEmit(networkConnection)
} catch (t: Throwable) {
isRunning.set(false)
Expand All @@ -430,6 +442,7 @@ class Emitter(
Executor.execute(TAG) {
if (isRunning.compareAndSet(false, true)) {
try {
removeOldEvents()
attemptEmit(networkConnection)
} catch (t: Throwable) {
isRunning.set(false)
Expand Down Expand Up @@ -484,6 +497,10 @@ class Emitter(
}
}

private fun removeOldEvents() {
eventStore.removeOldEvents(maxEventStoreSize, maxEventStoreAge)
}

/**
* Attempts to send events in the database to a collector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package com.snowplowanalytics.core.emitter
import com.snowplowanalytics.snowplow.emitter.BufferOption
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.network.RequestCallback
import kotlin.time.Duration

interface EmitterConfigurationInterface {
/**
Expand Down Expand Up @@ -71,4 +72,16 @@ interface EmitterConfigurationInterface {
* If disabled, events that failed to be sent will be dropped regardless of other configuration (such as the customRetryForStatusCodes).
*/
var retryFailedRequests: Boolean

/**
* Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
* Defaults to 30 days.
*/
var maxEventStoreAge: Duration

/**
* Limit for the maximum number of unsent events to keep in the event store.
* Defaults to 1000.
*/
var maxEventStoreSize: Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.controller.EmitterController
import com.snowplowanalytics.snowplow.emitter.BufferOption
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.network.RequestCallback
import kotlin.time.Duration

@RestrictTo(RestrictTo.Scope.LIBRARY)
class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) :
Expand Down Expand Up @@ -91,6 +92,20 @@ class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) :
emitter.retryFailedRequests = value
}

override var maxEventStoreAge: Duration
get() = emitter.maxEventStoreAge
set(value) {
dirtyConfig.maxEventStoreAge = value
emitter.maxEventStoreAge = value
}

override var maxEventStoreSize: Long
get() = emitter.maxEventStoreSize
set(value) {
dirtyConfig.maxEventStoreSize = value
emitter.maxEventStoreSize = value
}

override val dbCount: Long
get() {
val eventStore = emitter.eventStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import com.snowplowanalytics.snowplow.network.Protocol

import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.time.DurationUnit
import kotlin.time.toDuration

object EmitterDefaults {
var httpMethod = HttpMethod.POST
Expand All @@ -34,4 +36,6 @@ object EmitterDefaults {
var serverAnonymisation = false
var retryFailedRequests = true
var timeUnit = TimeUnit.SECONDS
var maxEventStoreAge = 30.toDuration(DurationUnit.DAYS)
var maxEventStoreSize: Long = 1000
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.emitter.EmitterEvent
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.payload.Payload
import com.snowplowanalytics.snowplow.payload.TrackerPayload
import kotlin.time.Duration

/**
* Helper class for storing, getting and removing
Expand Down Expand Up @@ -163,6 +164,25 @@ class SQLiteEventStore(context: Context, private val namespace: String) : EventS
return retval >= 0
}

override fun removeOldEvents(maxSize: Long, maxAge: Duration) {
if (databaseOpen) {
insertWaitingEventsIfReady()

database?.execSQL(
"""
DELETE FROM ${EventStoreHelper.TABLE_EVENTS}
WHERE ${EventStoreHelper.COLUMN_ID} NOT IN (
SELECT ${EventStoreHelper.COLUMN_ID}
FROM ${EventStoreHelper.TABLE_EVENTS}
WHERE ${EventStoreHelper.COLUMN_DATE_CREATED} >= datetime('now','-${maxAge.inWholeSeconds} seconds')
ORDER BY ${EventStoreHelper.COLUMN_DATE_CREATED} DESC, ${EventStoreHelper.COLUMN_ID} DESC
LIMIT $maxSize
)
""".trimIndent()
)
}
}

/**
* Returns the events that validate a
* specific query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ class ServiceProvider(
emitter.serverAnonymisation = emitterConfiguration.serverAnonymisation
emitter.requestHeaders = networkConfiguration.requestHeaders
emitter.retryFailedRequests = emitterConfiguration.retryFailedRequests
emitter.maxEventStoreAge = emitterConfiguration.maxEventStoreAge
emitter.maxEventStoreSize = emitterConfiguration.maxEventStoreSize
}

val emitter = Emitter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import com.snowplowanalytics.snowplow.emitter.BufferOption
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.network.RequestCallback
import org.json.JSONObject
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration

/**
* Configure how the tracker should send the events to the collector.
Expand Down Expand Up @@ -92,6 +95,16 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
override var retryFailedRequests: Boolean
get() = _retryFailedRequests ?: sourceConfig?.retryFailedRequests ?: EmitterDefaults.retryFailedRequests
set(value) { _retryFailedRequests = value }

private var _maxEventStoreAge: Duration? = null
override var maxEventStoreAge: Duration
get() = _maxEventStoreAge ?: sourceConfig?.maxEventStoreAge ?: EmitterDefaults.maxEventStoreAge
set(value) { _maxEventStoreAge = value }

private var _maxEventStoreSize: Long? = null
override var maxEventStoreSize: Long
get() = _maxEventStoreSize ?: sourceConfig?.maxEventStoreSize ?: EmitterDefaults.maxEventStoreSize
set(value) { _maxEventStoreSize = value }

// Builders

Expand Down Expand Up @@ -183,6 +196,24 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
return this
}

/**
* Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
* Defaults to 30 days.
*/
fun maxEventStoreAge(maxEventStoreAge: Duration): EmitterConfiguration {
this.maxEventStoreAge = maxEventStoreAge
return this
}

/**
* Limit for the maximum number of unsent events to keep in the event store.
* Defaults to 1000.
*/
fun maxEventStoreSize(maxEventStoreSize: Long): EmitterConfiguration {
this.maxEventStoreSize = maxEventStoreSize
return this
}

// Copyable
override fun copy(): EmitterConfiguration {
return EmitterConfiguration()
Expand All @@ -196,6 +227,8 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
.customRetryForStatusCodes(customRetryForStatusCodes)
.serverAnonymisation(serverAnonymisation)
.retryFailedRequests(retryFailedRequests)
.maxEventStoreSize(maxEventStoreSize)
.maxEventStoreAge(maxEventStoreAge)
}

// JSON Formatter
Expand All @@ -222,5 +255,7 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
_customRetryForStatusCodes = customRetryForStatusCodes
}
if (jsonObject.has("retryFailedRequests")) { _retryFailedRequests = jsonObject.getBoolean("retryFailedRequests") }
if (jsonObject.has("maxEventStoreAge")) { _maxEventStoreAge = jsonObject.getDouble("maxEventStoreAge").toDuration(DurationUnit.SECONDS) }
if (jsonObject.has("maxEventStoreSize")) { _maxEventStoreSize = jsonObject.getLong("maxEventStoreSize") }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.snowplowanalytics.snowplow.emitter

import com.snowplowanalytics.snowplow.payload.Payload
import kotlin.time.Duration

/**
* The component that persists and buffers events before sending.
Expand Down Expand Up @@ -55,4 +56,11 @@ interface EventStore {
* @return EmitterEvent objects containing eventIds and event payloads.
*/
fun getEmittableEvents(queryLimit: Int): List<EmitterEvent>

/**
* Remove events older than `maxAge` seconds and keep only the latest `maxSize` events.
* @param maxSize the maximum number of events to keep.
* @param maxAge the maximum age of events to keep.
*/
fun removeOldEvents(maxSize: Long, maxAge: Duration)
}

0 comments on commit e56da18

Please sign in to comment.