Skip to content

Commit

Permalink
akka-loader: recalculations
Browse files Browse the repository at this point in the history
  • Loading branch information
Blackmorse committed Feb 19, 2024
1 parent af17f9d commit 37b705b
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 16 deletions.
85 changes: 80 additions & 5 deletions akka-loader/src/main/scala/clickhouse/HattidClickhouseClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,93 @@ class HattidClickhouseClient @Inject()(val config: Config,
client.execute(PlayerStatsJoiner.playerStatsJoinRequest(league, matchType, databaseName))
}

private def truncateTable(table: String, league: League, matchType: MatchType.Value): Future[String] = {
def truncateTable(table: String, league: League, matchType: MatchType.Value): Future[String] = {
logger.info(s"Truncating table $table while joining for (${league.leagueId}, ${league.leagueName})...")
client.execute(TableTruncater.sql(league, matchType, table, databaseName))
}

def checkDataInMatchDetails(league: League, matchType: MatchType.Value): Boolean = {
def checkInMatchDetailsAndPlayerStatsAreEmpty(league: League, matchType: MatchType.Value): Boolean = {
val round = realRound(matchType, league)
val season = league.season - league.seasonOffset
val cupLevelCondition = if (matchType == MatchType.LEAGUE_MATCH) " = 0 " else " != 0"
val f = client.query(s"SELECT count() from $databaseName.match_details where " +
val matchDetailsCountFuture = client.query(s"SELECT count() from $databaseName.match_details where " +
s" league_id = ${league.leagueId} and season = $season and round = $round and cup_level_index $cupLevelCondition")
val res = Await.result(f, 30.seconds).trim.replace("\n", "").replace("\r", "")
res != "0"

val playerStatsCountFuture = client.query(s"SELECT count() from $databaseName.player_stats where " +
s" league_id = ${league.leagueId} and season = $season and round = $round and cup_level_index $cupLevelCondition")

val matchDetailsCount = Await.result(matchDetailsCountFuture, 30.seconds).trim.replace("\n", "").replace("\r", "")
val playerStatsCount = Await.result(playerStatsCountFuture, 30.seconds).trim.replace("\n", "").replace("\r", "")

if (matchType == MatchType.LEAGUE_MATCH) {
val teamDetailsCountrsFuture = client.query(s"SELECT count() from $databaseName.team_details where " +
s" league_id = ${league.leagueId} and season = $season and round = $round")

val teamRankingsCountFuture = client.query(s"SELECT count() from $databaseName.team_rankings where " +
s" league_id = ${league.leagueId} and season = $season and round = $round")

val teamDetailsCount = Await.result(teamDetailsCountrsFuture, 30.seconds).trim.replace("\n", "").replace("\r", "")
val teamRankingsCount = Await.result(teamRankingsCountFuture, 30.seconds).trim.replace("\n", "").replace("\r", "")

matchDetailsCount == "0" && playerStatsCount == "0" && teamDetailsCount == "0" && teamRankingsCount == "0"
} else {
matchDetailsCount == "0" && playerStatsCount == "0"
}
}

def logUploadEntry(league: League, matchType: MatchType.Value): Future[_] = {
val round = realRound(matchType, league)
val season = league.season - league.seasonOffset
val isLeagueMatch = if (matchType == MatchType.LEAGUE_MATCH) 1 else 0
client.execute(s"INSERT INTO $databaseName.upload_history (league_id, season, round, is_league_match) VALUES " +
s" (${league.leagueId}, $season, $round, $isLeagueMatch)")
}

def checkUploaded(league: League, matchType: MatchType.Value): Boolean = {
val round = realRound(matchType, league)
val season = league.season - league.seasonOffset
val isLeagueMatch = if (matchType == MatchType.LEAGUE_MATCH) 1 else 0

val resultFuture = client.query(s"SELECT count() from $databaseName.upload_history WHERE " +
s" league_id = ${league.leagueId} AND season = $season AND round = $round and is_league_match = $isLeagueMatch")

val result = Await.result(resultFuture, 30.seconds).trim.replace("\n", "").replace("\r", "")

result != "0"
}

def tryToFixLeagueData(league: League): Future[_] = {
val round = realRound(MatchType.LEAGUE_MATCH, league)
val season = league.season - league.seasonOffset
val cupLevelCondition = " AND cup_level_index = 0"

def deleteSqlQuery(table: String, cupLevelCondition: String): String =
s"DELETE FROM $databaseName.$table WHERE league_id = ${league.leagueId} AND season = $season AND round = $round $cupLevelCondition"

for {
_ <- client.execute(deleteSqlQuery("match_details", cupLevelCondition))
_ <- truncateTable("player_info", league, MatchType.LEAGUE_MATCH)
_ <- truncateTable("player_events", league, MatchType.LEAGUE_MATCH)
// Lightweight deletes is not supported for tables with projections
_ <- client.execute(s"ALTER TABLE $databaseName.player_stats DELETE WHERE league_id = ${league.leagueId} AND season = $season AND round = $round $cupLevelCondition SETTINGS mutations_sync = 1")
_ <- client.execute(deleteSqlQuery("team_details", ""))
r <- client.execute(deleteSqlQuery("team_rankings", ""))
} yield r
}

def tryToFixCupData(league: League): Future[_] = {
val round = realRound(MatchType.CUP_MATCH, league)
val season = league.season - league.seasonOffset
val cupLevelCondition = " AND cup_level_index != 0"

def alterDeleteSqlQuery(table: String, cupLevelCondition: String): String =
s"DELETE FROM $databaseName.$table WHERE league_id = ${league.leagueId} AND season = $season AND round = $round $cupLevelCondition"

for {
_ <- client.execute(alterDeleteSqlQuery("match_details", cupLevelCondition))
_ <- truncateTable(s"$databaseName.player_info", league, MatchType.LEAGUE_MATCH)
_ <- truncateTable(s"$databaseName.player_events", league, MatchType.LEAGUE_MATCH)
r <- client.execute(alterDeleteSqlQuery("player_stats", cupLevelCondition))
} yield r
}
}
45 changes: 41 additions & 4 deletions akka-loader/src/main/scala/executors/CupExecutorActor.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package executors

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import chpp.OauthTokens
import chpp.commonmodels.MatchType
import chpp.worlddetails.models.{League, WorldDetails}
import clickhouse.HattidClickhouseClient
import telegram.LoaderTelegramClient

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]],
hattidClickhouseClient: HattidClickhouseClient,
worldDetails: WorldDetails,
telegramClient: LoaderTelegramClient
) (implicit oauthTokens: OauthTokens)
extends TaskExecutorActor(graph, worldDetails, (m => m): Future[Done] => Future[Done], telegramClient) {
override def postProcessLoadedResults(league: League, matValue: Done): Future[_] = {

implicit val actorSystem: ExecutionContext = context.system.dispatcher

def postProcessLoadedResults(league: League, matValue: Done): Future[_] = {
hattidClickhouseClient.join(league, MatchType.CUP_MATCH)
}

Expand All @@ -25,7 +31,38 @@ class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]],

override def notifyScheduled(tasks: List[TaskExecutorActor.ScheduleTask]): Unit = ()

override def checkTaskAlreadyDone(league: League): Boolean = {
hattidClickhouseClient.checkDataInMatchDetails(league, MatchType.CUP_MATCH)

override def checkTaskAlreadyDoneAndTryToFix(league: League): Boolean = {
if(hattidClickhouseClient.checkUploaded(league, MatchType.CUP_MATCH)) {
return true
} else {
val someDataWasUploaded = !hattidClickhouseClient.checkInMatchDetailsAndPlayerStatsAreEmpty(league, MatchType.CUP_MATCH)
if (someDataWasUploaded) {
// Something wrong. Should be fixed
val future = hattidClickhouseClient.tryToFixCupData(league)

Try { Await.result(future, 3.minutes) } match {
case Failure(exception) =>
// Smth wrong with the data that can't be fixed. Report and skip the task
telegramClient.sendException(s"Corrupted uploaded data can't be fixed. League: ${league.leagueId} ${league.englishName}", exception)
return true
case Success(_) => return false
}
} else {
return false
}
}
}

override def logTaskFinished(league: League): Future[_] =
hattidClickhouseClient.logUploadEntry(league, MatchType.CUP_MATCH)

override def preCleanupTables(league: League): Unit = {

val f = for {
_ <- hattidClickhouseClient.truncateTable("player_info", league, MatchType.CUP_MATCH)
r <- hattidClickhouseClient.truncateTable("player_events", league, MatchType.CUP_MATCH)
} yield r
Await.result(f, 30.second)
}
}
39 changes: 36 additions & 3 deletions akka-loader/src/main/scala/executors/LeagueExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import models.stream.StreamTeam
import promotions.PromotionsCalculator
import telegram.LoaderTelegramClient

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

object LeagueExecutorActor {
type LeagueMat = (Future[List[StreamTeam]], Future[Done])
Expand Down Expand Up @@ -55,7 +57,38 @@ class LeagueExecutorActor
alltidClient.notifyScheduleInfo(tasks)
}

override def checkTaskAlreadyDone(league: League): Boolean = {
hattidClickhouseClient.checkDataInMatchDetails(league, MatchType.LEAGUE_MATCH)
override def checkTaskAlreadyDoneAndTryToFix(league: League): Boolean = {
if(hattidClickhouseClient.checkUploaded(league, MatchType.LEAGUE_MATCH)) {
return true
} else {
val someDataWasUploaded = !hattidClickhouseClient.checkInMatchDetailsAndPlayerStatsAreEmpty(league, MatchType.LEAGUE_MATCH)
if (someDataWasUploaded) {
// Something wrong. Should be fixed
val future = hattidClickhouseClient.tryToFixLeagueData(league)

Try { Await.result(future, 3.minutes) } match {
case Failure(exception) =>
// Smth wrong with the data that can't be fixed. Report and skip the task
telegramClient.sendException(s"Corrupted uploaded data can't be fixed. League: ${league.leagueId} ${league.englishName}", exception)
return true
case Success(_) => return false
}
} else {
return false
}
}
}

override def logTaskFinished(league: League): Future[_] =
hattidClickhouseClient.logUploadEntry(league, MatchType.LEAGUE_MATCH)


override def preCleanupTables(league: League): Unit = {

val f = for {
_ <- hattidClickhouseClient.truncateTable("player_info", league, MatchType.LEAGUE_MATCH)
r <- hattidClickhouseClient.truncateTable("player_events", league, MatchType.LEAGUE_MATCH)
} yield r
Await.result(f, 30.second)
}
}
20 changes: 16 additions & 4 deletions akka-loader/src/main/scala/executors/TaskExecutorActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import akka.actor.Actor
import akka.stream.scaladsl.{Keep, Sink, Source}
import chpp.OauthTokens
import chpp.worlddetails.models.{League, WorldDetails}
import clickhouse.HattidClickhouseClient
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
import telegram.LoaderTelegramClient
Expand Down Expand Up @@ -65,12 +66,13 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
if (task.time.before(new Date())) {
tasks = tasks.drop(1)
val league = worldDetails.leagueList.filter(_.leagueId == task.leagueId).head
if(checkTaskAlreadyDone(league)) {
if(checkTaskAlreadyDoneAndTryToFix(league)) {
logger.info(s"(${task.leagueId}, ${league.leagueName}) is already done!")
self ! TryToExecute
} else {
running = true

preCleanupTables(league)
notifyLeagueStarted(league)
logger.info(s"Started league (${task.leagueId}, ${league.leagueName})")

Expand All @@ -95,8 +97,14 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
self ! TaskFinished
case Success(_) =>
logger.info(s"(${updatedLeague.leagueId}, ${updatedLeague.leagueName}) successfully loaded")
notifyLeagueFinished(updatedLeague)
self ! TaskFinished
logTaskFinished(updatedLeague).onComplete {
case Failure(e) =>
telegramClient.sendException("Loader failed at marking task with history log", e)
self ! TaskFinished
case Success(_) =>
notifyLeagueFinished(updatedLeague)
self ! TaskFinished
}
}
})
}
Expand All @@ -108,7 +116,7 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
}
}

def checkTaskAlreadyDone(league: League): Boolean
def checkTaskAlreadyDoneAndTryToFix(league: League): Boolean

def notifyScheduled(tasks: List[ScheduleTask])

Expand All @@ -117,4 +125,8 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat],
def notifyLeagueFinished(league: League)

def postProcessLoadedResults(league: League, matValue: MatValue): Future[_]

def logTaskFinished(league: League): Future[_]

def preCleanupTables(league: League): Unit
}
9 changes: 9 additions & 0 deletions sql/init_scripts/15-upload-history.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS hattrick.upload_history (
`league_id` UInt16,
`season` UInt8,
`round` UInt8,
`is_league_match` UInt8,
`time` DateTime DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY (season, round, league_id)

0 comments on commit 37b705b

Please sign in to comment.