Skip to content

Commit

Permalink
Make network requests serially in network connection (close #646)
Browse files Browse the repository at this point in the history
  • Loading branch information
matus-tomlein committed Nov 30, 2023
1 parent 30ef986 commit ad8f1b8
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ private boolean setupWithLocalConfig() {
EmitterConfiguration emitterConfiguration = new EmitterConfiguration()
.requestCallback(getRequestCallback())
.threadPoolSize(20)
.emitRange(500)
.byteLimitPost(52000);
TrackerConfiguration trackerConfiguration = new TrackerConfiguration(appId)
.logLevel(LogLevel.VERBOSE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ class Demo : Activity(), LoggerDelegate {
.requestCallback(requestCallback)
.bufferOption(BufferOption.SmallGroup)
.threadPoolSize(20)
.emitRange(500)
.byteLimitPost(52000)
val trackerConfiguration = TrackerConfiguration(appId)
.logLevel(LogLevel.VERBOSE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ class EmitterTest {

@Test
fun testSendLimitSet() {
val builder = { emitter: Emitter -> emitter.sendLimit = 200 }
val builder = { emitter: Emitter -> emitter.emitRange = 200 }
val emitter = Emitter(context, "com.acme", builder)
Assert.assertEquals(200, emitter.sendLimit.toLong())
Assert.assertEquals(200, emitter.emitRange.toLong())
}

@Test
Expand All @@ -156,7 +156,7 @@ class EmitterTest {
emitter.requestSecurity = Protocol.HTTP
emitter.emitterTick = 250
emitter.emptyLimit = 5
emitter.sendLimit = 200
emitter.emitRange = 200
emitter.byteLimitGet = 20000
emitter.byteLimitPost = 25000
emitter.eventStore = MockEventStore()
Expand Down Expand Up @@ -198,7 +198,7 @@ class EmitterTest {
emitter1.requestSecurity = Protocol.HTTP
emitter1.emitterTick = 250
emitter1.emptyLimit = 5
emitter1.sendLimit = 200
emitter1.emitRange = 200
emitter1.byteLimitGet = 20000
emitter1.byteLimitPost = 25000
emitter1.eventStore = MockEventStore()
Expand Down Expand Up @@ -452,14 +452,67 @@ class EmitterTest {
emitter.flush()
}

@Test
fun testNumberOfRequestsMatchesEmitRangeAndOversize() {
val networkConnection = MockNetworkConnection(HttpMethod.POST, 200)
val emitter = getEmitter(networkConnection, BufferOption.Single)
emitter.emitRange = 20

emitter.pauseEmit()
emitter.eventStore?.removeAllEvents()
for (payload in generatePayloads(20)) {
emitter.add(payload)
}
networkConnection.clear()
emitter.resumeEmit()
Thread.sleep(500)

// made a single request
Assert.assertEquals(1, networkConnection.sendingCount())
Assert.assertEquals(1, networkConnection.previousResults.first().size)

networkConnection.clear()

emitter.pauseEmit()
for (payload in generatePayloads(40)) {
emitter.add(payload)
}
emitter.resumeEmit()

Thread.sleep(500)

// made two requests one after the other
Assert.assertEquals(2, networkConnection.sendingCount())
Assert.assertEquals(1, networkConnection.previousResults.map { it.size }.max())

networkConnection.clear()

// test with oversize requests
emitter.byteLimitPost = 5

emitter.pauseEmit()
for (payload in generatePayloads(2)) {
emitter.add(payload)
}
emitter.resumeEmit()

Thread.sleep(500)

// made two requests at once
Assert.assertEquals(1, networkConnection.sendingCount())
Assert.assertEquals(2, networkConnection.previousResults.first().size)

emitter.flush()
}

// Emitter Builder
private fun getEmitter(networkConnection: NetworkConnection?, option: BufferOption?): Emitter {
private fun getEmitter(networkConnection: NetworkConnection, option: BufferOption): Emitter {
val builder = { emitter: Emitter ->
emitter.networkConnection = networkConnection
emitter.bufferOption = option!!
emitter.bufferOption = option
emitter.emitterTick = 0
emitter.emptyLimit = 0
emitter.sendLimit = 200
emitter.emitRange = 200
emitter.byteLimitGet = 20000
emitter.byteLimitPost = 25000
emitter.eventStore = MockEventStore()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ class MockNetworkConnection(override var httpMethod: HttpMethod, var statusCode:
fun countRequests(): Int {
return allRequests.size
}

fun clear() {
previousRequests.clear()
previousResults.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class EventSendingTest {
emitter.requestSecurity = Protocol.HTTP
emitter.emitterTick = 0
emitter.emptyLimit = 0
emitter.emitRange = 1
}
val emitter = Emitter(InstrumentationRegistry.getInstrumentation().targetContext, uri, builder)
val subject = Subject(InstrumentationRegistry.getInstrumentation().targetContext, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni
/**
* The maximum amount of events to grab for an emit attempt.
*/
var sendLimit: Int = EmitterDefaults.sendLimit
var emitRange: Int = EmitterDefaults.emitRange

/**
* The GET byte limit
Expand Down Expand Up @@ -556,7 +556,7 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni
}

emptyCount = 0
val events = eventStore.getEmittableEvents(sendLimit)
val events = eventStore.getEmittableEvents(emitRange)
val requests = buildRequests(events, networkConnection.httpMethod)
val results = networkConnection.sendRequests(requests)

Expand Down Expand Up @@ -639,47 +639,44 @@ class Emitter(context: Context, collectorUri: String, builder: ((Emitter) -> Uni
}
}
} else {
var i = 0
while (i < events.size) {
var reqEventIds: MutableList<Long> = ArrayList()
var postPayloadMaps: MutableList<Payload> = ArrayList()

var j = i
while (j < i + bufferOption.code && j < events.size) {
val event = events[j]
val payload = event?.payload
val eventId = event?.eventId
if (payload != null && eventId != null) {
addSendingTimeToPayload(payload, sendingTime)
if (isOversize(payload, httpMethod)) {
val request = Request(payload, eventId, true)
requests.add(request)
} else if (isOversize(payload, postPayloadMaps, httpMethod)) {
val request = Request(postPayloadMaps, reqEventIds)
requests.add(request)

// Clear collections and build a new POST
postPayloadMaps = ArrayList()
reqEventIds = ArrayList()

// Build and store the request
postPayloadMaps.add(payload)
reqEventIds.add(eventId)
} else {
postPayloadMaps.add(payload)
reqEventIds.add(eventId)
}
j++
}

}
var eventIds: MutableList<Long> = ArrayList()
var eventPayloads: MutableList<Payload> = ArrayList()

// Check if all payloads have been processed
if (postPayloadMaps.isNotEmpty()) {
val request = Request(postPayloadMaps, reqEventIds)
for (event in events) {
if (event == null) { continue }
val payload = event.payload
val eventId = event.eventId
addSendingTimeToPayload(payload, sendingTime)

// Oversize event -> separate requests
if (isOversize(payload, httpMethod)) {
val request = Request(payload, eventId, true)
requests.add(request)
}
i += bufferOption.code
// Events up to this one are oversize -> create request for them
else if (isOversize(payload, eventPayloads, httpMethod)) {
val request = Request(eventPayloads, eventIds)
requests.add(request)

// Clear collections and build a new POST
eventPayloads = ArrayList()
eventIds = ArrayList()

// Build and store the request
eventPayloads.add(payload)
eventIds.add(eventId)
}
// Add to the list of events for the request
else {
eventPayloads.add(payload)
eventIds.add(eventId)
}
}

// Check if there are any remaining events not in a request
if (eventPayloads.isNotEmpty()) {
val request = Request(eventPayloads, eventIds)
requests.add(request)
}
}
return requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class EmitterControllerImpl(serviceProvider: ServiceProviderInterface) :
}

override var emitRange: Int
get() = emitter.sendLimit
get() = emitter.emitRange
set(emitRange) {
dirtyConfig.emitRange = emitRange
emitter.sendLimit = emitRange
emitter.emitRange = emitRange
}

override val threadPoolSize: Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ object EmitterDefaults {
var bufferOption = BufferOption.Single
var httpProtocol = Protocol.HTTPS
var tlsVersions: EnumSet<TLSVersion> = EnumSet.of(TLSVersion.TLSv1_2)
var emitRange: Int = 150
var emitRange: Int = BufferOption.LargeGroup.code
var emitterTick = 5
var sendLimit = 250
var emptyLimit = 5
var byteLimitGet: Long = 40000
var byteLimitPost: Long = 40000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class ServiceProvider(
emitter.client = networkConfiguration.okHttpClient
emitter.cookieJar = networkConfiguration.okHttpCookieJar
emitter.emitTimeout = networkConfiguration.timeout
emitter.sendLimit = emitterConfiguration.emitRange
emitter.emitRange = emitterConfiguration.emitRange
emitter.bufferOption = emitterConfiguration.bufferOption
emitter.eventStore = emitterConfiguration.eventStore
emitter.byteLimitPost = emitterConfiguration.byteLimitPost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.json.JSONObject
* Default values:
* - bufferOption: [BufferOption.Single]
* - serverAnonymisation: false
* - emitRange: 150 - maximum number of events to process at a time
* - emitRange: 25 - maximum number of events to process at a time
* - threadPoolSize: 15
* - byteLimitGet: 40000 bytes
* - byteLimitPost: 40000 bytes
Expand Down

0 comments on commit ad8f1b8

Please sign in to comment.