Skip to content

Commit

Permalink
Make network requests serially in network connection (close #646)
Browse files Browse the repository at this point in the history
  • Loading branch information
matus-tomlein committed Nov 30, 2023
1 parent 30ef986 commit 2cf8fdf
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ private boolean setupWithLocalConfig() {
EmitterConfiguration emitterConfiguration = new EmitterConfiguration()
.requestCallback(getRequestCallback())
.threadPoolSize(20)
.emitRange(500)
.byteLimitPost(52000);
TrackerConfiguration trackerConfiguration = new TrackerConfiguration(appId)
.logLevel(LogLevel.VERBOSE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ class Demo : Activity(), LoggerDelegate {
.requestCallback(requestCallback)
.bufferOption(BufferOption.SmallGroup)
.threadPoolSize(20)
.emitRange(500)
.byteLimitPost(52000)
val trackerConfiguration = TrackerConfiguration(appId)
.logLevel(LogLevel.VERBOSE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ class EmitterTest {

@Test
fun testSendLimitSet() {
val builder = { emitter: Emitter -> emitter.sendLimit = 200 }
val builder = { emitter: Emitter -> emitter.emitRange = 200 }
val emitter = Emitter(context, "com.acme", builder)
Assert.assertEquals(200, emitter.sendLimit.toLong())
Assert.assertEquals(200, emitter.emitRange.toLong())
}

@Test
Expand All @@ -156,7 +156,7 @@ class EmitterTest {
emitter.requestSecurity = Protocol.HTTP
emitter.emitterTick = 250
emitter.emptyLimit = 5
emitter.sendLimit = 200
emitter.emitRange = 200
emitter.byteLimitGet = 20000
emitter.byteLimitPost = 25000
emitter.eventStore = MockEventStore()
Expand Down Expand Up @@ -198,7 +198,7 @@ class EmitterTest {
emitter1.requestSecurity = Protocol.HTTP
emitter1.emitterTick = 250
emitter1.emptyLimit = 5
emitter1.sendLimit = 200
emitter1.emitRange = 200
emitter1.byteLimitGet = 20000
emitter1.byteLimitPost = 25000
emitter1.eventStore = MockEventStore()
Expand Down Expand Up @@ -452,14 +452,65 @@ class EmitterTest {
emitter.flush()
}

@Test
fun testNumberOfRequestsMatchesEmitRangeAndOversize() {
val networkConnection = MockNetworkConnection(HttpMethod.POST, 200)
val emitter = getEmitter(networkConnection, BufferOption.Single)
emitter.emitRange = 20

emitter.pauseEmit()
for (payload in generatePayloads(20)) {
emitter.add(payload)
}
emitter.resumeEmit()
Thread.sleep(500)

// made a single request
Assert.assertEquals(1, networkConnection.sendingCount())
Assert.assertEquals(1, networkConnection.previousResults.first().size)

networkConnection.clear()

emitter.pauseEmit()
for (payload in generatePayloads(40)) {
emitter.add(payload)
}
emitter.resumeEmit()

Thread.sleep(500)

// made two requests one after the other
Assert.assertEquals(2, networkConnection.sendingCount())
Assert.assertEquals(1, networkConnection.previousResults.map { it.size }.max())

networkConnection.clear()

// test with oversize requests
emitter.byteLimitPost = 5

emitter.pauseEmit()
for (payload in generatePayloads(2)) {
emitter.add(payload)
}
emitter.resumeEmit()

Thread.sleep(500)

// made two requests at once
Assert.assertEquals(1, networkConnection.sendingCount())
Assert.assertEquals(2, networkConnection.previousResults.first().size)

emitter.flush()
}

// Emitter Builder
private fun getEmitter(networkConnection: NetworkConnection?, option: BufferOption?): Emitter {
val builder = { emitter: Emitter ->
emitter.networkConnection = networkConnection
emitter.bufferOption = option!!
emitter.emitterTick = 0
emitter.emptyLimit = 0
emitter.sendLimit = 200
emitter.emitRange = 200
emitter.byteLimitGet = 20000
emitter.byteLimitPost = 25000
emitter.eventStore = MockEventStore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ class MockNetworkConnection(override var httpMethod: HttpMethod, var statusCode:
fun countRequests(): Int {
return allRequests.size
}

fun clear() {
previousRequests.clear()
previousResults.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni
/**
* The maximum amount of events to grab for an emit attempt.
*/
var sendLimit: Int = EmitterDefaults.sendLimit
var emitRange: Int = EmitterDefaults.emitRange

/**
* The GET byte limit
Expand Down Expand Up @@ -556,7 +556,7 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni
}

emptyCount = 0
val events = eventStore.getEmittableEvents(sendLimit)
val events = eventStore.getEmittableEvents(emitRange)
val requests = buildRequests(events, networkConnection.httpMethod)
val results = networkConnection.sendRequests(requests)

Expand Down Expand Up @@ -639,47 +639,44 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni
}
}
} else {
var i = 0
while (i < events.size) {
var reqEventIds: MutableList<Long> = ArrayList()
var postPayloadMaps: MutableList<Payload> = ArrayList()

var j = i
while (j < i + bufferOption.code && j < events.size) {
val event = events[j]
val payload = event?.payload
val eventId = event?.eventId
if (payload != null && eventId != null) {
addSendingTimeToPayload(payload, sendingTime)
if (isOversize(payload, httpMethod)) {
val request = Request(payload, eventId, true)
requests.add(request)
} else if (isOversize(payload, postPayloadMaps, httpMethod)) {
val request = Request(postPayloadMaps, reqEventIds)
requests.add(request)

// Clear collections and build a new POST
postPayloadMaps = ArrayList()
reqEventIds = ArrayList()

// Build and store the request
postPayloadMaps.add(payload)
reqEventIds.add(eventId)
} else {
postPayloadMaps.add(payload)
reqEventIds.add(eventId)
}
j++
}

}
var eventIds: MutableList<Long> = ArrayList()
var eventPayloads: MutableList<Payload> = ArrayList()

// Check if all payloads have been processed
if (postPayloadMaps.isNotEmpty()) {
val request = Request(postPayloadMaps, reqEventIds)
for (event in events) {
if (event == null) { continue }
val payload = event.payload
val eventId = event.eventId
addSendingTimeToPayload(payload, sendingTime)

// Oversize event -> separate requests
if (isOversize(payload, httpMethod)) {
val request = Request(payload, eventId, true)
requests.add(request)
}
i += bufferOption.code
// Events up to this one are oversize -> create request for them
else if (isOversize(payload, eventPayloads, httpMethod)) {
val request = Request(eventPayloads, eventIds)
requests.add(request)

// Clear collections and build a new POST
eventPayloads = ArrayList()
eventIds = ArrayList()

// Build and store the request
eventPayloads.add(payload)
eventIds.add(eventId)
}
// Add to the list of events for the request
else {
eventPayloads.add(payload)
eventIds.add(eventId)
}
}

// Check if there are any remaining events not in a request
if (eventPayloads.isNotEmpty()) {
val request = Request(eventPayloads, eventIds)
requests.add(request)
}
}
return requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) :
}

override var emitRange: Int
get() = emitter.sendLimit
get() = emitter.emitRange
set(emitRange) {
dirtyConfig.emitRange = emitRange
emitter.sendLimit = emitRange
emitter.emitRange = emitRange
}

override val threadPoolSize: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ object EmitterDefaults {
var bufferOption = BufferOption.Single
var httpProtocol = Protocol.HTTPS
var tlsVersions: EnumSet<TLSVersion> = EnumSet.of(TLSVersion.TLSv1_2)
var emitRange: Int = 150
var emitRange: Int = BufferOption.LargeGroup.code
var emitterTick = 5
var sendLimit = 250
var emptyLimit = 5
var byteLimitGet: Long = 40000
var byteLimitPost: Long = 40000
var emitTimeout = 5
var threadPoolSize = 15
var threadPoolSize = 5
var serverAnonymisation = false
var retryFailedRequests = true
var timeUnit = TimeUnit.SECONDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class ServiceProvider(
emitter.client = networkConfiguration.okHttpClient
emitter.cookieJar = networkConfiguration.okHttpCookieJar
emitter.emitTimeout = networkConfiguration.timeout
emitter.sendLimit = emitterConfiguration.emitRange
emitter.emitRange = emitterConfiguration.emitRange
emitter.bufferOption = emitterConfiguration.bufferOption
emitter.eventStore = emitterConfiguration.eventStore
emitter.byteLimitPost = emitterConfiguration.byteLimitPost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import org.json.JSONObject
* Default values:
* - bufferOption: [BufferOption.Single]
* - serverAnonymisation: false
* - emitRange: 150 - maximum number of events to process at a time
* - threadPoolSize: 15
* - emitRange: 25 - maximum number of events to process at a time
* - threadPoolSize: 5
* - byteLimitGet: 40000 bytes
* - byteLimitPost: 40000 bytes
* - retryFailedRequests: true
Expand Down

0 comments on commit 2cf8fdf

Please sign in to comment.