Skip to content

Commit

Permalink
akka-loader: rolback and working shitfix
Browse files Browse the repository at this point in the history
  • Loading branch information
Blackmorse committed Jan 11, 2025
1 parent db97994 commit 615b821
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ abstract class AbstractHttpFlow[Request <: AbstractRequest[Model], Model] {

import system.dispatcher
Flow[(Request, T)]
.mapAsyncUnordered(512) {
.mapAsyncUnordered(2) {
case (request, t) =>
ChppRequestExecutor.executeWithRetry(request)
.recover {
Expand Down
22 changes: 8 additions & 14 deletions chpp/src/main/scala/chpp/ChppRequestExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import org.apache.pekko.actor.{ActorSystem, Scheduler}
import org.apache.pekko.http.scaladsl.Http
import chpp.chpperror.ChppError
import com.lucidchart.open.xtract.{ParseError, XmlReader}
import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse}
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings
import org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.Future
import scala.concurrent.duration.*
Expand Down Expand Up @@ -46,22 +46,16 @@ object ChppRequestExecutor {
}
}



private def execute[Model](request: AbstractRequest[Model])
(implicit oauthTokens: OauthTokens, system: ActorSystem, reader: XmlReader[Model]): Future[Model] = {
import system.dispatcher

val connectionFlow: Flow[HttpRequest, HttpResponse,
Future[Http.OutgoingConnection]] =
Http().outgoingConnection(host = "chpp.hattrick.org")


val r = Source.single(request.createRequest())
.via(connectionFlow)
.runWith(Sink.head)



for (response <- r;
val poolSettings = ConnectionPoolSettings(system)
.withMaxConnections(100) // Maximum connections to the same host
.withPipeliningLimit(10) // Maximum pipelined requests per connection
for (response <- Http().singleRequest(request.createRequest());
responseBody <- response.entity.toStrict(3.minute)) yield {
val rawResponse = responseBody.data.utf8String
val preprocessed = request.preprocessResponseBody(rawResponse)
Expand Down

0 comments on commit 615b821

Please sign in to comment.