Skip to content

Commit

Permalink
Merge pull request #193 from MAIF/feature/play-3-pekko
Browse files Browse the repository at this point in the history
Move to play 3 / pekko
  • Loading branch information
larousso authored Oct 8, 2024
2 parents a1e5cb3 + 52f6fed commit 2234afa
Show file tree
Hide file tree
Showing 79 changed files with 514 additions and 445 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
with:
java-version: 11
- name: Start docker
run: docker-compose -f docker-compose.test.yml up -d
run: docker compose -f docker-compose.test.yml up -d
- name: Build javascript
id: buildjs
run: sh ./scripts/build.sh ui
Expand Down
2 changes: 2 additions & 0 deletions .sdkmanrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
java=11-open
sbt=1.10.2
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import ReleaseTransformations._

name := """nio"""
organization := "fr.maif"
scalaVersion := "2.13.10"
scalaVersion := "2.13.11"

lazy val root = (project in file("."))
.aggregate(
Expand Down
56 changes: 47 additions & 9 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,56 @@ services:
command: mongod --port 27018
ports:
- 27018:27018
kafka:
image: spotify/kafka
ports:
- 2181:2181
- 9092:9092
environment:
ADVERTISED_HOST: 127.0.0.1
ADVERTISED_PORT: 9092
s3server:
image: scality/s3server
ports:
- 8000:8000
environment:
- "SCALITY_ACCESS_KEY_ID=newAccessKey"
- "SCALITY_SECRET_ACCESS_KEY=newSecretKey"
- "SCALITY_SECRET_ACCESS_KEY=newSecretKey"
zookeeper_test:
image: confluentinc/cp-zookeeper:5.2.3
ports:
- 32182:32182
environment:
ZOOKEEPER_CLIENT_PORT: 32182
ZOOKEEPER_TICK_TIME: 2000
extra_hosts:
- "moby:127.0.0.1"
- "localhost: 127.0.0.1"
kafka_test:
image: confluentinc/cp-kafka:5.2.3
ports:
- 9092:9092
depends_on:
- zookeeper_test
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper_test:32182
KAFKA_LISTENERS: "INTERNAL://:9093,EXTERNAL://:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka_test:9093,EXTERNAL://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_DELETE_TOPIC_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 1
extra_hosts:
- "moby:127.0.0.1"
- "localhost: 127.0.0.1"
akhq_test:
image: tchiotludo/akhq
ports:
- 9005:8080
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka_test:9092"
depends_on:
- kafka_test
extra_hosts:
- "moby:127.0.0.1"
- "localhost: 127.0.0.1"
2 changes: 1 addition & 1 deletion nio-provider/app/actor/EventActor.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package actor

import akka.actor.{Actor, ActorRef, Props}
import org.apache.pekko.actor.{Actor, ActorRef, Props}
import models.NioEvent
import play.api.libs.json.Json
import utils.NioLogger
Expand Down
8 changes: 4 additions & 4 deletions nio-provider/app/actor/KafkaActor.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package actor

import akka.NotUsed
import akka.actor.{Actor, Props}
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.{Actor, Props}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Source
import models.NioEvent
import utils.NioLogger

Expand Down
2 changes: 1 addition & 1 deletion nio-provider/app/controllers/HomeController.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package controllers

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import auth.AuthActionWithEmail
import configuration.Env
import messaging.KafkaMessageBroker
Expand Down
8 changes: 4 additions & 4 deletions nio-provider/app/controllers/UserDataController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package controllers
import java.io.FileInputStream

import actor.EventActor
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Source, StreamConverters}
import org.apache.pekko.util.ByteString
import auth.AuthActionWithEmail
import configuration.Env
import play.api.libs.streams.ActorFlow
Expand Down
2 changes: 1 addition & 1 deletion nio-provider/app/filters/OtoroshiFilter.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package filters

import akka.stream.Materializer
import org.apache.pekko.stream.Materializer
import auth.AuthInfo
import com.auth0.jwt._
import com.auth0.jwt.algorithms.Algorithm
Expand Down
2 changes: 1 addition & 1 deletion nio-provider/app/loader/NioLoader.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package loader

import actor.{EventActor, KafkaActor}
import akka.actor.{ActorRef, ActorSystem}
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import auth._
import com.softwaremill.macwire.wire
import configuration._
Expand Down
2 changes: 1 addition & 1 deletion nio-provider/app/loader/Starter.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package loader

import actor.KafkaActor
import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import messaging.KafkaMessageBroker

class Starter(kafkaMessageBroker: KafkaMessageBroker,
Expand Down
4 changes: 2 additions & 2 deletions nio-provider/app/messaging/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package messaging

import java.io.File

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import configuration.KafkaConfig
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
Expand All @@ -13,7 +13,7 @@ import org.apache.kafka.common.serialization.{

object KafkaSettings {

import akka.kafka.{ConsumerSettings, ProducerSettings}
import org.apache.pekko.kafka.{ConsumerSettings, ProducerSettings}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.serialization.ByteArrayDeserializer
Expand Down
12 changes: 6 additions & 6 deletions nio-provider/app/messaging/KafkaMessageBroker.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package messaging

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.Subscriptions
import org.apache.pekko.kafka.scaladsl.Consumer
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import configuration.{Env, KafkaConfig}
import models.NioEvent
import utils.NioLogger
Expand Down
30 changes: 15 additions & 15 deletions nio-provider/app/models/UserExtractTask.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package models

import org.joda.time.{DateTime, DateTimeZone}
import java.time.{LocalDateTime, Clock}
import play.api.libs.functional.syntax.{unlift, _}
import play.api.libs.json.Reads._
import play.api.libs.json.Writes._
Expand All @@ -15,16 +15,16 @@ case class UserExtractTask(
orgKey: String,
userId: String,
email: String,
startedAt: DateTime,
uploadStartedAt: Option[DateTime],
endedAt: Option[DateTime]
startedAt: LocalDateTime,
uploadStartedAt: Option[LocalDateTime],
endedAt: Option[LocalDateTime]
) {
def asJson(): JsValue =
UserExtractTask.userExtractTaskWrites.writes(this)
}

object UserExtractTask {
implicit val dateFormats: Format[DateTime] = DateUtils.utcDateTimeFormats
implicit val dateFormats: Format[LocalDateTime] = DateUtils.utcDateTimeFormats

implicit val userExtractTaskReads: Reads[UserExtractTask] = (
(__ \ "_id").readNullable[String].map { maybeId =>
Expand All @@ -34,11 +34,11 @@ object UserExtractTask {
(__ \ "orgKey").read[String] and
(__ \ "userId").read[String] and
(__ \ "email").read[String] and
(__ \ "startedAt").readNullable[DateTime].map { maybeStartedAt =>
maybeStartedAt.getOrElse(DateTime.now(DateTimeZone.UTC))
(__ \ "startedAt").readNullable[LocalDateTime].map { maybeStartedAt =>
maybeStartedAt.getOrElse(LocalDateTime.now(Clock.systemUTC))
} and
(__ \ "uploadStartedAt").readNullable[DateTime] and
(__ \ "endedAt").readNullable[DateTime]
(__ \ "uploadStartedAt").readNullable[LocalDateTime] and
(__ \ "endedAt").readNullable[LocalDateTime]
)(UserExtractTask.apply _)

implicit val userExtractTaskWrites: Writes[UserExtractTask] = (
Expand All @@ -47,9 +47,9 @@ object UserExtractTask {
(JsPath \ "orgKey").write[String] and
(JsPath \ "userId").write[String] and
(JsPath \ "email").write[String] and
(JsPath \ "startedAt").write[DateTime] and
(JsPath \ "uploadStartedAt").writeNullable[DateTime] and
(JsPath \ "endedAt").writeNullable[DateTime]
(JsPath \ "startedAt").write[LocalDateTime] and
(JsPath \ "uploadStartedAt").writeNullable[LocalDateTime] and
(JsPath \ "endedAt").writeNullable[LocalDateTime]
)(unlift(UserExtractTask.unapply))

implicit val userExtractTaskOWrites: OWrites[UserExtractTask] = (
Expand All @@ -58,9 +58,9 @@ object UserExtractTask {
(JsPath \ "orgKey").write[String] and
(JsPath \ "userId").write[String] and
(JsPath \ "email").write[String] and
(JsPath \ "startedAt").write[DateTime] and
(JsPath \ "uploadStartedAt").writeNullable[DateTime] and
(JsPath \ "endedAt").writeNullable[DateTime]
(JsPath \ "startedAt").write[LocalDateTime] and
(JsPath \ "uploadStartedAt").writeNullable[LocalDateTime] and
(JsPath \ "endedAt").writeNullable[LocalDateTime]
)(unlift(UserExtractTask.unapply))

implicit val format: Format[UserExtractTask] =
Expand Down
12 changes: 6 additions & 6 deletions nio-provider/app/models/events.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package models

import org.joda.time.{DateTime, DateTimeZone}
import java.time.{LocalDateTime, Clock}
import utils.NioLogger
import play.api.libs.json._
import utils.DateUtils
Expand Down Expand Up @@ -47,7 +47,7 @@ object NioEvent {

trait NioEvent {
val id: Long
val date: DateTime
val date: LocalDateTime
val tenant: String

def tYpe: EventType.Value
Expand All @@ -70,7 +70,7 @@ case class UserExtractTaskAsked(
author: String,
metadata: Option[Seq[(String, String)]] = None,
id: Long,
date: DateTime = DateTime.now(DateTimeZone.UTC),
date: LocalDateTime = LocalDateTime.now(Clock.systemUTC),
payload: UserExtractTask
) extends NioEvent {
override def tYpe: EventType.Value = EventType.UserExtractTaskAsked
Expand All @@ -82,7 +82,7 @@ case class UserExtractTaskAsked(
"tenant" -> tenant,
"author" -> author,
"metadata" -> buildMetadata(metadata),
"date" -> date.toString(DateUtils.utcDateFormatter),
"date" -> date.format(DateUtils.utcDateFormatter),
"id" -> id,
"payload" -> payload.asJson()
)
Expand All @@ -94,7 +94,7 @@ case class UserExtractTaskCompleted(
author: String,
metadata: Option[Seq[(String, String)]] = None,
id: Long,
date: DateTime = DateTime.now(DateTimeZone.UTC),
date: LocalDateTime = LocalDateTime.now(Clock.systemUTC),
payload: UserExtractTask
) extends NioEvent {
override def tYpe: EventType.Value = EventType.UserExtractTaskCompleted
Expand All @@ -106,7 +106,7 @@ case class UserExtractTaskCompleted(
"tenant" -> tenant,
"author" -> author,
"metadata" -> buildMetadata(metadata),
"date" -> date.toString(DateUtils.utcDateFormatter),
"date" -> date.format(DateUtils.utcDateFormatter),
"id" -> id,
"payload" -> payload.asJson()
)
Expand Down
6 changes: 3 additions & 3 deletions nio-provider/app/service/NioService.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package service

import akka.http.scaladsl.model.HttpMethods
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.apache.pekko.http.scaladsl.model.HttpMethods
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import configuration.{Env, NioConfig}
import play.api.libs.json.JsValue
import play.api.libs.ws.WSClient
Expand Down
18 changes: 9 additions & 9 deletions nio-provider/app/utils/DateUtils.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package utils

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import java.time.LocalDateTime
import play.api.libs.json._

import java.time.format.DateTimeFormatter
import scala.util.{Failure, Success, Try}

object DateUtils {
val utcDateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
val utcDateFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME //forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")

val utcDateTimeReads = new Reads[DateTime] {
val utcDateTimeReads = new Reads[LocalDateTime] {
def reads(json: JsValue) = json match {
case JsString(s) =>
Try(DateTime.parse(s, utcDateFormatter)) match {
Try(LocalDateTime.parse(s, utcDateFormatter)) match {
case Success(d) => JsSuccess(d)
case Failure(f) => JsSuccess(null)
}
case _ => JsError("error.expected.date")
}
}

val utcDateTimeWrites = new Writes[DateTime] {
override def writes(o: DateTime): JsValue =
JsString(o.toString(DateUtils.utcDateFormatter))
val utcDateTimeWrites = new Writes[LocalDateTime] {
override def writes(o: LocalDateTime): JsValue =
JsString(o.format(DateUtils.utcDateFormatter))
}

val utcDateTimeFormats: Format[DateTime] =
val utcDateTimeFormats: Format[LocalDateTime] =
Format(utcDateTimeReads, utcDateTimeWrites)
}
9 changes: 6 additions & 3 deletions nio-provider/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ lazy val `nio-provider` = (project in file("."))
.enablePlugins(NoPublish)
.disablePlugins(BintrayPlugin)

scalaVersion := "2.13.10"
scalaVersion := "2.13.14"

resolvers ++= Seq(
Resolver.jcenterRepo,
"Maven central" at "https://repo1.maven.org/maven2/"
)

dependencyOverrides ++= Seq(
"com.github.luben" % "zstd-jni" % "1.5.6-4"
)

libraryDependencies ++= Seq(
ws,
"com.typesafe.play" %% "play-json-joda" % playJsonJodaVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % akkaStreamKafka,
"org.apache.pekko" %% "pekko-connectors-kafka" % pekkoKafka,
"de.svenkubiak" % "jBCrypt" % "0.4.1", // ISC/BSD
"com.auth0" % "java-jwt" % javaJwt, // MIT license
"com.github.pureconfig" %% "pureconfig" % pureConfig, // Apache 2.0
Expand Down
Loading

0 comments on commit 2234afa

Please sign in to comment.