Skip to content

Commit

Permalink
akka-loader: loadScheduled task
Browse files Browse the repository at this point in the history
  • Loading branch information
Blackmorse committed Feb 14, 2024
1 parent c3405f9 commit 9e2a826
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 47 deletions.
6 changes: 5 additions & 1 deletion akka-loader/src/main/scala/LoaderApp.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import akka.actor.{ActorRef, ActorSystem}
import chpp.OauthTokens
import chpp.worlddetails.models.WorldDetails
import cli.{CommandLine, LoadConfig, ScheduleConfig, TeamRankingsConfig}
import cli.{CommandLine, LoadConfig, LoadScheduledConfig, ScheduleConfig, TeamRankingsConfig}
import clickhouse.TeamRankJoiner
import com.google.inject.Guice
import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -56,6 +56,10 @@ object LoaderApp extends App {
worldDetails.leagueList.foreach(league => {
Await.result(TeamRankJoiner.joinTeamRankings(config, league), 3.minute)
})
case LoadScheduledConfig(entity, lastMatchWindow) =>
val (taskExecutorActor, scheduler) = executorAndScheduler(entity, lastMatchWindow, executorActorFactory, worldDetails)
scheduler.loadScheduled()
actorSystem.scheduler.scheduleWithFixedDelay(0.second , 5.second)(() => taskExecutorActor ! TryToExecute)
}

private def executorAndScheduler(entity: String, lastMatchesWindow: Int, executorActorFactory: ExecutorActorFactory, worldDetails: WorldDetails): (ActorRef, AbstractScheduler) = {
Expand Down
4 changes: 4 additions & 0 deletions akka-loader/src/main/scala/cli/CommandLine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ sealed trait CliConfig
case class ScheduleConfig(from: Option[String], entity: String, lastMatchWindow: Int) extends CliConfig
case class LoadConfig(leagues: List[String], entity: String, lastMatchWindow: Int) extends CliConfig
case class TeamRankingsConfig(league: Option[String]) extends CliConfig
case class LoadScheduledConfig(entity: String, lastMatchWindow: Int) extends CliConfig


class CommandLine(arguments: Array[String]) extends ScallopConf(arguments) {
Expand All @@ -24,15 +25,18 @@ class CommandLine(arguments: Array[String]) extends ScallopConf(arguments) {
val teamRankings = new Subcommand("teamRankings") {
val league = opt[String](required = false)
}
val loadScheduled = new EntitySubcommand("loadScheduled") {}
addSubcommand(schedule)
addSubcommand(load)
addSubcommand(loadScheduled)
verify()

def toCliConfig: CliConfig = {
this.subcommand match {
case Some(this.schedule) => ScheduleConfig(this.schedule.from.toOption, this.schedule.entity(), this.schedule.lastMatchWindow())
case Some(this.load) => LoadConfig(this.load.leagues(), this.load.entity(), this.schedule.lastMatchWindow())
case Some(this.teamRankings) => TeamRankingsConfig(this.teamRankings.league.toOption)
case Some(this.loadScheduled) => LoadScheduledConfig(this.loadScheduled.entity(), this.loadScheduled.lastMatchWindow())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import com.crobox.clickhouse.ClickhouseClient
import com.crobox.clickhouse.internal.QuerySettings
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import utils.realRound

import javax.inject.{Inject, Singleton}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

@Singleton
class PlayerStatsClickhouseClient @Inject()(val config: Config,
implicit val actorSystem: ActorSystem)
class HattidClickhouseClient @Inject()(val config: Config,
implicit val actorSystem: ActorSystem)
{
import actorSystem.dispatcher
private val logger = LoggerFactory.getLogger(this.getClass)
Expand Down Expand Up @@ -41,4 +43,14 @@ class PlayerStatsClickhouseClient @Inject()(val config: Config,
logger.info(s"Truncating table $table while joining for (${league.leagueId}, ${league.leagueName})...")
client.execute(TableTruncater.sql(league, matchType, table, databaseName))
}

def checkDataInMatchDetails(league: League, matchType: MatchType.Value): Boolean = {
val round = realRound(matchType, league)
val season = league.season - league.seasonOffset
val cupLevelCondition = if (matchType == MatchType.LEAGUE_MATCH) " = 0 " else " != 0"
val f = client.query(s"SELECT count() from $databaseName.match_details where " +
s" league_id = ${league.leagueId} and season = $season and round = $round and cup_level_index $cupLevelCondition")
val res = Await.result(f, 30.seconds).trim.replace("\n", "").replace("\r", "")
res != "0"
}
}
4 changes: 2 additions & 2 deletions akka-loader/src/main/scala/clickhouse/PlayerStatsJoiner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package clickhouse

import chpp.commonmodels.MatchType
import chpp.worlddetails.models.League
import utils.realRound

object PlayerStatsJoiner {
def playerStatsJoinRequest(league: League, matchType: MatchType.Value, databaseName: String): String = {
val leagueId = league.leagueId
val round = if (matchType == MatchType.LEAGUE_MATCH) league.matchRound - 1 else if (matchType == MatchType.CUP_MATCH) league.matchRound
else throw new IllegalArgumentException(matchType.toString)
val round = realRound(matchType, league)
val season = league.season - league.seasonOffset

s"""INSERT INTO $databaseName.player_stats SELECT
Expand Down
4 changes: 2 additions & 2 deletions akka-loader/src/main/scala/clickhouse/TableTruncater.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package clickhouse

import chpp.commonmodels.MatchType
import chpp.worlddetails.models.League
import utils.realRound

object TableTruncater {
def sql(league: League, matchType: MatchType.Value, table: String, database: String): String = {
val season = league.season - league.seasonOffset
val round = if (matchType == MatchType.LEAGUE_MATCH) league.matchRound - 1 else if (matchType == MatchType.CUP_MATCH) league.matchRound
else throw new IllegalArgumentException(matchType.toString)
val round = realRound(matchType, league)
s"ALTER TABLE $database.$table DELETE WHERE (season = $season) AND (round = $round)"
}
}
10 changes: 7 additions & 3 deletions akka-loader/src/main/scala/executors/CupExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import akka.stream.scaladsl.Sink
import chpp.OauthTokens
import chpp.commonmodels.MatchType
import chpp.worlddetails.models.{League, WorldDetails}
import clickhouse.PlayerStatsClickhouseClient
import clickhouse.HattidClickhouseClient
import telegram.LoaderTelegramClient

import scala.concurrent.Future

class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]],
playerStatsClickhouseClient: PlayerStatsClickhouseClient,
hattidClickhouseClient: HattidClickhouseClient,
worldDetails: WorldDetails,
telegramClient: LoaderTelegramClient
) (implicit oauthTokens: OauthTokens)
extends TaskExecutorActor(graph, worldDetails, (m => m): Future[Done] => Future[Done], telegramClient) {
override def postProcessLoadedResults(league: League, matValue: Done): Future[_] = {
playerStatsClickhouseClient.join(league, MatchType.CUP_MATCH)
hattidClickhouseClient.join(league, MatchType.CUP_MATCH)
}

override def notifyLeagueStarted(league: League): Unit = ()

override def notifyLeagueFinished(league: League): Unit = ()

override def notifyScheduled(tasks: List[TaskExecutorActor.ScheduleTask]): Unit = ()

override def checkTaskAlreadyDone(league: League): Boolean = {
hattidClickhouseClient.checkDataInMatchDetails(league, MatchType.CUP_MATCH)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.stream.scaladsl.{Flow, Keep}
import alltid.AlltidClient
import chpp.OauthTokens
import chpp.worlddetails.models.WorldDetails
import clickhouse.PlayerStatsClickhouseClient
import clickhouse.HattidClickhouseClient
import com.crobox.clickhouse.ClickhouseClient
import com.crobox.clickhouse.internal.QuerySettings
import com.crobox.clickhouse.stream.{ClickhouseSink, Insert}
Expand All @@ -21,7 +21,7 @@ class ExecutorActorFactory @Inject()
implicit val oauthTokens: OauthTokens,
val clickhouseClient: ClickhouseClient,
val config: Config,
val hattidClient: PlayerStatsClickhouseClient,
val hattidClient: HattidClickhouseClient,
val alltidClient: AlltidClient,
val telegramClient: LoaderTelegramClient) {
import actorSystem.dispatcher
Expand Down
10 changes: 7 additions & 3 deletions akka-loader/src/main/scala/executors/LeagueExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import alltid.AlltidClient
import chpp.OauthTokens
import chpp.commonmodels.MatchType
import chpp.worlddetails.models.{League, WorldDetails}
import clickhouse.{ClickhouseWriter, PlayerStatsClickhouseClient, TeamRankJoiner}
import clickhouse.{ClickhouseWriter, HattidClickhouseClient, TeamRankJoiner}
import com.crobox.clickhouse.stream.Insert
import com.typesafe.config.Config
import models.stream.StreamTeam
Expand All @@ -23,7 +23,7 @@ object LeagueExecutorActor {
class LeagueExecutorActor
(graph: Sink[Int, (Future[List[StreamTeam]], Future[Done])],
chSink: Sink[Insert, Future[Done]],
playerStatsClickhouseClient: PlayerStatsClickhouseClient,
hattidClickhouseClient: HattidClickhouseClient,
worldDetails: WorldDetails,
config: Config,
alltidClient: AlltidClient,
Expand All @@ -34,7 +34,7 @@ class LeagueExecutorActor

override def postProcessLoadedResults(league: League, matValue: (List[StreamTeam], Done)): Future[_] = {
for {
_ <- playerStatsClickhouseClient.join(league, MatchType.LEAGUE_MATCH)
_ <- hattidClickhouseClient.join(league, MatchType.LEAGUE_MATCH)
_ <- TeamRankJoiner.joinTeamRankings(config, league)
promotions <- PromotionsCalculator.calculatePromotions(league, matValue._1)
finalFuture <- ClickhouseWriter.writeToCh(promotions, chSink, context.system.settings)
Expand All @@ -54,4 +54,8 @@ class LeagueExecutorActor
override def notifyScheduled(tasks: List[TaskExecutorActor.ScheduleTask]): Unit = {
alltidClient.notifyScheduleInfo(tasks)
}

override def checkTaskAlreadyDone(league: League): Boolean = {
hattidClickhouseClient.checkDataInMatchDetails(league, MatchType.LEAGUE_MATCH)
}
}
66 changes: 37 additions & 29 deletions akka-loader/src/main/scala/executors/TaskExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,44 +64,52 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
val task = tasks.head
if (task.time.before(new Date())) {
tasks = tasks.drop(1)
running = true
val league = worldDetails.leagueList.filter(_.leagueId == task.leagueId).head
notifyLeagueStarted(league)
logger.info(s"Started league (${task.leagueId}, ${league.leagueName})")

val mat = Source.single(task.leagueId).toMat(graph)(Keep.right).run()

matToFuture(mat).onComplete {
case Failure(exception) =>
logger.error(s"Failed to upload ${task.leagueId}", exception)
self ! ScheduleTask(task.leagueId,
new Date(System.currentTimeMillis() + 30 * 60 * 1000))
telegramClient.sendException("Loader failed at streaming stage", exception)
self ! TaskFinished
case Success(matValue) =>
val updatedLeagueFuture = WorldDetailsSingleRequest.request(leagueId = Some(league.leagueId)).map(_.leagueList.head);

updatedLeagueFuture.foreach(updatedLeague => {
val result = postProcessLoadedResults(updatedLeague, matValue)
result.onComplete {
case Failure(exception) =>
logger.error(exception.getMessage, exception)
telegramClient.sendException("Loader failed at post processing stage", exception)
self ! TaskFinished
case Success(_) =>
logger.info(s"(${updatedLeague.leagueId}, ${updatedLeague.leagueName}) successfully loaded")
notifyLeagueFinished(updatedLeague)
self ! TaskFinished
}})
if(checkTaskAlreadyDone(league)) {
logger.info(s"(${task.leagueId}, ${league.leagueName}) is already done!")
self ! TryToExecute
} else {
running = true

notifyLeagueStarted(league)
logger.info(s"Started league (${task.leagueId}, ${league.leagueName})")

val mat = Source.single(task.leagueId).toMat(graph)(Keep.right).run()

matToFuture(mat).onComplete {
case Failure(exception) =>
logger.error(s"Failed to upload ${task.leagueId}", exception)
self ! ScheduleTask(task.leagueId,
new Date(System.currentTimeMillis() + 30 * 60 * 1000))
telegramClient.sendException("Loader failed at streaming stage", exception)
self ! TaskFinished
case Success(matValue) =>
val updatedLeagueFuture = WorldDetailsSingleRequest.request(leagueId = Some(league.leagueId)).map(_.leagueList.head);

updatedLeagueFuture.foreach(updatedLeague => {
val result = postProcessLoadedResults(updatedLeague, matValue)
result.onComplete {
case Failure(exception) =>
logger.error(exception.getMessage, exception)
telegramClient.sendException("Loader failed at post processing stage", exception)
self ! TaskFinished
case Success(_) =>
logger.info(s"(${updatedLeague.leagueId}, ${updatedLeague.leagueName}) successfully loaded")
notifyLeagueFinished(updatedLeague)
self ! TaskFinished
}
})
}
}

}
}
} else {
logger.debug("Some task is running!")
}
}

def checkTaskAlreadyDone(league: League): Boolean

def notifyScheduled(tasks: List[ScheduleTask])

def notifyLeagueStarted(league: League)
Expand Down
2 changes: 2 additions & 0 deletions akka-loader/src/main/scala/scheduler/AbstractScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ abstract class AbstractScheduler(val worldDetails: WorldDetails) {
loadIds(leagueIds)
}

def loadScheduled(): Unit

private def findLeagueIdByName(leagueName: String): Int = {
if (leagueName.forall(_.isDigit)) {
leagueName.toInt
Expand Down
14 changes: 14 additions & 0 deletions akka-loader/src/main/scala/scheduler/CupScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ class CupScheduler(worldDetails: WorldDetails,
taskExecutorActor ! ScheduleFinished
}

override def loadScheduled(): Unit = {
val dayLightSavingOffset = if (CupSchedule.isSummerTimeNow()) 0L else 1000L * 60 * 60

CupSchedule.normalizeCupScheduleToDayOfWeek(cupSchedule, Calendar.MONDAY)
.map(scheduleEntry => {
val scheduledDate = new Date(scheduleEntry.date.getTime + threeHoursMs + dayLightSavingOffset)
ScheduleTask(scheduleEntry.leagueId, scheduledDate)
})
.filter(_.time.before(new Date()))
.foreach(task => taskExecutorActor ! task)

taskExecutorActor ! ScheduleFinished
}

override protected def scheduleFrom(leagueId: Int): Unit = {
val dayLightSavingOffset = if (CupSchedule.isSummerTimeNow()) 0L else 1000L * 60 * 60

Expand Down
29 changes: 27 additions & 2 deletions akka-loader/src/main/scala/scheduler/LeagueScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package scheduler
import executors.TaskExecutorActor.{ScheduleFinished, ScheduleTask}
import akka.actor.ActorRef
import chpp.worlddetails.models.WorldDetails
import hattid.CommonData
import hattid.{CommonData, CupSchedule}
import scheduler.LeagueScheduler.{countriesToMinutesOffset, firstLeagueId, lastLeagueId}

import java.util.Date
import java.util.{Calendar, Date}

object LeagueScheduler {
private val firstLeagueId = 1000
Expand Down Expand Up @@ -74,4 +74,29 @@ class LeagueScheduler(worldDetails: WorldDetails,

taskExecutorActor ! ScheduleFinished
}

override def loadScheduled(): Unit = {
val lastLeague = worldDetails.leagueList.filter(_.leagueId == lastLeagueId).head
val firstLeague = worldDetails.leagueList.filter(_.leagueId == firstLeagueId).head
val matchesAlreadyFinished = firstLeague.seriesMatchDate.after(new Date()) &&
lastLeague.seriesMatchDate.after(new Date()) && lastLeague.seriesMatchDate.after(firstLeague.seriesMatchDate)

val previousWeekMs = if (matchesAlreadyFinished) 1000L * 3600 * 24 * 7 else 0L
worldDetails.leagueList
.map(league => {
val minutesOffset: Long = countriesToMinutesOffset.getOrElse(league.leagueId, 0)

val date = if (league.seriesMatchDate.after(lastLeague.seriesMatchDate)) {
new Date(league.seriesMatchDate.getTime - 1000L * 3600 * 24 * 7
+ threeHoursMs + (minutesOffset * 60 * 1000))
} else {
new Date(league.seriesMatchDate.getTime + threeHoursMs + +minutesOffset * 60 * 1000)
}

ScheduleTask(league.leagueId, new Date(date.getTime - previousWeekMs))
})
.filter(_.time.before(new Date()))
.sortBy(_.time)
.foreach(taskExecutorActor ! _)
}
}
9 changes: 9 additions & 0 deletions akka-loader/src/main/scala/utils/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import chpp.commonmodels.MatchType
import chpp.worlddetails.models.League

package object utils {
def realRound(matchType: MatchType.Value, league: League): Int =
if (matchType == MatchType.LEAGUE_MATCH) league.matchRound - 1
else if (matchType == MatchType.CUP_MATCH) league.matchRound
else throw new IllegalArgumentException(matchType.toString)
}

0 comments on commit 9e2a826

Please sign in to comment.