From 95d66ce9cdc498293ae2c90638bde7b6ea7dd2cd Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 8 Jan 2021 13:54:24 +0100 Subject: [PATCH 1/7] Make Scalac complain more --- .../akka/persistence/jdbc/JournalRow.scala | 9 ++ .../persistence/jdbc/db/SlickExtension.scala | 1 - .../dao/BaseJournalDaoWithReadMessages.scala | 4 +- .../jdbc/journal/dao/DefaultJournalDao.scala | 4 +- .../dao/legacy/ByteArrayJournalDao.scala | 7 +- .../jdbc/query/JournalSequenceActor.scala | 4 +- .../query/dao/DefaultReadJournalDao.scala | 8 +- .../jdbc/query/dao/ReadJournalQueries.scala | 1 - .../dao/legacy/ByteArrayReadJournalDao.scala | 6 +- .../query/dao/legacy/ReadJournalQueries.scala | 2 +- .../jdbc/query/javadsl/JdbcReadJournal.scala | 1 - .../akka/persistence/jdbc/query/package.scala | 3 - .../jdbc/query/scaladsl/JdbcReadJournal.scala | 2 +- .../PersistentReprSerializer.scala | 3 +- .../snapshot/dao/DefaultSnapshotDao.scala | 16 ++-- .../jdbc/snapshot/dao/SnapshotQueries.scala | 2 +- .../jdbc/snapshot/dao/SnapshotTables.scala | 2 +- .../dao/legacy/ByteArraySnapshotDao.scala | 14 +-- .../testkit/internal/SchemaUtilsImpl.scala | 2 +- .../jdbc/SingleActorSystemPerTestSpec.scala | 2 +- .../dao/ByteArrayJournalSerializerTest.scala | 1 - .../CurrentEventsByPersistenceIdTest.scala | 30 +++--- .../jdbc/query/EventAdapterTest.scala | 24 ++--- .../query/EventsByPersistenceIdTest.scala | 8 +- .../jdbc/query/EventsByTagTest.scala | 94 +++++++++---------- .../jdbc/query/HardDeleteQueryTest.scala | 4 +- .../JournalDaoStreamMessagesMemoryTest.scala | 2 - .../jdbc/query/JournalSequenceActorTest.scala | 2 +- .../jdbc/query/QueryTestSpec.scala | 8 +- .../StoreOnlySerializableMessagesTest.scala | 10 +- project/ProjectAutoPlugin.scala | 36 +++++-- 31 files changed, 165 insertions(+), 147 deletions(-) create mode 100644 core/src/main/scala/akka/persistence/jdbc/JournalRow.scala diff --git a/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala b/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala new file mode 100644 index 000000000..d79d18369 --- /dev/null +++ b/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala @@ -0,0 +1,9 @@ +package akka.persistence.jdbc + +final case class JournalRow( + ordering: Long, + deleted: Boolean, + persistenceId: String, + sequenceNumber: Long, + message: Array[Byte], + tags: Option[String] = None) diff --git a/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala b/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala index 274d4eaec..00298d781 100644 --- a/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala +++ b/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala @@ -12,7 +12,6 @@ import com.typesafe.config.{ Config, ConfigObject } import scala.collection.JavaConverters._ import scala.util.{ Failure, Success } -import scala.util.control.NonFatal object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider { override def lookup: SlickExtension.type = SlickExtension diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala index 175f82aaa..fca97727f 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala @@ -55,7 +55,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages { case Some(lastSeqNr) => lastSeqNr + 1 case None => from } - Some((nextFrom, nextControl), xs) + Some(((nextFrom, nextControl), xs)) } } @@ -67,7 +67,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages { akka.pattern.after(delay, scheduler)(retrieveNextBatch()) } } - .mapConcat(identity) + .mapConcat(identity(_)) } } diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala index 1663496e8..d6fc9f369 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -119,7 +119,7 @@ class DefaultJournalDao( Source .fromPublisher( db.stream( - queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result)) - .map(AkkaSerialization.fromRow(serialization)) + queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result)) + .map(AkkaSerialization.fromRow(serialization)(_)) } } diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index 98511002d..7a9725c97 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -8,10 +8,7 @@ package akka.persistence.jdbc.journal.dao.legacy import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, - FlowControl, H2Compat, - JournalDao, - JournalDaoWithReadMessages, JournalDaoWithUpdates } import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig } @@ -118,7 +115,7 @@ trait BaseByteArrayJournalDao val write = PersistentRepr(payload, sequenceNr, persistenceId) val serializedRow = serializer.serialize(write) match { case Success(t) => t - case Failure(ex) => + case Failure(_) => throw new IllegalArgumentException( s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]") } @@ -141,7 +138,7 @@ trait BaseByteArrayJournalDao Source .fromPublisher( db.stream( - queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result)) + queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result)) .via(serializer.deserializeFlow) .map { case Success((repr, _, ordering)) => Success(repr -> ordering) diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala index 2a22a987a..8eac35bff 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala @@ -140,10 +140,10 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen val (nextMax, _, missingElems) = // using the ordering elements that were fetched, we verify if there are any gaps - elements.foldLeft[(OrderingId, OrderingId, MissingElements)]( + elements.foldLeft[(OrderingId, OrderingId, MissingElements)](( currentMaxOrdering, currentMaxOrdering, - MissingElements.empty) { + MissingElements.empty)) { case ((currentMax, previousElement, missing), currentElement) => // we must decide if we move the cursor forward val newMax = diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala index f32642d5e..26fbeb797 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/DefaultReadJournalDao.scala @@ -40,13 +40,13 @@ class DefaultReadJournalDao( // This doesn't populate the tags. AFAICT they aren't used Source - .fromPublisher(db.stream(queries.eventsByTag(tag, offset, maxOffset, correctMaxForH2Driver(max)).result)) + .fromPublisher(db.stream(queries.eventsByTag((tag, offset, maxOffset, correctMaxForH2Driver(max))).result)) .map(row => AkkaSerialization.fromRow(serialization)(row).map { case (repr, ordering) => (repr, Set.empty, ordering) }) } override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] = - Source.fromPublisher(db.stream(queries.journalSequenceQuery(offset, limit).result)) + Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result)) override def maxJournalSequence(): Future[Long] = db.run(queries.maxJournalSequenceQuery.result) @@ -59,7 +59,7 @@ class DefaultReadJournalDao( Source .fromPublisher( db.stream( - queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result)) - .map(AkkaSerialization.fromRow(serialization)) + queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result)) + .map(AkkaSerialization.fromRow(serialization)(_)) } diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala index 1dac5f546..7ada8566e 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala @@ -8,7 +8,6 @@ package akka.persistence.jdbc.query.dao import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration, - LegacyJournalTableConfiguration, ReadJournalConfig } import akka.persistence.jdbc.journal.dao.JournalTables diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala index caf6c6190..05c40ec03 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ByteArrayReadJournalDao.scala @@ -41,7 +41,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith maxOffset: Long, max: Long): Source[Try[(PersistentRepr, Set[String], Long)], NotUsed] = { - val publisher = db.stream(queries.eventsByTag(s"%$tag%", offset, maxOffset, correctMaxForH2Driver(max)).result) + val publisher = db.stream(queries.eventsByTag((s"%$tag%", offset, maxOffset, correctMaxForH2Driver(max))).result) // applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168 Source .fromPublisher(publisher) @@ -57,7 +57,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith Source .fromPublisher( db.stream( - queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max)).result)) + queries.messagesQuery((persistenceId, fromSequenceNr, toSequenceNr, correctMaxForH2Driver(max))).result)) .via(serializer.deserializeFlow) .map { case Success((repr, _, ordering)) => Success(repr -> ordering) @@ -66,7 +66,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith } override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] = - Source.fromPublisher(db.stream(queries.journalSequenceQuery(offset, limit).result)) + Source.fromPublisher(db.stream(queries.journalSequenceQuery((offset, limit)).result)) override def maxJournalSequence(): Future[Long] = { db.run(queries.maxJournalSequenceQuery.result) diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala index 951b46548..1471bd75d 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/legacy/ReadJournalQueries.scala @@ -6,7 +6,7 @@ package akka.persistence.jdbc.query.dao.legacy import akka.persistence.jdbc.config.{ LegacyJournalTableConfiguration, ReadJournalConfig } -import akka.persistence.jdbc.journal.dao.legacy.{ JournalRow, JournalTables } +import akka.persistence.jdbc.journal.dao.legacy.JournalTables import slick.jdbc.JdbcProfile class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJournalConfig) extends JournalTables { diff --git a/core/src/main/scala/akka/persistence/jdbc/query/javadsl/JdbcReadJournal.scala b/core/src/main/scala/akka/persistence/jdbc/query/javadsl/JdbcReadJournal.scala index 6f92eeb8a..f266b3609 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/javadsl/JdbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/javadsl/JdbcReadJournal.scala @@ -10,7 +10,6 @@ import akka.persistence.jdbc.query.scaladsl.{ JdbcReadJournal => ScalaJdbcReadJo import akka.persistence.query.{ EventEnvelope, Offset } import akka.persistence.query.javadsl._ import akka.stream.javadsl.Source -import akka.persistence.jdbc.util.PluginVersionChecker object JdbcReadJournal { final val Identifier = ScalaJdbcReadJournal.Identifier diff --git a/core/src/main/scala/akka/persistence/jdbc/query/package.scala b/core/src/main/scala/akka/persistence/jdbc/query/package.scala index 3684ba8c3..6579da082 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/package.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/package.scala @@ -5,10 +5,7 @@ package akka.persistence.jdbc -import akka.NotUsed import akka.persistence.query._ -import akka.stream.scaladsl.Source -import scala.language.implicitConversions package object query { implicit class OffsetOps(val that: Offset) extends AnyVal { diff --git a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala index 0a9542bcc..c9d09bf35 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala @@ -261,7 +261,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E // Continue querying from the largest offset xs.map(_.offset.value).max } - Some((nextStartingOffset, nextControl), xs) + Some(((nextStartingOffset, nextControl), xs)) } } diff --git a/core/src/main/scala/akka/persistence/jdbc/serialization/PersistentReprSerializer.scala b/core/src/main/scala/akka/persistence/jdbc/serialization/PersistentReprSerializer.scala index c4ccede3b..778bac2e2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/serialization/PersistentReprSerializer.scala +++ b/core/src/main/scala/akka/persistence/jdbc/serialization/PersistentReprSerializer.scala @@ -14,7 +14,7 @@ import scala.collection.immutable._ import scala.util.Try -@deprecated(since = "5.0.0") +@deprecated("use Akka Serialization for the payloads instead", since = "5.0.0") trait PersistentReprSerializer[T] { /** @@ -45,6 +45,7 @@ trait PersistentReprSerializer[T] { def deserialize(t: T): Try[(PersistentRepr, Set[String], Long)] } +@deprecated("use Akka Serialization for the payloads instead", since = "5.0.0") trait FlowPersistentReprSerializer[T] extends PersistentReprSerializer[T] { /** diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala index bef9574d0..a190a0d32 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala @@ -78,12 +78,12 @@ class DefaultSnapshotDao( override def snapshotForMaxTimestamp( persistenceId: String, maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] = - db.run(queries.selectOneByPersistenceIdAndMaxTimestamp(persistenceId, maxTimestamp).result).map(zeroOrOneSnapshot) + db.run(queries.selectOneByPersistenceIdAndMaxTimestamp((persistenceId, maxTimestamp)).result).map(zeroOrOneSnapshot) override def snapshotForMaxSequenceNr( persistenceId: String, maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] = - db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr(persistenceId, maxSequenceNr).result).map(zeroOrOneSnapshot) + db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result).map(zeroOrOneSnapshot) override def snapshotForMaxSequenceNrAndMaxTimestamp( persistenceId: String, @@ -91,9 +91,9 @@ class DefaultSnapshotDao( maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] = db.run( queries - .selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp) + .selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)) .result) - .map(zeroOrOneSnapshot) + .map(zeroOrOneSnapshot(_)) override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot)) @@ -101,18 +101,18 @@ class DefaultSnapshotDao( } override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] = - db.run(queries.selectByPersistenceIdAndSequenceNr(persistenceId, sequenceNr).delete) + db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete) .map(_ => ())(ExecutionContexts.parasitic) override def deleteAllSnapshots(persistenceId: String): Future[Unit] = db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContexts.parasitic)) override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] = - db.run(queries.selectByPersistenceIdUpToMaxSequenceNr(persistenceId, maxSequenceNr).delete) + db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete) .map(_ => ())((ExecutionContexts.parasitic)) override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] = - db.run(queries.selectByPersistenceIdUpToMaxTimestamp(persistenceId, maxTimestamp).delete) + db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete) .map(_ => ())((ExecutionContexts.parasitic)) override def deleteUpToMaxSequenceNrAndMaxTimestamp( @@ -120,6 +120,6 @@ class DefaultSnapshotDao( maxSequenceNr: Long, maxTimestamp: Long): Future[Unit] = db.run( - queries.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp).delete) + queries.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)).delete) .map(_ => ())((ExecutionContexts.parasitic)) } diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotQueries.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotQueries.scala index efcafa2bd..8dffdbef9 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotQueries.scala @@ -5,7 +5,7 @@ package akka.persistence.jdbc.snapshot.dao -import akka.persistence.jdbc.config.{ LegacySnapshotTableConfiguration, SnapshotTableConfiguration } +import akka.persistence.jdbc.config.SnapshotTableConfiguration import akka.persistence.jdbc.snapshot.dao.SnapshotTables.SnapshotRow import slick.jdbc.JdbcProfile diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala index c056b539e..4a17841d4 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/SnapshotTables.scala @@ -5,7 +5,7 @@ package akka.persistence.jdbc.snapshot.dao -import akka.persistence.jdbc.config.{ LegacySnapshotTableConfiguration, SnapshotTableConfiguration } +import akka.persistence.jdbc.config.SnapshotTableConfiguration import akka.persistence.jdbc.snapshot.dao.SnapshotTables.SnapshotRow import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.isOracleDriver import akka.persistence.jdbc.util.InputStreamOps.InputStreamImplicits diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/ByteArraySnapshotDao.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/ByteArraySnapshotDao.scala index baad8b202..7e4092365 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/ByteArraySnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/legacy/ByteArraySnapshotDao.scala @@ -43,14 +43,14 @@ class ByteArraySnapshotDao( persistenceId: String, maxTimestamp: Long): Future[Option[(SnapshotMetadata, Any)]] = for { - rows <- db.run(queries.selectOneByPersistenceIdAndMaxTimestamp(persistenceId, maxTimestamp).result) + rows <- db.run(queries.selectOneByPersistenceIdAndMaxTimestamp((persistenceId, maxTimestamp)).result) } yield rows.headOption.map(toSnapshotData) override def snapshotForMaxSequenceNr( persistenceId: String, maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] = for { - rows <- db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr(persistenceId, maxSequenceNr).result) + rows <- db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result) } yield rows.headOption.map(toSnapshotData) override def snapshotForMaxSequenceNrAndMaxTimestamp( @@ -60,7 +60,7 @@ class ByteArraySnapshotDao( for { rows <- db.run( queries - .selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp) + .selectOneByPersistenceIdAndMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)) .result) } yield rows.headOption.map(toSnapshotData) @@ -71,7 +71,7 @@ class ByteArraySnapshotDao( override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] = for { - _ <- db.run(queries.selectByPersistenceIdAndSequenceNr(persistenceId, sequenceNr).delete) + _ <- db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete) } yield () override def deleteAllSnapshots(persistenceId: String): Future[Unit] = @@ -81,12 +81,12 @@ class ByteArraySnapshotDao( override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] = for { - _ <- db.run(queries.selectByPersistenceIdUpToMaxSequenceNr(persistenceId, maxSequenceNr).delete) + _ <- db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete) } yield () override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] = for { - _ <- db.run(queries.selectByPersistenceIdUpToMaxTimestamp(persistenceId, maxTimestamp).delete) + _ <- db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete) } yield () override def deleteUpToMaxSequenceNrAndMaxTimestamp( @@ -96,7 +96,7 @@ class ByteArraySnapshotDao( for { _ <- db.run( queries - .selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp(persistenceId, maxSequenceNr, maxTimestamp) + .selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)) .delete) } yield () } diff --git a/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala b/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala index 59ab5c342..521725a55 100644 --- a/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala +++ b/core/src/main/scala/akka/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala @@ -9,7 +9,7 @@ import java.sql.Statement import scala.concurrent.Future import akka.Done -import akka.actor.{ ActorSystem, ClassicActorSystemProvider } +import akka.actor.ClassicActorSystemProvider import akka.annotation.InternalApi import akka.dispatch.Dispatchers import akka.persistence.jdbc.db.SlickDatabase diff --git a/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala b/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala index f882117a3..749fcb75b 100644 --- a/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/SingleActorSystemPerTestSpec.scala @@ -9,7 +9,7 @@ import akka.actor.ActorSystem import akka.persistence.jdbc.config.{ JournalConfig, ReadJournalConfig, SlickConfiguration } import akka.persistence.jdbc.query.javadsl.JdbcReadJournal import akka.persistence.jdbc.util.DropCreate -import akka.persistence.jdbc.db.{ SlickDatabase, SlickDriver } +import akka.persistence.jdbc.db.SlickDatabase import akka.util.Timeout import com.typesafe.config.{ Config, ConfigFactory, ConfigValue } import org.scalatest.BeforeAndAfterEach diff --git a/core/src/test/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalSerializerTest.scala b/core/src/test/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalSerializerTest.scala index 38398a35c..55bb47a8d 100644 --- a/core/src/test/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalSerializerTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalSerializerTest.scala @@ -7,7 +7,6 @@ package akka.persistence.jdbc package journal.dao.legacy import akka.persistence.{ AtomicWrite, PersistentRepr } -import akka.serialization.SerializationExtension import scala.collection.immutable._ diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala index 12f022938..d3b078e52 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala @@ -17,7 +17,7 @@ abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTes it should "find events from sequenceNr" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, _, _) => actor1 ! 1 actor1 ! 2 actor1 ! 3 @@ -35,49 +35,49 @@ abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTes journalOps.withCurrentEventsByPersistenceId()("my-1", 1, 1) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) tp.expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 1, 2) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2, timestamp = 0L)) tp.expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 2, 2) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2)) + tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2, timestamp = 0L)) tp.expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 2, 3) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) + tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) tp.expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 3, 3) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) tp.expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 0, 3) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) tp.expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 1, 3) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) tp.expectComplete() } } @@ -141,7 +141,7 @@ abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTes it should "find events for actors" in withActorSystem { implicit system => val journalOps = new JavaDslJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, _, _) => actor1 ! 1 actor1 ! 2 actor1 ! 3 diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala index 87d922a63..061c5c364 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala @@ -56,17 +56,17 @@ abstract class EventAdapterTest(config: String) extends QueryTestSpec(config) { it should "apply event adapter when querying events for actor with pid 'my-1'" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, _, _) => journalOps.withEventsByPersistenceId()("my-1", 0) { tp => tp.request(10) tp.expectNoMessage(100.millis) actor1 ! Event("1") - tp.expectNext(ExpectNextTimeout, EventEnvelope(Sequence(1), "my-1", 1, EventRestored("1"))) + tp.expectNext(ExpectNextTimeout, EventEnvelope(Sequence(1), "my-1", 1, EventRestored("1"), timestamp = 0L)) tp.expectNoMessage(100.millis) actor1 ! Event("2") - tp.expectNext(ExpectNextTimeout, EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"))) + tp.expectNext(ExpectNextTimeout, EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"), timestamp = 0L)) tp.expectNoMessage(100.millis) tp.cancel() } @@ -86,12 +86,12 @@ abstract class EventAdapterTest(config: String) extends QueryTestSpec(config) { journalOps.withEventsByTag(10.seconds)("event", Sequence(1)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, EventRestored("2"))) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, EventRestored("3"))) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, EventRestored("2"), timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, EventRestored("3"), timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor1 ? TaggedEvent(Event("1"), "event") - tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, EventRestored("1"))) + tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, EventRestored("1"), timestamp = 0L)) tp.cancel() tp.expectNoMessage(NoMsgTime) } @@ -100,7 +100,7 @@ abstract class EventAdapterTest(config: String) extends QueryTestSpec(config) { it should "apply event adapters when querying current events for actors" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, _, _) => actor1 ! Event("1") actor1 ! Event("2") actor1 ! Event("3") @@ -110,21 +110,21 @@ abstract class EventAdapterTest(config: String) extends QueryTestSpec(config) { } journalOps.withCurrentEventsByPersistenceId()("my-1", 1, 1) { tp => - tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(1), "my-1", 1, EventRestored("1"))).expectComplete() + tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(1), "my-1", 1, EventRestored("1"), timestamp = 0L)).expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 2, 2) { tp => - tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"))).expectComplete() + tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"), timestamp = 0L)).expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 3, 3) { tp => - tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(3), "my-1", 3, EventRestored("3"))).expectComplete() + tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(3), "my-1", 3, EventRestored("3"), timestamp = 0L)).expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 2, 3) { tp => tp.request(Int.MaxValue) - .expectNext(EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"))) - .expectNext(EventEnvelope(Sequence(3), "my-1", 3, EventRestored("3"))) + .expectNext(EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"), timestamp = 0L)) + .expectNext(EventEnvelope(Sequence(3), "my-1", 3, EventRestored("3"), timestamp = 0L)) .expectComplete() } } diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventsByPersistenceIdTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventsByPersistenceIdTest.scala index 29423a318..735c4a58c 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventsByPersistenceIdTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventsByPersistenceIdTest.scala @@ -29,7 +29,7 @@ abstract class EventsByPersistenceIdTest(config: String) extends QueryTestSpec(c it should "find events from sequenceNr" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, _, _) => actor1 ! withTags(1, "number") actor1 ! withTags(2, "number") actor1 ! withTags(3, "number") @@ -216,7 +216,7 @@ abstract class EventsByPersistenceIdTest(config: String) extends QueryTestSpec(c it should "find events for actor with pid 'my-1'" in withActorSystem { implicit system => val journalOps = new ScalaJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, _, _) => journalOps.withEventsByPersistenceId()("my-1", 0) { tp => tp.request(10) tp.expectNoMessage(100.millis) @@ -236,7 +236,7 @@ abstract class EventsByPersistenceIdTest(config: String) extends QueryTestSpec(c it should "find events for actor with pid 'my-1' and persisting messages to other actor" in withActorSystem { implicit system => val journalOps = new JavaDslJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (actor1, actor2, _) => journalOps.withEventsByPersistenceId()("my-1", 0, Long.MaxValue) { tp => tp.request(10) tp.expectNoMessage(100.millis) @@ -266,7 +266,7 @@ abstract class EventsByPersistenceIdTest(config: String) extends QueryTestSpec(c it should "find events for actor with pid 'my-2'" in withActorSystem { implicit system => val journalOps = new JavaDslJdbcReadJournalOperations(system) - withTestActors() { (actor1, actor2, actor3) => + withTestActors() { (_, actor2, _) => actor2 ! 1 actor2 ! 2 actor2 ! 3 diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagTest.scala index 279fe55b8..1655a48fc 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagTest.scala @@ -56,38 +56,38 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.cancel() } journalOps.withEventsByTag()("number", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.cancel() } journalOps.withEventsByTag()("number", Sequence(0)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.cancel() } journalOps.withEventsByTag()("number", Sequence(1)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.cancel() } journalOps.withEventsByTag()("number", Sequence(2)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.cancel() } @@ -100,19 +100,19 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con journalOps.withEventsByTag()("number", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor1 ? withTags(1, "number") - tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 1)) + tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 1, timestamp = 0L)) actor1 ? withTags(1, "number") - tp.expectNext(EventEnvelope(Sequence(5), "my-1", 3, 1)) + tp.expectNext(EventEnvelope(Sequence(5), "my-1", 3, 1, timestamp = 0L)) actor1 ? withTags(1, "number") - tp.expectNext(EventEnvelope(Sequence(6), "my-1", 4, 1)) + tp.expectNext(EventEnvelope(Sequence(6), "my-1", 4, 1, timestamp = 0L)) tp.cancel() tp.expectNoMessage(NoMsgTime) } @@ -166,29 +166,29 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.cancel() } journalOps.withEventsByTag()("sharded-1", Sequence(Long.MinValue)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } journalOps.withEventsByTag()("sharded-10", Sequence(Long.MinValue)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } journalOps.withEventsByTag()("sharded-100", Sequence(Long.MinValue)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } @@ -238,12 +238,12 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con journalOps.withEventsByTag()("number", Sequence(1)) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor1 ? withTags(1, "number") - tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 1)) + tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 1, timestamp = 0L)) tp.cancel() tp.expectNoMessage(NoMsgTime) } @@ -258,15 +258,15 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con tp.expectNoMessage(NoMsgTime) actor1 ! withTags(1, "one") // 1 - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor2 ! withTags(1, "one") // 2 - tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 1)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor3 ! withTags(1, "one") // 3 - tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 1)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor1 ! withTags(2, "two") // 4 @@ -279,15 +279,15 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con tp.expectNoMessage(NoMsgTime) actor1 ! withTags(1, "one") // 7 - tp.expectNext(EventEnvelope(Sequence(7), "my-1", 3, 1)) + tp.expectNext(EventEnvelope(Sequence(7), "my-1", 3, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor2 ! withTags(1, "one") // 8 - tp.expectNext(EventEnvelope(Sequence(8), "my-2", 3, 1)) + tp.expectNext(EventEnvelope(Sequence(8), "my-2", 3, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) actor3 ! withTags(1, "one") // 9 - tp.expectNext(EventEnvelope(Sequence(9), "my-3", 3, 1)) + tp.expectNext(EventEnvelope(Sequence(9), "my-3", 3, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() tp.expectNoMessage(NoMsgTime) @@ -318,37 +318,37 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con journalOps.withEventsByTag(10.seconds)("prime", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) - tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2)) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) - tp.expectNext(EventEnvelope(Sequence(5), "my-1", 5, 5)) - tp.expectNext(EventEnvelope(Sequence(6), "my-2", 1, 3)) - tp.expectNext(EventEnvelope(Sequence(7), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-1", 2, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(5), "my-1", 5, 5, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(6), "my-2", 1, 3, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(7), "my-3", 1, 3, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } journalOps.withEventsByTag(10.seconds)("three", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) - tp.expectNext(EventEnvelope(Sequence(6), "my-2", 1, 3)) - tp.expectNext(EventEnvelope(Sequence(7), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(6), "my-2", 1, 3, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(7), "my-3", 1, 3, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } journalOps.withEventsByTag(10.seconds)("3", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3)) - tp.expectNext(EventEnvelope(Sequence(6), "my-2", 1, 3)) - tp.expectNext(EventEnvelope(Sequence(7), "my-3", 1, 3)) + tp.expectNext(EventEnvelope(Sequence(3), "my-1", 3, 3, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(6), "my-2", 1, 3, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(7), "my-3", 1, 3, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } journalOps.withEventsByTag(10.seconds)("one", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1)) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() } @@ -362,7 +362,7 @@ abstract class EventsByTagTest(config: String) extends QueryTestSpec(config, con journalOps.withEventsByTag(10.seconds)("five", NoOffset) { tp => tp.request(Int.MaxValue) - tp.expectNext(EventEnvelope(Sequence(5), "my-1", 5, 5)) + tp.expectNext(EventEnvelope(Sequence(5), "my-1", 5, 5, timestamp = 0L)) tp.expectNoMessage(NoMsgTime) tp.cancel() tp.expectNoMessage(NoMsgTime) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala index 0ae76e608..893215cd9 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/HardDeleteQueryTest.scala @@ -5,10 +5,8 @@ package akka.persistence.jdbc.query -import akka.persistence.query.{ EventEnvelope, NoOffset, Sequence } +import akka.persistence.query.NoOffset import akka.pattern._ -import com.typesafe.config.ConfigFactory -import org.scalactic.source.Position import scala.concurrent.duration._ import org.scalatest.matchers.should.Matchers diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index b1a7ebfcd..efc6643c4 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -43,8 +43,6 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration val journalTableCfg = journalConfig.journalTableConfiguration - import profile.api._ - implicit val askTimeout = 50.millis def generateId: Int = 0 diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala index 59eb9f7b0..749eb07e8 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalSequenceActorTest.scala @@ -184,7 +184,7 @@ class MockDaoJournalSequenceActorTest extends SharedActorSystemTestSpec { val almostQueryDelay = queryDelay - 50.millis val almostImmediately = 50.millis - withTestProbeJournalSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, actor) => + withTestProbeJournalSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, _) => daoProbe.expectMsg(almostImmediately, TestProbeReadJournalDao.JournalSequence(0, batchSize)) val firstBatch = (1L to 40L) ++ (51L to 110L) daoProbe.reply(firstBatch) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/QueryTestSpec.scala b/core/src/test/scala/akka/persistence/jdbc/query/QueryTestSpec.scala index 786f5bffc..5e6e77515 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/QueryTestSpec.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/QueryTestSpec.scala @@ -20,7 +20,6 @@ import akka.stream.testkit.javadsl.{ TestSink => JavaSink } import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ Materializer, SystemMaterializer } import com.typesafe.config.ConfigValue -import slick.jdbc.PostgresProfile.api._ import scala.concurrent.Future import scala.concurrent.duration.{ FiniteDuration, _ } @@ -28,7 +27,6 @@ import akka.persistence.jdbc.testkit.internal.H2 import akka.persistence.jdbc.testkit.internal.MySQL import akka.persistence.jdbc.testkit.internal.Oracle import akka.persistence.jdbc.testkit.internal.Postgres -import akka.persistence.jdbc.testkit.internal.SchemaUtilsImpl import akka.persistence.jdbc.testkit.internal.SqlServer trait ReadJournalOperations { @@ -242,7 +240,7 @@ abstract class QueryTestSpec(config: String, configOverrides: Map[String, Config } case event @ Tagged(payload: Int, tags) => - persist(event) { (event: Tagged) => + persist(event) { _ => updateState(payload) if (replyToMessages) sender() ! akka.actor.Status.Success((payload, tags)) } @@ -252,11 +250,11 @@ abstract class QueryTestSpec(config: String, configOverrides: Map[String, Config } case event @ TaggedEvent(payload: Event, tag) => - persist(event) { evt => + persist(event) { _ => if (replyToMessages) sender() ! akka.actor.Status.Success((payload, tag)) } case event @ TaggedAsyncEvent(payload: Event, tag) => - persistAsync(event) { evt => + persistAsync(event) { _ => if (replyToMessages) sender() ! akka.actor.Status.Success((payload, tag)) } } diff --git a/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala b/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala index 736411e7c..9d79a54bc 100644 --- a/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/serialization/StoreOnlySerializableMessagesTest.scala @@ -72,7 +72,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch } // the recover cycle - withActor("1") { actor => recover => failure => rejected => + withActor("1") { _ => recover => failure => rejected => recover.expectMsg("foo") recover.expectMsg(RecoveryCompleted) failure.expectNoMessage(100.millis) @@ -82,7 +82,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch it should "not persist a single non-serializable message" in { class NotSerializable - withActor("2") { actor => recover => failure => rejected => + withActor("2") { actor => recover => _ => rejected => val tp = TestProbe() recover.expectMsg(RecoveryCompleted) tp.send(actor, new NotSerializable) // the NotSerializable class cannot be serialized @@ -94,7 +94,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch } // the recover cycle, no message should be recovered - withActor("2") { actor => recover => failure => rejected => + withActor("2") { _ => recover => _ => _ => recover.expectMsg(RecoveryCompleted) recover.expectNoMessage(100.millis) } @@ -102,7 +102,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch it should "persist only serializable messages" in { class NotSerializable - withActor("3") { actor => recover => failure => rejected => + withActor("3") { actor => recover => _ => rejected => val tp = TestProbe() recover.expectMsg(RecoveryCompleted) tp.send(actor, "foo") @@ -117,7 +117,7 @@ abstract class StoreOnlySerializableMessagesTest(config: String, schemaType: Sch } // recover cycle - withActor("3") { actor => recover => failure => rejected => + withActor("3") { _ => recover => failure => rejected => recover.expectMsg("foo") recover.expectMsg(RecoveryCompleted) failure.expectNoMessage(100.millis) diff --git a/project/ProjectAutoPlugin.scala b/project/ProjectAutoPlugin.scala index d17a1bd6b..26804b1ff 100644 --- a/project/ProjectAutoPlugin.scala +++ b/project/ProjectAutoPlugin.scala @@ -40,17 +40,25 @@ object ProjectAutoPlugin extends AutoPlugin { scalacOptions ++= Seq( "-encoding", "UTF-8", - "-deprecation", - "-feature", "-unchecked", "-Xlog-reflective-calls", "-language:higherKinds", "-language:implicitConversions", "-target:jvm-1.8"), - scalacOptions += { - if (scalaVersion.value.startsWith("2.13")) "" - else "-Ypartial-unification" - }, + Compile / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, 13)) => + disciplineScalacOptions -- Set( + "-Ywarn-inaccessible", + "-Ywarn-infer-any", + "-Ywarn-nullary-override", + "-Ywarn-nullary-unit", + "-Ypartial-unification", + "-Yno-adapted-args") + case Some((2, 12)) => + disciplineScalacOptions + case _ => + Nil + }).toSeq, scalacOptions += "-Ydelambdafy:method", Compile / doc / scalacOptions := scalacOptions.value ++ Seq( "-doc-title", @@ -74,4 +82,20 @@ object ProjectAutoPlugin extends AutoPlugin { |""".stripMargin)), resolvers += Resolver.typesafeRepo("releases"), resolvers += Resolver.jcenterRepo) + + val disciplineScalacOptions = Set( +// "-Xfatal-warnings", + "-feature", + "-Yno-adapted-args", + "-deprecation", + "-Xlint", + "-Ywarn-dead-code", + "-Ywarn-inaccessible", + "-Ywarn-infer-any", + "-Ywarn-nullary-override", + "-Ywarn-nullary-unit", + "-Ywarn-unused:_", + "-Ypartial-unification", + "-Ywarn-extra-implicit") + } From d8dc5ae2c8380717dc93f004dce87469a1227024 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 8 Jan 2021 14:54:35 +0100 Subject: [PATCH 2/7] file header --- core/src/main/scala/akka/persistence/jdbc/JournalRow.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala b/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala index d79d18369..00f8b3301 100644 --- a/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala +++ b/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala @@ -1,3 +1,8 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2020 Lightbend Inc. + */ + package akka.persistence.jdbc final case class JournalRow( From b8c4630c7121a0af3f48f08ab8c25b18ca0d9d12 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 8 Jan 2021 14:58:57 +0100 Subject: [PATCH 3/7] scalafmtAll --- .../journal/dao/legacy/ByteArrayJournalDao.scala | 7 +------ .../jdbc/query/JournalSequenceActor.scala | 6 ++---- .../jdbc/query/dao/ReadJournalQueries.scala | 6 +----- .../jdbc/snapshot/dao/DefaultSnapshotDao.scala | 7 +++++-- .../persistence/jdbc/query/EventAdapterTest.scala | 12 +++++++++--- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index 7a9725c97..bc19e2771 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -5,12 +5,7 @@ package akka.persistence.jdbc.journal.dao.legacy -import akka.persistence.jdbc.journal.dao.{ - BaseDao, - BaseJournalDaoWithReadMessages, - H2Compat, - JournalDaoWithUpdates -} +import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates } import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig } import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer import akka.persistence.{ AtomicWrite, PersistentRepr } diff --git a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala index 8eac35bff..30fdfea07 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/JournalSequenceActor.scala @@ -140,10 +140,8 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen val (nextMax, _, missingElems) = // using the ordering elements that were fetched, we verify if there are any gaps - elements.foldLeft[(OrderingId, OrderingId, MissingElements)](( - currentMaxOrdering, - currentMaxOrdering, - MissingElements.empty)) { + elements.foldLeft[(OrderingId, OrderingId, MissingElements)]( + (currentMaxOrdering, currentMaxOrdering, MissingElements.empty)) { case ((currentMax, previousElement, missing), currentElement) => // we must decide if we move the cursor forward val newMax = diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala index 7ada8566e..454bbbed7 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala @@ -5,11 +5,7 @@ package akka.persistence.jdbc.query.dao -import akka.persistence.jdbc.config.{ - EventJournalTableConfiguration, - EventTagTableConfiguration, - ReadJournalConfig -} +import akka.persistence.jdbc.config.{ EventJournalTableConfiguration, EventTagTableConfiguration, ReadJournalConfig } import akka.persistence.jdbc.journal.dao.JournalTables import slick.jdbc.JdbcProfile diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala index a190a0d32..b4100f961 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala @@ -83,7 +83,8 @@ class DefaultSnapshotDao( override def snapshotForMaxSequenceNr( persistenceId: String, maxSequenceNr: Long): Future[Option[(SnapshotMetadata, Any)]] = - db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result).map(zeroOrOneSnapshot) + db.run(queries.selectOneByPersistenceIdAndMaxSequenceNr((persistenceId, maxSequenceNr)).result) + .map(zeroOrOneSnapshot) override def snapshotForMaxSequenceNrAndMaxTimestamp( persistenceId: String, @@ -120,6 +121,8 @@ class DefaultSnapshotDao( maxSequenceNr: Long, maxTimestamp: Long): Future[Unit] = db.run( - queries.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)).delete) + queries + .selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)) + .delete) .map(_ => ())((ExecutionContexts.parasitic)) } diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala index 061c5c364..6f487d892 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventAdapterTest.scala @@ -110,15 +110,21 @@ abstract class EventAdapterTest(config: String) extends QueryTestSpec(config) { } journalOps.withCurrentEventsByPersistenceId()("my-1", 1, 1) { tp => - tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(1), "my-1", 1, EventRestored("1"), timestamp = 0L)).expectComplete() + tp.request(Int.MaxValue) + .expectNext(EventEnvelope(Sequence(1), "my-1", 1, EventRestored("1"), timestamp = 0L)) + .expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 2, 2) { tp => - tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"), timestamp = 0L)).expectComplete() + tp.request(Int.MaxValue) + .expectNext(EventEnvelope(Sequence(2), "my-1", 2, EventRestored("2"), timestamp = 0L)) + .expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 3, 3) { tp => - tp.request(Int.MaxValue).expectNext(EventEnvelope(Sequence(3), "my-1", 3, EventRestored("3"), timestamp = 0L)).expectComplete() + tp.request(Int.MaxValue) + .expectNext(EventEnvelope(Sequence(3), "my-1", 3, EventRestored("3"), timestamp = 0L)) + .expectComplete() } journalOps.withCurrentEventsByPersistenceId()("my-1", 2, 3) { tp => From bc3c7c7d4e1d6233389965d17e94b339c4f7e2a3 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 8 Jan 2021 15:37:38 +0100 Subject: [PATCH 4/7] scalafmtSbt --- project/ProjectAutoPlugin.scala | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/project/ProjectAutoPlugin.scala b/project/ProjectAutoPlugin.scala index 26804b1ff..fb87af9dc 100644 --- a/project/ProjectAutoPlugin.scala +++ b/project/ProjectAutoPlugin.scala @@ -46,19 +46,19 @@ object ProjectAutoPlugin extends AutoPlugin { "-language:implicitConversions", "-target:jvm-1.8"), Compile / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, 13)) => - disciplineScalacOptions -- Set( - "-Ywarn-inaccessible", - "-Ywarn-infer-any", - "-Ywarn-nullary-override", - "-Ywarn-nullary-unit", - "-Ypartial-unification", - "-Yno-adapted-args") - case Some((2, 12)) => - disciplineScalacOptions - case _ => - Nil - }).toSeq, + case Some((2, 13)) => + disciplineScalacOptions -- Set( + "-Ywarn-inaccessible", + "-Ywarn-infer-any", + "-Ywarn-nullary-override", + "-Ywarn-nullary-unit", + "-Ypartial-unification", + "-Yno-adapted-args") + case Some((2, 12)) => + disciplineScalacOptions + case _ => + Nil + }).toSeq, scalacOptions += "-Ydelambdafy:method", Compile / doc / scalacOptions := scalacOptions.value ++ Seq( "-doc-title", From edf99197bc2dd33e68b9a4fedbb7c28f7a9f0c86 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Mon, 11 Jan 2021 11:07:42 +0100 Subject: [PATCH 5/7] Keep failure cause --- .../jdbc/journal/dao/legacy/ByteArrayJournalDao.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index bc19e2771..80695ccf0 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -110,9 +110,9 @@ trait BaseByteArrayJournalDao val write = PersistentRepr(payload, sequenceNr, persistenceId) val serializedRow = serializer.serialize(write) match { case Success(t) => t - case Failure(_) => + case Failure(cause) => throw new IllegalArgumentException( - s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]") + s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]", cause) } db.run(queries.update(persistenceId, sequenceNr, serializedRow.message).map(_ => Done)) } From 791fd031f8f251ed7fd71113434961d9255e9b06 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Mon, 11 Jan 2021 11:19:19 +0100 Subject: [PATCH 6/7] update headers --- core/src/main/scala/akka/persistence/jdbc/JournalRow.scala | 2 +- .../src/test/java/akka/persistence/jdbc/JavadslSnippets.java | 5 +++++ .../test/scala/akka/persistence/jdbc/ScaladslSnippets.scala | 5 +++++ .../scala/akka/persistence/jdbc/migration/PostgresSpec.scala | 2 +- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala b/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala index 00f8b3301..6de391084 100644 --- a/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala +++ b/core/src/main/scala/akka/persistence/jdbc/JournalRow.scala @@ -1,6 +1,6 @@ /* * Copyright (C) 2014 - 2019 Dennis Vriend - * Copyright (C) 2019 - 2020 Lightbend Inc. + * Copyright (C) 2019 - 2021 Lightbend Inc. */ package akka.persistence.jdbc diff --git a/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java b/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java index e91af2cb4..c407eb172 100644 --- a/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java +++ b/core/src/test/java/akka/persistence/jdbc/JavadslSnippets.java @@ -1,3 +1,8 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + package akka.persistence.jdbc; import akka.Done; diff --git a/core/src/test/scala/akka/persistence/jdbc/ScaladslSnippets.scala b/core/src/test/scala/akka/persistence/jdbc/ScaladslSnippets.scala index 6b7139f07..d7a499db6 100644 --- a/core/src/test/scala/akka/persistence/jdbc/ScaladslSnippets.scala +++ b/core/src/test/scala/akka/persistence/jdbc/ScaladslSnippets.scala @@ -1,3 +1,8 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2021 Lightbend Inc. + */ + package akka.persistence.jdbc import akka.{ Done, NotUsed } diff --git a/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala b/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala index 77acf7d4d..d2a0be04a 100644 --- a/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala +++ b/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala @@ -1,6 +1,6 @@ /* * Copyright (C) 2014 - 2019 Dennis Vriend - * Copyright (C) 2019 - 2020 Lightbend Inc. + * Copyright (C) 2019 - 2021 Lightbend Inc. */ package akka.persistence.jdbc.migration From 27e4036056182153debfedd641268cfe09a5e846 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Mon, 11 Jan 2021 11:22:16 +0100 Subject: [PATCH 7/7] scalafmt --- .../jdbc/journal/dao/legacy/ByteArrayJournalDao.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala index 80695ccf0..dcee79cd7 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/legacy/ByteArrayJournalDao.scala @@ -112,7 +112,8 @@ trait BaseByteArrayJournalDao case Success(t) => t case Failure(cause) => throw new IllegalArgumentException( - s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]", cause) + s"Failed to serialize ${write.getClass} for update of [$persistenceId] @ [$sequenceNr]", + cause) } db.run(queries.update(persistenceId, sequenceNr, serializedRow.message).map(_ => Done)) }