diff --git a/akka-loader/src/main/scala/clickhouse/HattidClickhouseClient.scala b/akka-loader/src/main/scala/clickhouse/HattidClickhouseClient.scala index 9df6a8fd..69030a28 100644 --- a/akka-loader/src/main/scala/clickhouse/HattidClickhouseClient.scala +++ b/akka-loader/src/main/scala/clickhouse/HattidClickhouseClient.scala @@ -39,18 +39,93 @@ class HattidClickhouseClient @Inject()(val config: Config, client.execute(PlayerStatsJoiner.playerStatsJoinRequest(league, matchType, databaseName)) } - private def truncateTable(table: String, league: League, matchType: MatchType.Value): Future[String] = { + def truncateTable(table: String, league: League, matchType: MatchType.Value): Future[String] = { logger.info(s"Truncating table $table while joining for (${league.leagueId}, ${league.leagueName})...") client.execute(TableTruncater.sql(league, matchType, table, databaseName)) } - def checkDataInMatchDetails(league: League, matchType: MatchType.Value): Boolean = { + def checkInMatchDetailsAndPlayerStatsAreEmpty(league: League, matchType: MatchType.Value): Boolean = { val round = realRound(matchType, league) val season = league.season - league.seasonOffset val cupLevelCondition = if (matchType == MatchType.LEAGUE_MATCH) " = 0 " else " != 0" - val f = client.query(s"SELECT count() from $databaseName.match_details where " + + val matchDetailsCountFuture = client.query(s"SELECT count() from $databaseName.match_details where " + s" league_id = ${league.leagueId} and season = $season and round = $round and cup_level_index $cupLevelCondition") - val res = Await.result(f, 30.seconds).trim.replace("\n", "").replace("\r", "") - res != "0" + + val playerStatsCountFuture = client.query(s"SELECT count() from $databaseName.player_stats where " + + s" league_id = ${league.leagueId} and season = $season and round = $round and cup_level_index $cupLevelCondition") + + val matchDetailsCount = Await.result(matchDetailsCountFuture, 30.seconds).trim.replace("\n", "").replace("\r", "") + val playerStatsCount = Await.result(playerStatsCountFuture, 30.seconds).trim.replace("\n", "").replace("\r", "") + + if (matchType == MatchType.LEAGUE_MATCH) { + val teamDetailsCountrsFuture = client.query(s"SELECT count() from $databaseName.team_details where " + + s" league_id = ${league.leagueId} and season = $season and round = $round") + + val teamRankingsCountFuture = client.query(s"SELECT count() from $databaseName.team_rankings where " + + s" league_id = ${league.leagueId} and season = $season and round = $round") + + val teamDetailsCount = Await.result(teamDetailsCountrsFuture, 30.seconds).trim.replace("\n", "").replace("\r", "") + val teamRankingsCount = Await.result(teamRankingsCountFuture, 30.seconds).trim.replace("\n", "").replace("\r", "") + + matchDetailsCount == "0" && playerStatsCount == "0" && teamDetailsCount == "0" && teamRankingsCount == "0" + } else { + matchDetailsCount == "0" && playerStatsCount == "0" + } + } + + def logUploadEntry(league: League, matchType: MatchType.Value): Future[_] = { + val round = realRound(matchType, league) + val season = league.season - league.seasonOffset + val isLeagueMatch = if (matchType == MatchType.LEAGUE_MATCH) 1 else 0 + client.execute(s"INSERT INTO $databaseName.upload_history (league_id, season, round, is_league_match) VALUES " + + s" (${league.leagueId}, $season, $round, $isLeagueMatch)") + } + + def checkUploaded(league: League, matchType: MatchType.Value): Boolean = { + val round = realRound(matchType, league) + val season = league.season - league.seasonOffset + val isLeagueMatch = if (matchType == MatchType.LEAGUE_MATCH) 1 else 0 + + val resultFuture = client.query(s"SELECT count() from $databaseName.upload_history WHERE " + + s" league_id = ${league.leagueId} AND season = $season AND round = $round and is_league_match = $isLeagueMatch") + + val result = Await.result(resultFuture, 30.seconds).trim.replace("\n", "").replace("\r", "") + + result != "0" + } + + def tryToFixLeagueData(league: League): Future[_] = { + val round = realRound(MatchType.LEAGUE_MATCH, league) + val season = league.season - league.seasonOffset + val cupLevelCondition = " AND cup_level_index = 0" + + def deleteSqlQuery(table: String, cupLevelCondition: String): String = + s"DELETE FROM $databaseName.$table WHERE league_id = ${league.leagueId} AND season = $season AND round = $round $cupLevelCondition" + + for { + _ <- client.execute(deleteSqlQuery("match_details", cupLevelCondition)) + _ <- truncateTable("player_info", league, MatchType.LEAGUE_MATCH) + _ <- truncateTable("player_events", league, MatchType.LEAGUE_MATCH) + // Lightweight deletes is not supported for tables with projections + _ <- client.execute(s"ALTER TABLE $databaseName.player_stats DELETE WHERE league_id = ${league.leagueId} AND season = $season AND round = $round $cupLevelCondition SETTINGS mutations_sync = 1") + _ <- client.execute(deleteSqlQuery("team_details", "")) + r <- client.execute(deleteSqlQuery("team_rankings", "")) + } yield r + } + + def tryToFixCupData(league: League): Future[_] = { + val round = realRound(MatchType.CUP_MATCH, league) + val season = league.season - league.seasonOffset + val cupLevelCondition = " AND cup_level_index != 0" + + def alterDeleteSqlQuery(table: String, cupLevelCondition: String): String = + s"DELETE FROM $databaseName.$table WHERE league_id = ${league.leagueId} AND season = $season AND round = $round $cupLevelCondition" + + for { + _ <- client.execute(alterDeleteSqlQuery("match_details", cupLevelCondition)) + _ <- truncateTable(s"$databaseName.player_info", league, MatchType.LEAGUE_MATCH) + _ <- truncateTable(s"$databaseName.player_events", league, MatchType.LEAGUE_MATCH) + r <- client.execute(alterDeleteSqlQuery("player_stats", cupLevelCondition)) + } yield r } } diff --git a/akka-loader/src/main/scala/executors/CupExecutorActor.scala b/akka-loader/src/main/scala/executors/CupExecutorActor.scala index ba8e1711..e99c2e75 100644 --- a/akka-loader/src/main/scala/executors/CupExecutorActor.scala +++ b/akka-loader/src/main/scala/executors/CupExecutorActor.scala @@ -1,5 +1,6 @@ package executors +import akka.actor.ActorSystem import akka.stream.scaladsl.Sink import chpp.OauthTokens import chpp.commonmodels.MatchType @@ -7,7 +8,9 @@ import chpp.worlddetails.models.{League, WorldDetails} import clickhouse.HattidClickhouseClient import telegram.LoaderTelegramClient -import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.{Failure, Success, Try} class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]], hattidClickhouseClient: HattidClickhouseClient, @@ -15,7 +18,10 @@ class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]], telegramClient: LoaderTelegramClient ) (implicit oauthTokens: OauthTokens) extends TaskExecutorActor(graph, worldDetails, (m => m): Future[Done] => Future[Done], telegramClient) { - override def postProcessLoadedResults(league: League, matValue: Done): Future[_] = { + + implicit val actorSystem: ExecutionContext = context.system.dispatcher + + def postProcessLoadedResults(league: League, matValue: Done): Future[_] = { hattidClickhouseClient.join(league, MatchType.CUP_MATCH) } @@ -25,7 +31,38 @@ class CupExecutorActor[CupMat, Done](graph: Sink[Int, Future[Done]], override def notifyScheduled(tasks: List[TaskExecutorActor.ScheduleTask]): Unit = () - override def checkTaskAlreadyDone(league: League): Boolean = { - hattidClickhouseClient.checkDataInMatchDetails(league, MatchType.CUP_MATCH) + + override def checkTaskAlreadyDoneAndTryToFix(league: League): Boolean = { + if(hattidClickhouseClient.checkUploaded(league, MatchType.CUP_MATCH)) { + return true + } else { + val someDataWasUploaded = !hattidClickhouseClient.checkInMatchDetailsAndPlayerStatsAreEmpty(league, MatchType.CUP_MATCH) + if (someDataWasUploaded) { + // Something wrong. Should be fixed + val future = hattidClickhouseClient.tryToFixCupData(league) + + Try { Await.result(future, 3.minutes) } match { + case Failure(exception) => + // Smth wrong with the data that can't be fixed. Report and skip the task + telegramClient.sendException(s"Corrupted uploaded data can't be fixed. League: ${league.leagueId} ${league.englishName}", exception) + return true + case Success(_) => return false + } + } else { + return false + } + } + } + + override def logTaskFinished(league: League): Future[_] = + hattidClickhouseClient.logUploadEntry(league, MatchType.CUP_MATCH) + + override def preCleanupTables(league: League): Unit = { + + val f = for { + _ <- hattidClickhouseClient.truncateTable("player_info", league, MatchType.CUP_MATCH) + r <- hattidClickhouseClient.truncateTable("player_events", league, MatchType.CUP_MATCH) + } yield r + Await.result(f, 30.second) } } diff --git a/akka-loader/src/main/scala/executors/LeagueExecutorActor.scala b/akka-loader/src/main/scala/executors/LeagueExecutorActor.scala index 76956eb7..2c277d49 100644 --- a/akka-loader/src/main/scala/executors/LeagueExecutorActor.scala +++ b/akka-loader/src/main/scala/executors/LeagueExecutorActor.scala @@ -14,7 +14,9 @@ import models.stream.StreamTeam import promotions.PromotionsCalculator import telegram.LoaderTelegramClient -import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success, Try} object LeagueExecutorActor { type LeagueMat = (Future[List[StreamTeam]], Future[Done]) @@ -55,7 +57,38 @@ class LeagueExecutorActor alltidClient.notifyScheduleInfo(tasks) } - override def checkTaskAlreadyDone(league: League): Boolean = { - hattidClickhouseClient.checkDataInMatchDetails(league, MatchType.LEAGUE_MATCH) + override def checkTaskAlreadyDoneAndTryToFix(league: League): Boolean = { + if(hattidClickhouseClient.checkUploaded(league, MatchType.LEAGUE_MATCH)) { + return true + } else { + val someDataWasUploaded = !hattidClickhouseClient.checkInMatchDetailsAndPlayerStatsAreEmpty(league, MatchType.LEAGUE_MATCH) + if (someDataWasUploaded) { + // Something wrong. Should be fixed + val future = hattidClickhouseClient.tryToFixLeagueData(league) + + Try { Await.result(future, 3.minutes) } match { + case Failure(exception) => + // Smth wrong with the data that can't be fixed. Report and skip the task + telegramClient.sendException(s"Corrupted uploaded data can't be fixed. League: ${league.leagueId} ${league.englishName}", exception) + return true + case Success(_) => return false + } + } else { + return false + } + } + } + + override def logTaskFinished(league: League): Future[_] = + hattidClickhouseClient.logUploadEntry(league, MatchType.LEAGUE_MATCH) + + + override def preCleanupTables(league: League): Unit = { + + val f = for { + _ <- hattidClickhouseClient.truncateTable("player_info", league, MatchType.LEAGUE_MATCH) + r <- hattidClickhouseClient.truncateTable("player_events", league, MatchType.LEAGUE_MATCH) + } yield r + Await.result(f, 30.second) } } diff --git a/akka-loader/src/main/scala/executors/TaskExecutorActor.scala b/akka-loader/src/main/scala/executors/TaskExecutorActor.scala index 4d1d1921..f09ebdd0 100644 --- a/akka-loader/src/main/scala/executors/TaskExecutorActor.scala +++ b/akka-loader/src/main/scala/executors/TaskExecutorActor.scala @@ -5,6 +5,7 @@ import akka.actor.Actor import akka.stream.scaladsl.{Keep, Sink, Source} import chpp.OauthTokens import chpp.worlddetails.models.{League, WorldDetails} +import clickhouse.HattidClickhouseClient import com.typesafe.scalalogging.Logger import org.slf4j.LoggerFactory import telegram.LoaderTelegramClient @@ -65,12 +66,13 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat], if (task.time.before(new Date())) { tasks = tasks.drop(1) val league = worldDetails.leagueList.filter(_.leagueId == task.leagueId).head - if(checkTaskAlreadyDone(league)) { + if(checkTaskAlreadyDoneAndTryToFix(league)) { logger.info(s"(${task.leagueId}, ${league.leagueName}) is already done!") self ! TryToExecute } else { running = true + preCleanupTables(league) notifyLeagueStarted(league) logger.info(s"Started league (${task.leagueId}, ${league.leagueName})") @@ -95,8 +97,14 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat], self ! TaskFinished case Success(_) => logger.info(s"(${updatedLeague.leagueId}, ${updatedLeague.leagueName}) successfully loaded") - notifyLeagueFinished(updatedLeague) - self ! TaskFinished + logTaskFinished(updatedLeague).onComplete { + case Failure(e) => + telegramClient.sendException("Loader failed at marking task with history log", e) + self ! TaskFinished + case Success(_) => + notifyLeagueFinished(updatedLeague) + self ! TaskFinished + } } }) } @@ -108,7 +116,7 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat], } } - def checkTaskAlreadyDone(league: League): Boolean + def checkTaskAlreadyDoneAndTryToFix(league: League): Boolean def notifyScheduled(tasks: List[ScheduleTask]) @@ -117,4 +125,8 @@ abstract class TaskExecutorActor[GraphMat, MatValue](graph: Sink[Int, GraphMat], def notifyLeagueFinished(league: League) def postProcessLoadedResults(league: League, matValue: MatValue): Future[_] + + def logTaskFinished(league: League): Future[_] + + def preCleanupTables(league: League): Unit } diff --git a/sql/init_scripts/15-upload-history.sql b/sql/init_scripts/15-upload-history.sql new file mode 100644 index 00000000..5fadadb3 --- /dev/null +++ b/sql/init_scripts/15-upload-history.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS hattrick.upload_history ( + `league_id` UInt16, + `season` UInt8, + `round` UInt8, + `is_league_match` UInt8, + `time` DateTime DEFAULT now() +) +ENGINE = MergeTree() +ORDER BY (season, round, league_id) \ No newline at end of file