Skip to content
This repository has been archived by the owner on Mar 27, 2023. It is now read-only.

Commit

Permalink
Add capping of in-memory storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Zimmer committed Jun 15, 2022
1 parent 89d52f5 commit 710750c
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 16 deletions.
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.title="bandwhichd-server"
LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics"
LABEL org.opencontainers.image.version="0.3.0"
LABEL org.opencontainers.image.version="0.4.0"
USER guest
ENTRYPOINT ["/opt/java/openjdk/bin/java", "-jar", "/opt/bandwhichd-server.jar"]
ENTRYPOINT ["/opt/java/openjdk/bin/java"]
CMD ["-jar", "/opt/bandwhichd-server.jar"]
EXPOSE 8080
HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \
CMD wget --spider http://localhost:8080/v1/health || exit 1
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.3.0.jar /opt/bandwhichd-server.jar
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.4.0.jar /opt/bandwhichd-server.jar
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ lazy val root = (project in file("."))
.settings(
organization := "de.neuland-bfi",
name := "bandwhichd-server",
version := "0.3.0",
version := "0.4.0",
scalaVersion := "3.1.2",
Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala",
Test / scalaSource := baseDirectory.value / "src" / "test" / "scala",
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@ services:
main:
build:
context: .
command:
- -Xmx1g
- -jar
- /opt/bandwhichd-server.jar
ports:
- 8080:8080
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package de.neuland.bandwhichd.server.adapter.in.scheduler

import cats.Monad
import de.neuland.bandwhichd.server.adapter.out.measurement.MeasurementInMemoryRepository
import de.neuland.bandwhichd.server.lib.health.jvm.JvmMemoryUtilization
import de.neuland.bandwhichd.server.lib.scheduling.{Schedule, Scheduler, Work}

import scala.concurrent.duration.{FiniteDuration, SECONDS}

class MemoryScheduler[F[_]: Monad](
private val measurementInMemoryRepository: MeasurementInMemoryRepository[F]
) extends Scheduler[F] {

private val inMemoryStorageCapThreshold: Long =
scala.util.Properties
.envOrElse("IN_MEMORY_STORAGE_CAP_THRESHOLD", "")
.toLongOption
.getOrElse(80)

override def schedule: F[Schedule[F]] =
Monad[F].pure(
Schedule.Pausing(
FiniteDuration(10, SECONDS),
Work({
if (
JvmMemoryUtilization.current.usedMemoryPercentage > inMemoryStorageCapThreshold
) {
measurementInMemoryRepository.cap
} else {
Monad[F].pure(())
}
})
)
)
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,34 @@
package de.neuland.bandwhichd.server.adapter.in.v1.health

import cats.effect.Async
import de.neuland.bandwhichd.server.adapter.out.measurement.MeasurementInMemoryRepository
import de.neuland.bandwhichd.server.lib.health.jvm.JvmMemoryUtilization
import de.neuland.bandwhichd.server.lib.health.Health
import de.neuland.bandwhichd.server.lib.health.{Check, Health}
import fs2.Pure
import io.circe.Json
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits.*
import org.http4s.{Entity, HttpRoutes, Response, Status}

class HealthController[F[_]: Async] extends Http4sDsl[F] {
class HealthController[F[_]: Async](
private val measurementInMemoryRepository: MeasurementInMemoryRepository[F]
) extends Http4sDsl[F] {

val routes: HttpRoutes[F] =
HttpRoutes.of[F] { case GET -> Root / "v1" / "health" =>
health
}

private val currentHealth = Health(
Seq[Check](
JvmMemoryUtilization.current,
measurementInMemoryRepository
)
)

private def health: F[Response[F]] = {
val encoder = org.http4s.circe.jsonEncoder

val currentHealth = Health(Seq(JvmMemoryUtilization.current))
val status: Status =
Status
.fromInt(currentHealth.httpResponseStatusCode)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package de.neuland.bandwhichd.server.adapter.out

case class CappedStorage[A] private (
maybeCap: Option[Int],
storage: Vector[A]
) {
def cap: CappedStorage[A] =
capAt(storage.length)

private def capAt(cap: Int): CappedStorage[A] =
capAt(Some(cap))

private def capAt(maybeCap: Option[Int]): CappedStorage[A] =
maybeCap.fold(CappedStorage(maybeCap, storage))(cap =>
CappedStorage(maybeCap, storage.drop(storage.length - cap))
)

def store(value: A): CappedStorage[A] =
CappedStorage(
maybeCap,
maybeCap
.fold(storage)(cap => storage.drop(storage.length + 1 - cap))
.appended(value)
)
}

object CappedStorage {
def empty[A]: CappedStorage[A] = CappedStorage(None, Vector.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,48 @@ package de.neuland.bandwhichd.server.adapter.out.measurement
import cats.Monad
import cats.effect.Sync
import cats.effect.kernel.Concurrent
import de.neuland.bandwhichd.server.adapter.out.CappedStorage
import de.neuland.bandwhichd.server.domain.measurement.{
Measurement,
MeasurementRepository,
Timing
}
import de.neuland.bandwhichd.server.lib.health.jvm.JvmMemoryUtilization
import de.neuland.bandwhichd.server.lib.health.{Check, CheckKey}
import io.circe.{Json, JsonObject}

import java.util.concurrent.atomic.AtomicReference

class MeasurementInMemoryRepository[F[_]: Sync]
extends MeasurementRepository[F] {
private val storage: AtomicReference[Seq[Measurement[Timing]]] =
new AtomicReference(Seq.empty)
extends MeasurementRepository[F]
with Check {
private val storage: AtomicReference[CappedStorage[Measurement[Timing]]] =
new AtomicReference(CappedStorage.empty)

override def record(measurement: Measurement[Timing]): F[Unit] =
Sync[F].blocking {
val _ =
storage.updateAndGet(measurements => measurements.appended(measurement))
val _ = storage.updateAndGet(_.store(measurement))
()
}

override def getAll: F[Seq[Measurement[Timing]]] =
Sync[F].blocking {
storage.get()
storage.get().storage.toSeq
}

override def key: CheckKey =
CheckKey("in-memory-storage:utilization")

override def value: JsonObject =
JsonObject(
"componentType" -> Json.fromString("datastore"),
"observedValue" -> Json.fromInt(storage.get().storage.length),
"observedUnit" -> Json.fromString("objects")
)

def cap: F[Unit] =
Sync[F].blocking {
val _ = storage.updateAndGet(_.cap)
()
}
}
16 changes: 13 additions & 3 deletions src/main/scala/de/neuland/bandwhichd/server/boot/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.*
import cats.effect.kernel.Outcome
import cats.implicits.*
import de.neuland.bandwhichd.server.adapter.in.scheduler.AggregationScheduler
import de.neuland.bandwhichd.server.adapter.in.scheduler.MemoryScheduler
import de.neuland.bandwhichd.server.adapter.in.v1.health.HealthController
import de.neuland.bandwhichd.server.adapter.in.v1.message.MessageController
import de.neuland.bandwhichd.server.adapter.in.v1.stats.StatsController
Expand All @@ -29,8 +30,10 @@ import scala.io.StdIn

class App[F[_]: Async] {
// out
val measurementRepository: MeasurementRepository[F] =
val measurementInMemoryRepository: MeasurementInMemoryRepository[F] =
MeasurementInMemoryRepository[F]()
val measurementRepository: MeasurementRepository[F] =
measurementInMemoryRepository
val statsRepository: StatsRepository[F] =
StatsInMemoryRepository[F]()

Expand All @@ -48,7 +51,9 @@ class App[F[_]: Async] {

// in http
val healthController: HealthController[F] =
HealthController[F]
HealthController[F](
measurementInMemoryRepository = measurementInMemoryRepository
)
val messageController: MessageController[F] =
MessageController[F](
measurementApplicationService = measurementApplicationService
Expand All @@ -63,6 +68,10 @@ class App[F[_]: Async] {
AggregationScheduler[F](
statsApplicationService = statsApplicationService
)
val memoryScheduler: Scheduler[F] =
MemoryScheduler[F](
measurementInMemoryRepository = measurementInMemoryRepository
)

// http
val routes: Routes[F] =
Expand All @@ -78,7 +87,8 @@ class App[F[_]: Async] {
// scheduling
val schedulersOperator: Operator[F] =
SchedulersOperator[F](
aggregationScheduler
aggregationScheduler,
memoryScheduler
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package de.neuland.bandwhichd.server.adapter.out

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class CappedStorageSpec extends AnyWordSpec with Matchers {
"CappedStorage" should {
"store and cap" in {
val cappedStorage0 = CappedStorage.empty[Int]
cappedStorage0.maybeCap shouldBe None
cappedStorage0.storage shouldBe empty
val cappedStorage1 = cappedStorage0.store(3)
cappedStorage1.maybeCap shouldBe None
cappedStorage1.storage shouldBe Vector(3)
val cappedStorage2 = cappedStorage1.store(2)
cappedStorage2.maybeCap shouldBe None
cappedStorage2.storage shouldBe Vector(3, 2)
val cappedStorage3 = cappedStorage2.store(-5)
cappedStorage3.maybeCap shouldBe None
cappedStorage3.storage shouldBe Vector(3, 2, -5)
val cappedStorage4 = cappedStorage3.cap
cappedStorage4.maybeCap shouldBe Some(3)
cappedStorage4.storage shouldBe Vector(3, 2, -5)
val cappedStorage5 = cappedStorage4.store(9)
cappedStorage5.maybeCap shouldBe Some(3)
cappedStorage5.storage shouldBe Vector(2, -5, 9)
val cappedStorage6 = cappedStorage5.store(0)
cappedStorage6.maybeCap shouldBe Some(3)
cappedStorage6.storage shouldBe Vector(-5, 9, 0)
}
}
}

0 comments on commit 710750c

Please sign in to comment.