Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable limit for the maximum age and number of events in the event store and remove old events before sending (close #660) #661

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import java.util.*
import kotlin.time.DurationUnit
import kotlin.time.toDuration

@RunWith(AndroidJUnit4::class)
class EventStoreTest {
Expand Down Expand Up @@ -166,7 +168,7 @@ class EventStoreTest {
@Throws(InterruptedException::class)
fun testRemoveRangeOfEvents() {
val eventStore = eventStore()
val idList: MutableList<Long?> = ArrayList()
val idList: MutableList<Long> = ArrayList()
idList.add(eventStore.insertEvent(payload()))
idList.add(eventStore.insertEvent(payload()))
idList.add(eventStore.insertEvent(payload()))
Expand Down Expand Up @@ -311,6 +313,60 @@ class EventStoreTest {
Assert.assertEquals(2, eventStore2.size())
}

@Test
fun testRemoveOldEventsByAge() {
val context = InstrumentationRegistry.getInstrumentation().targetContext
val eventStore = SQLiteEventStore(context, "namespace")
openedEventStores.add(eventStore)
waitUntilDatabaseOpen(eventStore)

for (i in 1..5) {
val payload = TrackerPayload()
payload.add("eid", i.toString())
eventStore.insertEvent(payload)
}

Thread.sleep(2000)

for (i in 6..10) {
val payload = TrackerPayload()
payload.add("eid", i.toString())
eventStore.insertEvent(payload)
}

Assert.assertEquals(10, eventStore.size())

eventStore.removeOldEvents(10, 1.toDuration(DurationUnit.SECONDS))

Assert.assertEquals(5, eventStore.size())
val events = eventStore.getEmittableEvents(10)
val eventIds = events.map { it.payload.map["eid"] as String }
Assert.assertEquals(listOf("10", "6", "7", "8", "9"), eventIds.sorted())
}

@Test
fun testRemoveOldestEventsByMaxSize() {
val context = InstrumentationRegistry.getInstrumentation().targetContext
val eventStore = SQLiteEventStore(context, "namespace")
openedEventStores.add(eventStore)
waitUntilDatabaseOpen(eventStore)

for (i in 1..5) {
val trackerPayload = TrackerPayload()
trackerPayload.add("eid", "$i")
eventStore.insertEvent(trackerPayload)
}

Assert.assertEquals(5, eventStore.size())

eventStore.removeOldEvents(3, 10.toDuration(DurationUnit.MINUTES))

Assert.assertEquals(3, eventStore.size())
val events = eventStore.getEmittableEvents(10)
val eventIds = events.map { it.payload.map["eid"] as String }
Assert.assertEquals(listOf("3", "4", "5"), eventIds.sorted())
}

// Helper Methods

@Throws(InterruptedException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.snowplowanalytics.snowplow.emitter.EmitterEvent
import com.snowplowanalytics.snowplow.payload.TrackerPayload
import java.util.ArrayList
import java.util.HashMap
import kotlin.time.Duration

class MockEventStore : EventStore {
var db = HashMap<Long, Payload?>()
Expand All @@ -38,7 +39,7 @@ class MockEventStore : EventStore {
}
}

override fun removeEvents(ids: MutableList<Long?>): Boolean {
override fun removeEvents(ids: MutableList<Long>): Boolean {
var result = true
for (id in ids) {
val removed = removeEvent(id!!)
Expand All @@ -60,11 +61,11 @@ class MockEventStore : EventStore {
return db.size.toLong()
}

override fun getEmittableEvents(queryLimit: Int): List<EmitterEvent?> {
override fun getEmittableEvents(queryLimit: Int): List<EmitterEvent> {
synchronized(this) {
val eventIds: MutableList<Long> = ArrayList()
val eventPayloads: MutableList<String> = ArrayList()
var events: MutableList<EmitterEvent?> = ArrayList()
var events: MutableList<EmitterEvent> = ArrayList()
for ((key, value) in db) {
val payloadCopy: Payload = TrackerPayload()
payloadCopy.addMap(value!!.map)
Expand All @@ -81,4 +82,8 @@ class MockEventStore : EventStore {
return events
}
}

override fun removeOldEvents(maxSize: Long, maxAge: Duration) {
// "Not implemented in the mock event store"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.time.Duration

/**
* Build an emitter object which controls the
Expand Down Expand Up @@ -359,6 +360,16 @@ class Emitter(
}
}

/**
* Limit for the maximum number of unsent events to keep in the event store.
*/
var maxEventStoreSize: Long = EmitterDefaults.maxEventStoreSize

/**
* Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
*/
var maxEventStoreAge: Duration = EmitterDefaults.maxEventStoreAge

/**
* Creates an emitter object
*/
Expand Down Expand Up @@ -413,6 +424,7 @@ class Emitter(
eventStore.add(payload)
if (eventStore.size() >= bufferOption.code && isRunning.compareAndSet(false, true)) {
try {
removeOldEvents()
attemptEmit(networkConnection)
} catch (t: Throwable) {
isRunning.set(false)
Expand All @@ -430,6 +442,7 @@ class Emitter(
Executor.execute(TAG) {
if (isRunning.compareAndSet(false, true)) {
try {
removeOldEvents()
attemptEmit(networkConnection)
} catch (t: Throwable) {
isRunning.set(false)
Expand Down Expand Up @@ -484,6 +497,10 @@ class Emitter(
}
}

private fun removeOldEvents() {
eventStore.removeOldEvents(maxEventStoreSize, maxEventStoreAge)
}

/**
* Attempts to send events in the database to a collector.
*
Expand Down Expand Up @@ -544,7 +561,7 @@ class Emitter(
var successCount = 0
var failedWillRetryCount = 0
var failedWontRetryCount = 0
val removableEvents: MutableList<Long?> = ArrayList()
val removableEvents: MutableList<Long> = ArrayList()

for (res in results) {
if (res.isSuccessful) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package com.snowplowanalytics.core.emitter
import com.snowplowanalytics.snowplow.emitter.BufferOption
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.network.RequestCallback
import kotlin.time.Duration

interface EmitterConfigurationInterface {
/**
Expand Down Expand Up @@ -71,4 +72,16 @@ interface EmitterConfigurationInterface {
* If disabled, events that failed to be sent will be dropped regardless of other configuration (such as the customRetryForStatusCodes).
*/
var retryFailedRequests: Boolean

/**
* Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
* Defaults to 30 days.
*/
var maxEventStoreAge: Duration

/**
* Limit for the maximum number of unsent events to keep in the event store.
* Defaults to 1000.
*/
var maxEventStoreSize: Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.snowplowanalytics.snowplow.controller.EmitterController
import com.snowplowanalytics.snowplow.emitter.BufferOption
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.network.RequestCallback
import kotlin.time.Duration

@RestrictTo(RestrictTo.Scope.LIBRARY)
class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) :
Expand Down Expand Up @@ -91,6 +92,20 @@ class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) :
emitter.retryFailedRequests = value
}

override var maxEventStoreAge: Duration
get() = emitter.maxEventStoreAge
set(value) {
dirtyConfig.maxEventStoreAge = value
emitter.maxEventStoreAge = value
}

override var maxEventStoreSize: Long
get() = emitter.maxEventStoreSize
set(value) {
dirtyConfig.maxEventStoreSize = value
emitter.maxEventStoreSize = value
}

override val dbCount: Long
get() {
val eventStore = emitter.eventStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import com.snowplowanalytics.snowplow.network.Protocol

import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.time.DurationUnit
import kotlin.time.toDuration

object EmitterDefaults {
var httpMethod = HttpMethod.POST
Expand All @@ -34,4 +36,6 @@ object EmitterDefaults {
var serverAnonymisation = false
var retryFailedRequests = true
var timeUnit = TimeUnit.SECONDS
var maxEventStoreAge = 30.toDuration(DurationUnit.DAYS)
var maxEventStoreSize: Long = 1000
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.emitter.EmitterEvent
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.payload.Payload
import com.snowplowanalytics.snowplow.payload.TrackerPayload
import kotlin.time.Duration

/**
* Helper class for storing, getting and removing
Expand Down Expand Up @@ -135,7 +136,7 @@ class SQLiteEventStore(context: Context, private val namespace: String) : EventS
return retval == 1
}

override fun removeEvents(ids: MutableList<Long?>): Boolean {
override fun removeEvents(ids: MutableList<Long>): Boolean {
if (ids.isEmpty()) {
return false
}
Expand Down Expand Up @@ -163,6 +164,25 @@ class SQLiteEventStore(context: Context, private val namespace: String) : EventS
return retval >= 0
}

override fun removeOldEvents(maxSize: Long, maxAge: Duration) {
if (databaseOpen) {
insertWaitingEventsIfReady()

database?.execSQL(
"""
DELETE FROM ${EventStoreHelper.TABLE_EVENTS}
WHERE ${EventStoreHelper.COLUMN_ID} NOT IN (
SELECT ${EventStoreHelper.COLUMN_ID}
FROM ${EventStoreHelper.TABLE_EVENTS}
WHERE ${EventStoreHelper.COLUMN_DATE_CREATED} >= datetime('now','-${maxAge.inWholeSeconds} seconds')
ORDER BY ${EventStoreHelper.COLUMN_DATE_CREATED} DESC, ${EventStoreHelper.COLUMN_ID} DESC
LIMIT $maxSize
)
""".trimIndent()
)
}
}

/**
* Returns the events that validate a
* specific query.
Expand Down Expand Up @@ -215,12 +235,12 @@ class SQLiteEventStore(context: Context, private val namespace: String) : EventS
}
}

override fun getEmittableEvents(queryLimit: Int): List<EmitterEvent?> {
override fun getEmittableEvents(queryLimit: Int): List<EmitterEvent> {
if (!databaseOpen) {
return emptyList<EmitterEvent>()
}
insertWaitingEventsIfReady()
val events = ArrayList<EmitterEvent?>()
val events = ArrayList<EmitterEvent>()

// FIFO Pattern for sending events
for (eventMetadata in getDescEventsInRange(queryLimit)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ class ServiceProvider(
emitter.serverAnonymisation = emitterConfiguration.serverAnonymisation
emitter.requestHeaders = networkConfiguration.requestHeaders
emitter.retryFailedRequests = emitterConfiguration.retryFailedRequests
emitter.maxEventStoreAge = emitterConfiguration.maxEventStoreAge
emitter.maxEventStoreSize = emitterConfiguration.maxEventStoreSize
}

val emitter = Emitter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import com.snowplowanalytics.snowplow.emitter.BufferOption
import com.snowplowanalytics.snowplow.emitter.EventStore
import com.snowplowanalytics.snowplow.network.RequestCallback
import org.json.JSONObject
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration

/**
* Configure how the tracker should send the events to the collector.
Expand Down Expand Up @@ -92,6 +95,16 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
override var retryFailedRequests: Boolean
get() = _retryFailedRequests ?: sourceConfig?.retryFailedRequests ?: EmitterDefaults.retryFailedRequests
set(value) { _retryFailedRequests = value }

private var _maxEventStoreAge: Duration? = null
override var maxEventStoreAge: Duration
get() = _maxEventStoreAge ?: sourceConfig?.maxEventStoreAge ?: EmitterDefaults.maxEventStoreAge
set(value) { _maxEventStoreAge = value }

private var _maxEventStoreSize: Long? = null
override var maxEventStoreSize: Long
get() = _maxEventStoreSize ?: sourceConfig?.maxEventStoreSize ?: EmitterDefaults.maxEventStoreSize
set(value) { _maxEventStoreSize = value }

// Builders

Expand Down Expand Up @@ -183,6 +196,24 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
return this
}

/**
* Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
* Defaults to 30 days.
*/
fun maxEventStoreAge(maxEventStoreAge: Duration): EmitterConfiguration {
this.maxEventStoreAge = maxEventStoreAge
return this
}

/**
* Limit for the maximum number of unsent events to keep in the event store.
* Defaults to 1000.
*/
fun maxEventStoreSize(maxEventStoreSize: Long): EmitterConfiguration {
this.maxEventStoreSize = maxEventStoreSize
return this
}

// Copyable
override fun copy(): EmitterConfiguration {
return EmitterConfiguration()
Expand All @@ -196,6 +227,8 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
.customRetryForStatusCodes(customRetryForStatusCodes)
.serverAnonymisation(serverAnonymisation)
.retryFailedRequests(retryFailedRequests)
.maxEventStoreSize(maxEventStoreSize)
.maxEventStoreAge(maxEventStoreAge)
}

// JSON Formatter
Expand All @@ -222,5 +255,7 @@ open class EmitterConfiguration() : Configuration, EmitterConfigurationInterface
_customRetryForStatusCodes = customRetryForStatusCodes
}
if (jsonObject.has("retryFailedRequests")) { _retryFailedRequests = jsonObject.getBoolean("retryFailedRequests") }
if (jsonObject.has("maxEventStoreAge")) { _maxEventStoreAge = jsonObject.getDouble("maxEventStoreAge").toDuration(DurationUnit.SECONDS) }
if (jsonObject.has("maxEventStoreSize")) { _maxEventStoreSize = jsonObject.getLong("maxEventStoreSize") }
}
}
Loading
Loading