Skip to content

Commit

Permalink
Batch import (#190)
Browse files Browse the repository at this point in the history
Batch import
  • Loading branch information
larousso authored May 15, 2023
1 parent 056e8d2 commit a1e5cb3
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 4 deletions.
167 changes: 163 additions & 4 deletions nio-server/app/controllers/ConsentController.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package controllers

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.stream.{FlowShape, Materializer}
import akka.stream.scaladsl.{Flow, Framing, GraphDSL, Merge, Partition, Sink, Source}
import akka.util.ByteString
import auth.SecuredAuthContext
import auth.{AuthAction, SecuredAuthContext}
import controllers.ErrorManager.{AppErrorManagerResult, ErrorManagerResult, ErrorWithStatusManagerResult}
import db.{ConsentFactMongoDataStore, LastConsentFactMongoDataStore, OrganisationMongoDataStore, UserMongoDataStore}
import libs.io.IO
import libs.io._
import libs.xmlorjson.XmlOrJson
import messaging.KafkaMessageBroker
import models.ConsentFactCommand.{PatchConsentFact, UpdateConsentFact}
import models.{ConsentFact, _}
import utils.NioLogger
import play.api.http.HttpEntity
import play.api.libs.json.Json
import play.api.libs.json.{JsError, JsNull, JsValue, Json}
import play.api.libs.streams.Accumulator
import play.api.mvc._
import reactivemongo.api.Cursor
import reactivemongo.api.bson.BSONDocument
Expand All @@ -25,6 +28,7 @@ import utils.Result.{AppErrors, ErrorMessage}

import scala.collection.Seq
import scala.concurrent.{ExecutionContext, Future}
import scala.util.hashing.MurmurHash3

class ConsentController(
val AuthAction: ActionBuilder[SecuredAuthContext, AnyContent],
Expand Down Expand Up @@ -268,6 +272,161 @@ class ConsentController(
} yield result).merge
}

val newLineSplit = Framing.delimiter(ByteString("\n"), 10000, allowTruncation = true)
val toJson = Flow[ByteString] via newLineSplit map (_.utf8String) filterNot (_.isEmpty) map (l => Json.parse(l))
def ndJson(implicit ec: ExecutionContext): BodyParser[Source[JsValue, _]] = BodyParser(_ => Accumulator.source[ByteString].map(s => Right(s.via(toJson)))(ec))

object ImportError {
implicit val format = Json.format[ImportError]
}
case class ImportError(message: String, detailedError: JsValue = JsNull, command: JsValue = JsNull)
object ImportResult {
def error(message: String, command: JsValue = JsNull): ImportResult = {
ImportResult(errorsCount = 1, errors = List(ImportError(message, command = command)))
}

implicit val format = Json.format[ImportResult]
}
case class ImportResult(successCount: Int = 0, errorsCount: Int = 0, errors: List[ImportError] = List.empty) {
def combine (other: ImportResult) : ImportResult =
ImportResult(
successCount = successCount + other.successCount,
errorsCount = errorsCount + other.errorsCount,
errors = errors ++ other.errors
)
}

def sharding[In, Out](parallelism: Int, aFlow: Flow[(String, In), Out, NotUsed]) =
Flow.fromGraph {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._

val merge = b.add(Merge[Out](parallelism))
val partition = b.add(Partition[(String, In)](parallelism, {
case (id, _) => Math.abs(MurmurHash3.stringHash(id) % parallelism)
}))

for (i <- 0 until parallelism) {
partition.out(i) ~> aFlow.async ~> merge.in(i)
}

FlowShape(partition.in, merge.out)
}
}


def batchImport(tenant: String, orgKey: String) = AuthAction.async(ndJson) { implicit req =>
val result: Future[JsValue] = req.body
.map(json => ((json \ "userId").validate[String].getOrElse(""), json))
.via(sharding(10, Flow[(String, JsValue)].mapAsync(1) { case (_, json) =>
json.validate[ConsentFactCommand].fold(
{ err => FastFuture.successful(ImportResult(errorsCount = 1, errors = List(ImportError("json parsing error", detailedError = JsError.toJson(err), command = json)))) },
{
case UpdateConsentFact(userId, consentFact) => handleImportUpdate(tenant, orgKey, req, json, userId, consentFact)
case PatchConsentFact(userId, patchCommand) => handleImportPatch(tenant, orgKey, req, json, userId, patchCommand)
}
)
}))
.fold(ImportResult()){ (acc, elt) => acc combine elt }
.map { importResult => Json.toJson(importResult) }
.runWith(Sink.head)

result.map { json => Ok(json) }
}

private def handleImportPatch(tenant: String, orgKey: String, req: SecuredAuthContext[Source[JsValue, _]], json: JsValue, userId: String, patchCommand: PartialConsentFact): Future[ImportResult] = {
(for {
_ <- if (patchCommand.userId.isDefined && !patchCommand.userId.contains(userId)) IO.error(ImportResult.error("error.userId.is.immutable", command = json))
else IO.succeed(patchCommand)
command = patchCommand.copy(orgKey = Some(orgKey))
result <- patchCommand.offers match {
case Some(offers) =>
for {
_ <- IO.fromOption(req.authInfo.offerRestrictionPatterns, {
val errorMessages = offers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json));
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}");
ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List))
})
_ <- IO.succeed[ImportResult](offers.filterNot(o => accessibleOfferService.accessibleOfferKey(o.key, req.authInfo.offerRestrictionPatterns)))
.keep(offer => offer.isEmpty, { unauthorizedOffers =>
val errorMessages = unauthorizedOffers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json))
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}")
ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List))
})
consentFactSaved <- consentManagerService
.partialUpdate(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, command, Json.toJson(patchCommand))
.mapError { error =>
NioLogger.error(s"error during consent fact saving $error")
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
}
} yield ImportResult(successCount = 1)
case None =>
consentManagerService
.partialUpdate(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, command, Json.toJson(patchCommand))
.mapError { error =>
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
}
.map { _ => ImportResult(successCount = 1) }
}
} yield result).merge
}

private def handleImportUpdate(tenant: String, orgKey: String, req: SecuredAuthContext[Source[JsValue, _]], json: JsValue, userId: String, consentFact: ConsentFact): Future[ImportResult] = {
if (consentFact.userId != userId) {
NioLogger.error(s"error.userId.is.immutable : userId in path $userId // userId on body ${consentFact.userId}")

FastFuture.successful(ImportResult.error("error.userId.is.immutable", command = json))
} else {
val cf: ConsentFact = ConsentFact.addOrgKey(consentFact, orgKey)

(cf.offers, req.authInfo.offerRestrictionPatterns) match {
// case ask create or update offers but no pattern allowed
case (Some(offers), None) =>
val errorMessages =
offers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json))
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}")
Future.successful(ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List)))

// case create or update consents without offers
case (None, _) =>
consentManagerService
.saveConsents(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, cf, Json.toJson(cf))
.fold(
error => {
NioLogger.error(s"error during consent fact saving $error")
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
},
consentFactSaved => ImportResult(successCount = 1)
)
// case create or update offers and some patterns are specified
case (Some(offers), Some(_)) =>
// validate offers key are accessible
offers
.filterNot(o =>
accessibleOfferService.accessibleOfferKey(o.key, req.authInfo.offerRestrictionPatterns)
) match {
// case all offers in consent (body) are accessible
case Nil =>
consentManagerService
.saveConsents(tenant, req.authInfo.sub, req.authInfo.metadatas, orgKey, userId, cf, Json.toJson(cf))
.fold(
error => {
NioLogger.error(s"error during consent fact saving $error")
ImportResult(errorsCount = 1, errors = List(ImportError(message = "Error during update", detailedError = error.appErrors.asJson(), command = json)))
},
_ => ImportResult(successCount = 1)
)

// case one or more offers are not accessible
case unauthorizedOffers =>
val errorMessages = unauthorizedOffers.map(o => ImportError(s"offer.${o.key}.not.authorized", command = json))
NioLogger.error(s"not authorized : ${errorMessages.map(_.message)}")
FastFuture.successful(ImportResult(errorsCount = errorMessages.size, errors = errorMessages.to(List)))
}
}
}
}

lazy val defaultPageSize: Int =
sys.env.get("DEFAULT_PAGE_SIZE").map(_.toInt).getOrElse(200)
lazy val defaultParSize: Int =
Expand Down
34 changes: 34 additions & 0 deletions nio-server/app/models/ConsentFact.scala
Original file line number Diff line number Diff line change
Expand Up @@ -617,3 +617,37 @@ object ConsentFact extends ReadableEntity[ConsentFact] {
def addOrgKey(consentFact: ConsentFact, orgKey: String): ConsentFact =
consentFact.copy(orgKey = Some(orgKey))
}

sealed trait ConsentFactCommand

object ConsentFactCommand {
case class PatchConsentFact(userId: String, command: PartialConsentFact) extends ConsentFactCommand

object PatchConsentFact {
val format = Json.format[PatchConsentFact]
}
case class UpdateConsentFact(userId: String, command: ConsentFact) extends ConsentFactCommand

object UpdateConsentFact {
val format = OFormat[UpdateConsentFact](
((__ \ "userId").read[String] and
(__ \ "command").read[ConsentFact](ConsentFact.consentFactReadsWithoutIdAndLastUpdate))(UpdateConsentFact.apply _),
Json.writes[UpdateConsentFact]
)
}

implicit val format = Format(
Reads[ConsentFactCommand] { js =>
(js \ "type").validate[String].flatMap {
case "Update" => UpdateConsentFact.format.reads(js)
case "Patch" => PatchConsentFact.format.reads(js)
}
},
Writes[ConsentFactCommand] {
case c: UpdateConsentFact => UpdateConsentFact.format.writes(c) ++ Json.obj("type" -> "Update")
case c: PatchConsentFact => PatchConsentFact.format.writes(c) ++ Json.obj("type" -> "Patch")
}
)

}

2 changes: 2 additions & 0 deletions nio-server/conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ POST /api/:tenant/organisations/:orgKey/draft/_release

GET /api/:tenant/organisations/:orgKey/users/_template controllers.ConsentController.getTemplate(tenant: String, orgKey: String, userId: Option[String] ?= None, offerKeys: Option[Seq[String]] ?= None)

POST /api/:tenant/organisations/:orgKey/users/_batch controllers.ConsentController.batchImport(tenant: String, orgKey)

PUT /api/:tenant/organisations/:orgKey/users/:userId controllers.ConsentController.createOrReplaceIfExists(tenant: String, orgKey: String, userId: String)

PATCH /api/:tenant/organisations/:orgKey/users/:userId controllers.ConsentController.partialUpdate(tenant: String, orgKey: String, userId: String)
Expand Down
23 changes: 23 additions & 0 deletions nio-server/test/controllers/ConsentControllerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,29 @@ class ConsentControllerSpec extends TestUtils {
.as[Boolean] mustBe user1Modified.groups(1).consents(2).checked
}

"batch import consent" in {

val commands = List(
Json.obj("userId" -> user6Modified.userId, "type" -> "Update", "command" -> user6ModifiedAsJson),
Json.obj("userId" -> user1.userId, "type" -> "Patch", "command" -> Json.obj(
"groups" -> Json.arr(
Json.obj(
"key" -> "maifNotifs",
"consents" -> Json.arr(
Json.obj("key" -> "phone", "checked" -> false)
)
)
)
))
).map(Json.stringify _).mkString("", "\n", "\n")

val response = postText(s"/$tenant/organisations/$organisationKey/users/_batch", commands)
println(response.body)
response.status mustBe OK
(response.json \ "successCount").validate[Int].get mustBe 2

}

"update user with a subset of consents compare to organisation version" in {
val response = putJson(
s"/$tenant/organisations/$organisationKey/users/$userId2",
Expand Down
3 changes: 3 additions & 0 deletions nio-server/test/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ trait TestUtils
def postJson(path: String, body: JsValue, headers: Seq[(String, String)] = jsonHeaders) =
callByType[JsValue](path = path, httpVerb = POST, body = body, headers = headers)

def postText(path: String, body: String, headers: Seq[(String, String)] = jsonHeaders) =
callByType[String](path = path, httpVerb = POST, body = body, headers = headers)

def postBinaryFile(path: String, body: File, api: Boolean = true, headers: Seq[(String, String)] = jsonHeaders) = {
val suffix = if (api) apiPath else serverHost
val futureResponse = ws
Expand Down

0 comments on commit a1e5cb3

Please sign in to comment.