From 615b82148f8ff0ae00bac5fc361e7a0f471bd401 Mon Sep 17 00:00:00 2001 From: Blackmorse Date: Sat, 11 Jan 2025 23:21:37 +0400 Subject: [PATCH] akka-loader: rolback and working shitfix --- .../scala/httpflows/AbstractHttpFlow.scala | 2 +- .../main/scala/chpp/ChppRequestExecutor.scala | 22 +++++++------------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/akka-loader/src/main/scala/httpflows/AbstractHttpFlow.scala b/akka-loader/src/main/scala/httpflows/AbstractHttpFlow.scala index fb3e075..2ed9a71 100644 --- a/akka-loader/src/main/scala/httpflows/AbstractHttpFlow.scala +++ b/akka-loader/src/main/scala/httpflows/AbstractHttpFlow.scala @@ -17,7 +17,7 @@ abstract class AbstractHttpFlow[Request <: AbstractRequest[Model], Model] { import system.dispatcher Flow[(Request, T)] - .mapAsyncUnordered(512) { + .mapAsyncUnordered(2) { case (request, t) => ChppRequestExecutor.executeWithRetry(request) .recover { diff --git a/chpp/src/main/scala/chpp/ChppRequestExecutor.scala b/chpp/src/main/scala/chpp/ChppRequestExecutor.scala index 4177ef7..b88dc4a 100644 --- a/chpp/src/main/scala/chpp/ChppRequestExecutor.scala +++ b/chpp/src/main/scala/chpp/ChppRequestExecutor.scala @@ -4,8 +4,8 @@ import org.apache.pekko.actor.{ActorSystem, Scheduler} import org.apache.pekko.http.scaladsl.Http import chpp.chpperror.ChppError import com.lucidchart.open.xtract.{ParseError, XmlReader} -import org.apache.pekko.http.scaladsl.model.{HttpRequest, HttpResponse} -import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source} +import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings +import org.apache.pekko.stream.scaladsl.Source import scala.concurrent.Future import scala.concurrent.duration.* @@ -46,22 +46,16 @@ object ChppRequestExecutor { } } + + private def execute[Model](request: AbstractRequest[Model]) (implicit oauthTokens: OauthTokens, system: ActorSystem, reader: XmlReader[Model]): Future[Model] = { import system.dispatcher - val connectionFlow: Flow[HttpRequest, HttpResponse, - Future[Http.OutgoingConnection]] = - Http().outgoingConnection(host = "chpp.hattrick.org") - - - val r = Source.single(request.createRequest()) - .via(connectionFlow) - .runWith(Sink.head) - - - - for (response <- r; + val poolSettings = ConnectionPoolSettings(system) + .withMaxConnections(100) // Maximum connections to the same host + .withPipeliningLimit(10) // Maximum pipelined requests per connection + for (response <- Http().singleRequest(request.createRequest()); responseBody <- response.entity.toStrict(3.minute)) yield { val rawResponse = responseBody.data.utf8String val preprocessed = request.preprocessResponseBody(rawResponse)