Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Scalac complain more #478

Merged
merged 7 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/scala/akka/persistence/jdbc/JournalRow.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc

/**
 * One row of the event-journal table.
 *
 * @param ordering global, database-assigned ordering of the event (used as the
 *                 offset for `eventsByTag`/`journalSequence` queries — presumably
 *                 an auto-incremented column; confirm against the table DDL)
 * @param deleted whether the event has been logically deleted
 * @param persistenceId id of the persistent actor that wrote the event
 * @param sequenceNumber per-persistenceId sequence number of the event
 * @param message serialized event payload bytes (serialized via Akka
 *                Serialization elsewhere in this PR — see AkkaSerialization.fromRow)
 * @param tags optional tag string; defaults to None. NOTE(review): queries in this
 *             PR match tags with a SQL LIKE pattern (`%tag%`), which suggests a
 *             single delimited string rather than a normalized tag table — verify.
 *
 * NOTE(review): `message: Array[Byte]` in a case class means `equals`/`hashCode`
 * use array reference identity, so two rows with identical byte content are not
 * `==`. Fine for a pure DB-row carrier, but do not use this type as a map key or
 * compare instances structurally in tests.
 */
final case class JournalRow(
ordering: Long,
deleted: Boolean,
persistenceId: String,
sequenceNumber: Long,
message: Array[Byte],
tags: Option[String] = None)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.typesafe.config.{ Config, ConfigObject }

import scala.collection.JavaConverters._
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal

object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider {
override def lookup: SlickExtension.type = SlickExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
case Some(lastSeqNr) => lastSeqNr + 1
case None => from
}
Some((nextFrom, nextControl), xs)
Some(((nextFrom, nextControl), xs))
}
}

Expand All @@ -67,7 +67,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
akka.pattern.after(delay, scheduler)(retrieveNextBatch())
}
}
.mapConcat(identity)
.mapConcat(identity(_))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DefaultJournalDao(
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
.map(AkkaSerialization.fromRow(serialization))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.map(AkkaSerialization.fromRow(serialization)(_))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@

package akka.persistence.jdbc.journal.dao.legacy

import akka.persistence.jdbc.journal.dao.{
BaseDao,
BaseJournalDaoWithReadMessages,
FlowControl,
H2Compat,
JournalDao,
JournalDaoWithReadMessages,
JournalDaoWithUpdates
}
import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates }
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
import akka.persistence.{ AtomicWrite, PersistentRepr }
Expand Down Expand Up @@ -118,9 +110,10 @@ trait BaseByteArrayJournalDao
val write = PersistentRepr(payload, sequenceNr, persistenceId)
val serializedRow = serializer.serialize(write) match {
case Success(t) => t
case Failure(ex) =>
case Failure(cause) =>
throw new IllegalArgumentException(
s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]")
s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]",
cause)
}
db.run(queries.update(persistenceId, sequenceNr, serializedRow.message).map(_ => Done))
}
Expand All @@ -141,7 +134,7 @@ trait BaseByteArrayJournalDao
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.via(serializer.deserializeFlow)
.map {
case Success((repr, _, ordering)) => Success(repr -> ordering)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
val (nextMax, _, missingElems) =
// using the ordering elements that were fetched, we verify if there are any gaps
elements.foldLeft[(OrderingId, OrderingId, MissingElements)](
currentMaxOrdering,
currentMaxOrdering,
MissingElements.empty) {
(currentMaxOrdering, currentMaxOrdering, MissingElements.empty)) {
case ((currentMax, previousElement, missing), currentElement) =>
// we must decide if we move the cursor forward
val newMax =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ class DefaultReadJournalDao(

// This doesn't populate the tags. AFAICT they aren't used
Source
.fromPublisher(db.stream(queries.eventsByTag(tag, offset, maxOffset, correctMaxForH2Driver(max)).result))
.fromPublisher(db.stream(queries.eventsByTag((tag, offset, maxOffset, correctMaxForH2Driver(max))).result))
.map(row =>
AkkaSerialization.fromRow(serialization)(row).map { case (repr, ordering) => (repr, Set.empty, ordering) })
}

override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery(offset, limit).result))
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def maxJournalSequence(): Future[Long] =
db.run(queries.maxJournalSequenceQuery.result)
Expand All @@ -59,7 +59,7 @@ class DefaultReadJournalDao(
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
.map(AkkaSerialization.fromRow(serialization))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.map(AkkaSerialization.fromRow(serialization)(_))

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

package akka.persistence.jdbc.query.dao

import akka.persistence.jdbc.config.{
EventJournalTableConfiguration,
EventTagTableConfiguration,
LegacyJournalTableConfiguration,
ReadJournalConfig
}
import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration, ReadJournalConfig }
import akka.persistence.jdbc.journal.dao.JournalTables
import slick.jdbc.JdbcProfile

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = {

val publisher = db.stream(queries.eventsByTag(s"%$tag%", offset, maxOffset, correctMaxForH2Driver(max)).result)
val publisher = db.stream(queries.eventsByTag((s"%$tag%", offset, maxOffset, correctMaxForH2Driver(max))).result)
// applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
Source
.fromPublisher(publisher)
Expand All @@ -57,7 +57,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
Source
.fromPublisher(
db.stream(
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result))
queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result))
.via(serializer.deserializeFlow)
.map {
case Success((repr, _, ordering)) => Success(repr -> ordering)
Expand All @@ -66,7 +66,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
}

override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
Source.fromPublisher(db.stream(queries.journalSequenceQuery(offset, limit).result))
Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result))

override def maxJournalSequence(): Future[Long] = {
db.run(queries.maxJournalSequenceQuery.result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package akka.persistence.jdbc.query.dao.legacy

import akka.persistence.jdbc.config.{ LegacyJournalTableConfiguration, ReadJournalConfig }
import akka.persistence.jdbc.journal.dao.legacy.{ JournalRow, JournalTables }
import akka.persistence.jdbc.journal.dao.legacy.JournalTables
import slick.jdbc.JdbcProfile

class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJournalConfig) extends JournalTables {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import akka.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJo
import akka.persistence.query.{ EventEnvelope, Offset }
import akka.persistence.query.javadsl._
import akka.stream.javadsl.Source
import akka.persistence.jdbc.util.PluginVersionChecker

object JdbcReadJournal {
final val Identifier = ScalaJdbcReadJournal.Identifier
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/akka/persistence/jdbc/query/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@

package akka.persistence.jdbc

import akka.NotUsed
import akka.persistence.query._
import akka.stream.scaladsl.Source
import scala.language.implicitConversions

package object query {
implicit class OffsetOps(val that: Offset) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
// Continue querying from the largest offset
xs.map(_.offset.value).max
}
Some((nextStartingOffset, nextControl), xs)
Some(((nextStartingOffset, nextControl), xs))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.collection.immutable._

import scala.util.Try

@deprecated(since = "5.0.0")
@deprecated("use Akka Serialization for the payloads instead", since = "5.0.0")
trait PersistentReprSerializer[T] {

/**
Expand Down Expand Up @@ -45,6 +45,7 @@ trait PersistentReprSerializer[T] {
def deserialize(t: T): Try[(PersistentRepr, Set[String], Long)]
}

@deprecated("use Akka Serialization for the payloads instead", since = "5.0.0")
trait FlowPersistentReprSerializer[T] extends PersistentReprSerializer[T] {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,48 +78,51 @@ class DefaultSnapshotDao(
override def snapshotForMaxTimestamp(
persistenceId: String,
maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] =
db.run(queries.selectOneByPersistenceIdAndMaxTimestamp(persistenceId, maxTimestamp).result).map(zeroOrOneSnapshot)
db.run(queries.selectOneByPersistenceIdAndMaxTimestamp((persistenceId, maxTimestamp)).result).map(zeroOrOneSnapshot)

override def snapshotForMaxSequenceNr(
persistenceId: String,
maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] =
db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr(persistenceId, maxSequenceNr).result).map(zeroOrOneSnapshot)
db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result)
.map(zeroOrOneSnapshot)

override def snapshotForMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
maxSequenceNr: Long,
maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] =
db.run(
queries
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp)
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.result)
.map(zeroOrOneSnapshot)
.map(zeroOrOneSnapshot(_))

override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot))
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContexts.parasitic)
}

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdAndSequenceNr(persistenceId, sequenceNr).delete)
db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
.map(_ => ())(ExecutionContexts.parasitic)

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContexts.parasitic))

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr(persistenceId, maxSequenceNr).delete)
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
.map(_ => ())((ExecutionContexts.parasitic))

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxTimestamp(persistenceId, maxTimestamp).delete)
db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
.map(_ => ())((ExecutionContexts.parasitic))

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
maxSequenceNr: Long,
maxTimestamp: Long): Future[Unit] =
db.run(
queries.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp).delete)
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
.map(_ => ())((ExecutionContexts.parasitic))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package akka.persistence.jdbc.snapshot.dao

import akka.persistence.jdbc.config.{ LegacySnapshotTableConfiguration, SnapshotTableConfiguration }
import akka.persistence.jdbc.config.SnapshotTableConfiguration
import akka.persistence.jdbc.snapshot.dao.SnapshotTables.SnapshotRow
import slick.jdbc.JdbcProfile

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package akka.persistence.jdbc.snapshot.dao

import akka.persistence.jdbc.config.{ LegacySnapshotTableConfiguration, SnapshotTableConfiguration }
import akka.persistence.jdbc.config.SnapshotTableConfiguration
import akka.persistence.jdbc.snapshot.dao.SnapshotTables.SnapshotRow
import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.isOracleDriver
import akka.persistence.jdbc.util.InputStreamOps.InputStreamImplicits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ class ByteArraySnapshotDao(
persistenceId: String,
maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] =
for {
rows <- db.run(queries.selectOneByPersistenceIdAndMaxTimestamp(persistenceId, maxTimestamp).result)
rows <- db.run(queries.selectOneByPersistenceIdAndMaxTimestamp((persistenceId, maxTimestamp)).result)
} yield rows.headOption.map(toSnapshotData)

override def snapshotForMaxSequenceNr(
persistenceId: String,
maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] =
for {
rows <- db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr(persistenceId, maxSequenceNr).result)
rows <- db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result)
} yield rows.headOption.map(toSnapshotData)

override def snapshotForMaxSequenceNrAndMaxTimestamp(
Expand All @@ -60,7 +60,7 @@ class ByteArraySnapshotDao(
for {
rows <- db.run(
queries
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp)
.selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.result)
} yield rows.headOption.map(toSnapshotData)

Expand All @@ -71,7 +71,7 @@ class ByteArraySnapshotDao(

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
for {
_ <- db.run(queries.selectByPersistenceIdAndSequenceNr(persistenceId, sequenceNr).delete)
_ <- db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
} yield ()

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
Expand All @@ -81,12 +81,12 @@ class ByteArraySnapshotDao(

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
for {
_ <- db.run(queries.selectByPersistenceIdUpToMaxSequenceNr(persistenceId, maxSequenceNr).delete)
_ <- db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
} yield ()

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
for {
_ <- db.run(queries.selectByPersistenceIdUpToMaxTimestamp(persistenceId, maxTimestamp).delete)
_ <- db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
} yield ()

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
Expand All @@ -96,7 +96,7 @@ class ByteArraySnapshotDao(
for {
_ <- db.run(
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp)
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
} yield ()
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.sql.Statement

import scala.concurrent.Future
import akka.Done
import akka.actor.{ ActorSystem, ClassicActorSystemProvider }
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
import akka.dispatch.Dispatchers
import akka.persistence.jdbc.db.SlickDatabase
Expand Down
5 changes: 5 additions & 0 deletions core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc;

import akka.Done;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc

import akka.{ Done, NotUsed }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.actor.ActorSystem
import akka.persistence.jdbc.config.{ JournalConfig, ReadJournalConfig, SlickConfiguration }
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal
import akka.persistence.jdbc.util.DropCreate
import akka.persistence.jdbc.db.{ SlickDatabase, SlickDriver }
import akka.persistence.jdbc.db.SlickDatabase
import akka.util.Timeout
import com.typesafe.config.{ Config, ConfigFactory, ConfigValue }
import org.scalatest.BeforeAndAfterEach
Expand Down
Loading