Skip to content
This repository has been archived by the owner on Mar 27, 2023. It is now read-only.

Commit

Permalink
Fix accidental Cassandra tombstone creation when recording measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
Michel Zimmer committed Jun 23, 2022
1 parent 5e46b8a commit 9e00282
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 98 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.title="bandwhichd-server"
LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics"
LABEL org.opencontainers.image.version="0.5.0"
LABEL org.opencontainers.image.version="0.5.1"
USER guest
ENTRYPOINT ["/opt/java/openjdk/bin/java"]
CMD ["-jar", "/opt/bandwhichd-server.jar"]
EXPOSE 8080
HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \
CMD wget --spider http://localhost:8080/v1/health || exit 1
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.0.jar /opt/bandwhichd-server.jar
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.1.jar /opt/bandwhichd-server.jar
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ lazy val root = (project in file("."))
.settings(
organization := "de.neuland-bfi",
name := "bandwhichd-server",
version := "0.5.0",
version := "0.5.1",
scalaVersion := "3.1.2",
Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala",
Test / scalaSource := baseDirectory.value / "src" / "test" / "scala",
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ services:
ports:
- 9042:9042
bandwhichd-server:
depends_on:
- cassandra
build:
context: .
command:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,127 +16,125 @@ import java.util.UUID
import scala.util.Try

object MeasurementCassandraCodecs {
given Encoder[Measurement[Timing]] =
(measurement: Measurement[Timing]) =>
measurement match
case nc: Measurement.NetworkConfiguration =>
Encoder[Measurement.NetworkConfiguration].mapJson(
_.mapObject(
_.add(
"measurement_type",
Json.fromString("network_configuration")
)
)
)(nc)
case nu: Measurement.NetworkUtilization =>
Encoder[Measurement.NetworkUtilization].mapJson(
_.mapObject(
_.add(
"measurement_type",
Json.fromString("network_utilization")
)
)
)(nu)

given Decoder[Measurement[Timing]] =
(c: HCursor) => {
val measurementTypeCursor = c.downField("measurement_type")
for {
measurementType <- measurementTypeCursor.as[String]
measurement <- {
measurementType match
case "network_configuration" =>
c.as[Measurement.NetworkConfiguration]
case "network_utilization" =>
c.as[Measurement.NetworkUtilization]
case _ =>
Left(
DecodingFailure(
s"invalid measurement type $measurementType",
measurementTypeCursor.history
)
)
}
} yield measurement
}

given Codec[Measurement.NetworkConfiguration] =
Codec.forProduct6(
given Codec[Measurement[Timing]] =
Codec.forProduct9(
"agent_id",
"timestamp",
"end_timestamp",
"measurement_type",
"network_configuration_machine_id",
"network_configuration_hostname",
"network_configuration_interfaces",
"network_configuration_open_sockets"
"network_configuration_open_sockets",
"network_utilization_connections"
)(
(
agentId: AgentId,
timestamp: Timing.Timestamp,
endTimestamp: Timing.Timestamp,
measurementType: String,
machinedId: MachineId,
hostname: Hostname,
interfaces: Seq[Interface],
openSockets: Seq[OpenSocket]
openSockets: Seq[OpenSocket],
connections: Seq[Connection]
) =>
Measurement.NetworkConfiguration(
agentId = agentId,
timing = timestamp,
machineId = machinedId,
hostname = hostname,
interfaces = interfaces,
openSockets = openSockets
measurementType match
case "network_configuration" =>
Measurement.NetworkConfiguration(
agentId = agentId,
timing = timestamp,
machineId = machinedId,
hostname = hostname,
interfaces = interfaces,
openSockets = openSockets
)
case "network_utilization" =>
Measurement.NetworkUtilization(
agentId = agentId,
timing = Timing.Timeframe(
Interval(
start = timestamp.instant,
stop = endTimestamp.instant
)
),
connections = connections
)
case _ =>
throw DecodingFailure(
s"invalid measurement type $measurementType",
List(CursorOp.DownField("measurement_type"))
)
)(_ match
case Measurement.NetworkConfiguration(
agentId,
timing,
machineId,
hostname,
interfaces,
openSockets
) =>
(
agentId,
timing,
Timing.Timestamp(Instant.EPOCH),
"network_configuration",
machineId,
hostname,
interfaces,
openSockets,
Seq.empty[Connection]
)
case Measurement.NetworkUtilization(
agentId,
timing,
connections
) =>
(
agentId,
Timing.Timestamp(timing.value.normalizedStart),
Timing.Timestamp(timing.value.normalizedStop),
"network_utilization",
MachineId(new UUID(0, 0)),
Hostname.fromString("a").get,
Seq.empty[Interface],
Seq.empty[OpenSocket],
connections
)
)(nc =>
(
nc.agentId,
nc.timing,
nc.machineId,
nc.hostname,
nc.interfaces,
nc.openSockets
)
)

given Codec[Interface] =
Codec.forProduct3("name", "is_up", "networks")(Interface.apply)(interface =>
Codec.forProduct3(
"name",
"is_up",
"networks"
)(Interface.apply)(interface =>
(interface.name, interface.isUp, interface.networks)
)

given Codec[OpenSocket] =
Codec.forProduct3("socket", "protocol", "maybe_process_name")(
OpenSocket.apply
)(openSocket =>
(openSocket.socket, openSocket.protocol, openSocket.maybeProcessName)
)

given Codec[Measurement.NetworkUtilization] =
Codec.forProduct4(
"agent_id",
"timestamp",
"end_timestamp",
"network_utilization_connections"
Codec.forProduct3(
"socket",
"protocol",
"maybe_process_name"
)(
(
agentId: AgentId,
timestamp: Timing.Timestamp,
endTimestamp: Timing.Timestamp,
connections: Seq[Connection]
socket: SocketAddress[Host],
protocol: Protocol,
processNameValue: String
) =>
Measurement.NetworkUtilization(
agentId = agentId,
timing = Timing.Timeframe(
Interval(
start = timestamp.instant,
stop = endTimestamp.instant
)
),
connections = connections
OpenSocket(
socket = socket,
protocol = protocol,
maybeProcessName =
if (processNameValue.nonEmpty) Option(ProcessName(processNameValue))
else None
)
)(nu =>
)(openSocket =>
(
nu.agentId,
Timing.Timestamp(nu.timing.value.normalizedStart),
Timing.Timestamp(nu.timing.value.normalizedStop),
nu.connections
openSocket.socket,
openSocket.protocol,
openSocket.maybeProcessName.fold("")(_.value)
)
)

Expand Down

0 comments on commit 9e00282

Please sign in to comment.