From 32e80492e211fe92367f5131d921ddbf5c38857c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matu=CC=81s=CC=8C=20Tomlein?= Date: Thu, 30 Nov 2023 14:07:47 +0100 Subject: [PATCH] Make network requests serially in network connection (close #646) --- .../snowplowtrackerdemojava/Demo.java | 1 - .../snowplowdemokotlin/Demo.kt | 1 - .../snowplow/tracker/EmitterTest.kt | 61 +++++++++++++-- .../snowplow/tracker/MockNetworkConnection.kt | 5 ++ .../snowplowanalytics/core/emitter/Emitter.kt | 77 +++++++++---------- .../core/emitter/EmitterControllerImpl.kt | 4 +- .../core/emitter/EmitterDefaults.kt | 3 +- .../core/tracker/ServiceProvider.kt | 2 +- .../configuration/EmitterConfiguration.kt | 4 +- 9 files changed, 104 insertions(+), 54 deletions(-) diff --git a/snowplow-demo-java/src/main/java/com/snowplowanalytics/snowplowtrackerdemojava/Demo.java b/snowplow-demo-java/src/main/java/com/snowplowanalytics/snowplowtrackerdemojava/Demo.java index 968d8d8f3..345c7d964 100644 --- a/snowplow-demo-java/src/main/java/com/snowplowanalytics/snowplowtrackerdemojava/Demo.java +++ b/snowplow-demo-java/src/main/java/com/snowplowanalytics/snowplowtrackerdemojava/Demo.java @@ -296,7 +296,6 @@ private boolean setupWithLocalConfig() { EmitterConfiguration emitterConfiguration = new EmitterConfiguration() .requestCallback(getRequestCallback()) .threadPoolSize(20) - .emitRange(500) .byteLimitPost(52000); TrackerConfiguration trackerConfiguration = new TrackerConfiguration(appId) .logLevel(LogLevel.VERBOSE) diff --git a/snowplow-demo-kotlin/src/main/java/com/snowplowanalytics/snowplowdemokotlin/Demo.kt b/snowplow-demo-kotlin/src/main/java/com/snowplowanalytics/snowplowdemokotlin/Demo.kt index 5a28d29c9..1df596488 100644 --- a/snowplow-demo-kotlin/src/main/java/com/snowplowanalytics/snowplowdemokotlin/Demo.kt +++ b/snowplow-demo-kotlin/src/main/java/com/snowplowanalytics/snowplowdemokotlin/Demo.kt @@ -262,7 +262,6 @@ class Demo : Activity(), LoggerDelegate { .requestCallback(requestCallback) .bufferOption(BufferOption.SmallGroup) .threadPoolSize(20) - .emitRange(500) .byteLimitPost(52000) val trackerConfiguration = TrackerConfiguration(appId) .logLevel(LogLevel.VERBOSE) diff --git a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/EmitterTest.kt b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/EmitterTest.kt index 3ab0c8c7c..aa9f00959 100755 --- a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/EmitterTest.kt +++ b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/EmitterTest.kt @@ -127,9 +127,9 @@ class EmitterTest { @Test fun testSendLimitSet() { - val builder = { emitter: Emitter -> emitter.sendLimit = 200 } + val builder = { emitter: Emitter -> emitter.emitRange = 200 } val emitter = Emitter(context, "com.acme", builder) - Assert.assertEquals(200, emitter.sendLimit.toLong()) + Assert.assertEquals(200, emitter.emitRange.toLong()) } @Test @@ -156,7 +156,7 @@ class EmitterTest { emitter.requestSecurity = Protocol.HTTP emitter.emitterTick = 250 emitter.emptyLimit = 5 - emitter.sendLimit = 200 + emitter.emitRange = 200 emitter.byteLimitGet = 20000 emitter.byteLimitPost = 25000 emitter.eventStore = MockEventStore() @@ -198,7 +198,7 @@ class EmitterTest { emitter1.requestSecurity = Protocol.HTTP emitter1.emitterTick = 250 emitter1.emptyLimit = 5 - emitter1.sendLimit = 200 + emitter1.emitRange = 200 emitter1.byteLimitGet = 20000 emitter1.byteLimitPost = 25000 emitter1.eventStore = MockEventStore() @@ -452,6 +452,57 @@ class EmitterTest { emitter.flush() } + @Test + fun testNumberOfRequestsMatchesEmitRangeAndOversize() { + val networkConnection = MockNetworkConnection(HttpMethod.POST, 200) + val emitter = getEmitter(networkConnection, BufferOption.Single) + emitter.emitRange = 20 + + emitter.pauseEmit() + for (payload in generatePayloads(20)) { + emitter.add(payload) + } + emitter.resumeEmit() + Thread.sleep(500) + + // made a single request + Assert.assertEquals(1, networkConnection.sendingCount()) + Assert.assertEquals(1, networkConnection.previousResults.first().size) + + networkConnection.clear() + + emitter.pauseEmit() + for (payload in generatePayloads(40)) { + emitter.add(payload) + } + emitter.resumeEmit() + + Thread.sleep(500) + + // made two requests one after the other + Assert.assertEquals(2, networkConnection.sendingCount()) + Assert.assertEquals(1, networkConnection.previousResults.map { it.size }.max()) + + networkConnection.clear() + + // test with oversize requests + emitter.byteLimitPost = 5 + + emitter.pauseEmit() + for (payload in generatePayloads(2)) { + emitter.add(payload) + } + emitter.resumeEmit() + + Thread.sleep(500) + + // made two requests at once + Assert.assertEquals(1, networkConnection.sendingCount()) + Assert.assertEquals(2, networkConnection.previousResults.first().size) + + emitter.flush() + } + // Emitter Builder private fun getEmitter(networkConnection: NetworkConnection?, option: BufferOption?): Emitter { val builder = { emitter: Emitter -> @@ -459,7 +510,7 @@ class EmitterTest { emitter.bufferOption = option!! emitter.emitterTick = 0 emitter.emptyLimit = 0 - emitter.sendLimit = 200 + emitter.emitRange = 200 emitter.byteLimitGet = 20000 emitter.byteLimitPost = 25000 emitter.eventStore = MockEventStore() diff --git a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockNetworkConnection.kt b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockNetworkConnection.kt index f29cc26ba..5f2a41665 100644 --- a/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockNetworkConnection.kt +++ b/snowplow-tracker/src/androidTest/java/com/snowplowanalytics/snowplow/tracker/MockNetworkConnection.kt @@ -59,4 +59,9 @@ class MockNetworkConnection(override var httpMethod: HttpMethod, var statusCode: fun countRequests(): Int { return allRequests.size } + + fun clear() { + previousRequests.clear() + previousResults.clear() + } } diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt index b847b4152..b2264e0c7 100755 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/Emitter.kt @@ -117,7 +117,7 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni /** * The maximum amount of events to grab for an emit attempt. */ - var sendLimit: Int = EmitterDefaults.sendLimit + var emitRange: Int = EmitterDefaults.emitRange /** * The GET byte limit @@ -556,7 +556,7 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni } emptyCount = 0 - val events = eventStore.getEmittableEvents(sendLimit) + val events = eventStore.getEmittableEvents(emitRange) val requests = buildRequests(events, networkConnection.httpMethod) val results = networkConnection.sendRequests(requests) @@ -639,47 +639,44 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni } } } else { - var i = 0 - while (i < events.size) { - var reqEventIds: MutableList = ArrayList() - var postPayloadMaps: MutableList = ArrayList() - - var j = i - while (j < i + bufferOption.code && j < events.size) { - val event = events[j] - val payload = event?.payload - val eventId = event?.eventId - if (payload != null && eventId != null) { - addSendingTimeToPayload(payload, sendingTime) - if (isOversize(payload, httpMethod)) { - val request = Request(payload, eventId, true) - requests.add(request) - } else if (isOversize(payload, postPayloadMaps, httpMethod)) { - val request = Request(postPayloadMaps, reqEventIds) - requests.add(request) - - // Clear collections and build a new POST - postPayloadMaps = ArrayList() - reqEventIds = ArrayList() - - // Build and store the request - postPayloadMaps.add(payload) - reqEventIds.add(eventId) - } else { - postPayloadMaps.add(payload) - reqEventIds.add(eventId) - } - j++ - } - - } + var eventIds: MutableList = ArrayList() + var eventPayloads: MutableList = ArrayList() - // Check if all payloads have been processed - if (postPayloadMaps.isNotEmpty()) { - val request = Request(postPayloadMaps, reqEventIds) + for (event in events) { + if (event == null) { continue } + val payload = event.payload + val eventId = event.eventId + addSendingTimeToPayload(payload, sendingTime) + + // Oversize event -> separate requests + if (isOversize(payload, httpMethod)) { + val request = Request(payload, eventId, true) requests.add(request) } - i += bufferOption.code + // Events up to this one are oversize -> create request for them + else if (isOversize(payload, eventPayloads, httpMethod)) { + val request = Request(eventPayloads, eventIds) + requests.add(request) + + // Clear collections and build a new POST + eventPayloads = ArrayList() + eventIds = ArrayList() + + // Build and store the request + eventPayloads.add(payload) + eventIds.add(eventId) + } + // Add to the list of events for the request + else { + eventPayloads.add(payload) + eventIds.add(eventId) + } + } + + // Check if there are any remaining events not in a request + if (eventPayloads.isNotEmpty()) { + val request = Request(eventPayloads, eventIds) + requests.add(request) } } return requests diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt index 30c8312ff..e86025a53 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterControllerImpl.kt @@ -40,10 +40,10 @@ class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) : } override var emitRange: Int - get() = emitter.sendLimit + get() = emitter.emitRange set(emitRange) { dirtyConfig.emitRange = emitRange - emitter.sendLimit = emitRange + emitter.emitRange = emitRange } override val threadPoolSize: Int diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt index a31ca5ed4..5eec3de99 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/emitter/EmitterDefaults.kt @@ -24,9 +24,8 @@ object EmitterDefaults { var bufferOption = BufferOption.Single var httpProtocol = Protocol.HTTPS var tlsVersions: EnumSet = EnumSet.of(TLSVersion.TLSv1_2) - var emitRange: Int = 150 + var emitRange: Int = BufferOption.LargeGroup.code var emitterTick = 5 - var sendLimit = 250 var emptyLimit = 5 var byteLimitGet: Long = 40000 var byteLimitPost: Long = 40000 diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt index 00b5a610c..6336948d7 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/core/tracker/ServiceProvider.kt @@ -234,7 +234,7 @@ class ServiceProvider( emitter.client = networkConfiguration.okHttpClient emitter.cookieJar = networkConfiguration.okHttpCookieJar emitter.emitTimeout = networkConfiguration.timeout - emitter.sendLimit = emitterConfiguration.emitRange + emitter.emitRange = emitterConfiguration.emitRange emitter.bufferOption = emitterConfiguration.bufferOption emitter.eventStore = emitterConfiguration.eventStore emitter.byteLimitPost = emitterConfiguration.byteLimitPost diff --git a/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt b/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt index d8d2113c7..cc15f5f24 100644 --- a/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt +++ b/snowplow-tracker/src/main/java/com/snowplowanalytics/snowplow/configuration/EmitterConfiguration.kt @@ -25,8 +25,8 @@ import org.json.JSONObject * Default values: * - bufferOption: [BufferOption.Single] * - serverAnonymisation: false - * - emitRange: 150 - maximum number of events to process at a time - * - threadPoolSize: 15 + * - emitRange: 25 - maximum number of events to process at a time + * - threadPoolSize: 5 * - byteLimitGet: 40000 bytes * - byteLimitPost: 40000 bytes * - retryFailedRequests: true