Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#368] AxonSerializer documentation and enforce ReplayToken.context to String #370

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2024. Axon Framework
* Copyright (c) 2010-2025. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,8 +49,34 @@ import org.axonframework.messaging.responsetypes.OptionalResponseType
import org.axonframework.messaging.responsetypes.ResponseType
import kotlin.reflect.KClass

private val trackingTokenSerializer = PolymorphicSerializer(TrackingToken::class).nullable
/**
* Serializer for Axon's [TrackingToken] class.
* Provides serialization and deserialization support for nullable instances of TrackingToken.
* This serializer uses [replayTokenContextSerializer] to serialize the context field and now only [String] type or null value is supported!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think line 55 belongs to this description, right?

*
* @see TrackingToken
*/
val trackingTokenSerializer = PolymorphicSerializer(TrackingToken::class).nullable

/**
* Serializer for the [ReplayToken.context], represented as a nullable String.
* This context is typically used to provide additional information during token replay operations.
*
* This serializer is used by [trackingTokenSerializer] to serialize the context field and now only [String] type or null value is supported!
* Sadly enough, there's no straightforward solution to support [Any]; not without adjusting the context field of the ReplayToken in Axon Framework itself.
* That is, however, a breaking change, and as such, cannot be done till version 5.0.0 of the Axon Framework.
* This also allow more complex objects as the context, although it requires the user to do the de-/serialization to/from String, instead of the Axon Framework itself.
* Look at AxonSerializersTest, case `replay token with complex object as String context` for an example how to handle that using Kotlin Serialization.
*
* @see ReplayToken.context
*/
val replayTokenContextSerializer = String.serializer().nullable

/**
* Module defining serializers for Axon Framework's core event handling and messaging components.
* This module includes serializers for TrackingTokens, ScheduleTokens, and ResponseTypes, enabling
* seamless integration with Axon-based applications.
*/
val AxonSerializersModule = SerializersModule {
contextual(ConfigToken::class) { ConfigTokenSerializer }
contextual(GapAwareTrackingToken::class) { GapAwareTrackingTokenSerializer }
Expand Down Expand Up @@ -86,6 +112,11 @@ val AxonSerializersModule = SerializersModule {
}
}

/**
* Serializer for [ConfigToken].
*
* @see ConfigToken
*/
object ConfigTokenSerializer : KSerializer<ConfigToken> {

private val mapSerializer = MapSerializer(String.serializer(), String.serializer())
Expand All @@ -112,6 +143,11 @@ object ConfigTokenSerializer : KSerializer<ConfigToken> {
}
}

/**
* Serializer for [GapAwareTrackingToken].
*
* @see GapAwareTrackingToken
*/
object GapAwareTrackingTokenSerializer : KSerializer<GapAwareTrackingToken> {

private val setSerializer = SetSerializer(Long.serializer())
Expand Down Expand Up @@ -143,6 +179,11 @@ object GapAwareTrackingTokenSerializer : KSerializer<GapAwareTrackingToken> {
}
}

/**
* Serializer for [MultiSourceTrackingToken].
*
* @see MultiSourceTrackingToken
*/
object MultiSourceTrackingTokenSerializer : KSerializer<MultiSourceTrackingToken> {

private val mapSerializer = MapSerializer(String.serializer(), trackingTokenSerializer)
Expand All @@ -169,6 +210,11 @@ object MultiSourceTrackingTokenSerializer : KSerializer<MultiSourceTrackingToken
}
}

/**
* Serializer for [MergedTrackingToken].
*
* @see MergedTrackingToken
*/
object MergedTrackingTokenSerializer : KSerializer<MergedTrackingToken> {

override val descriptor = buildClassSerialDescriptor(MergedTrackingToken::class.java.name) {
Expand Down Expand Up @@ -199,36 +245,61 @@ object MergedTrackingTokenSerializer : KSerializer<MergedTrackingToken> {
}
}

/**
* Serializer for [ReplayToken].
* The [ReplayToken.context] value can be only a String or null.
* See [replayTokenContextSerializer] for more information how to handle the context field.
*
* @see ReplayToken
*/
object ReplayTokenSerializer : KSerializer<ReplayToken> {

override val descriptor = buildClassSerialDescriptor(ReplayToken::class.java.name) {
element<TrackingToken>("tokenAtReset")
element<TrackingToken>("currentToken")
element<String>("context")
}

override fun deserialize(decoder: Decoder) = decoder.decodeStructure(descriptor) {
var tokenAtReset: TrackingToken? = null
var currentToken: TrackingToken? = null
var context: String? = null
while (true) {
val index = decodeElementIndex(descriptor)
if (index == CompositeDecoder.DECODE_DONE) break
when (index) {
0 -> tokenAtReset = decodeSerializableElement(descriptor, index, trackingTokenSerializer)
1 -> currentToken = decodeSerializableElement(descriptor, index, trackingTokenSerializer)
2 -> context = decodeSerializableElement(descriptor, index, replayTokenContextSerializer)
}
}
ReplayToken(
ReplayToken.createReplayToken(
tokenAtReset ?: throw SerializationException("Element 'tokenAtReset' is missing"),
currentToken,
)
context
) as ReplayToken
}

override fun serialize(encoder: Encoder, value: ReplayToken) = encoder.encodeStructure(descriptor) {
encodeSerializableElement(descriptor, 0, trackingTokenSerializer, value.tokenAtReset)
encodeSerializableElement(descriptor, 1, trackingTokenSerializer, value.currentToken)
encodeSerializableElement(
descriptor,
2,
replayTokenContextSerializer,
stringOrNullFrom(value.context())
)
}

private fun stringOrNullFrom(obj: Any?): String? =
obj?.takeIf { it is String }?.let { it as String }
}

/**
* Serializer for [GlobalSequenceTrackingToken].
*
* @see GlobalSequenceTrackingToken
*/
object GlobalSequenceTrackingTokenSerializer : KSerializer<GlobalSequenceTrackingToken> {

override val descriptor = buildClassSerialDescriptor(GlobalSequenceTrackingToken::class.java.name) {
Expand All @@ -254,6 +325,11 @@ object GlobalSequenceTrackingTokenSerializer : KSerializer<GlobalSequenceTrackin
}
}

/**
* Serializer for [SimpleScheduleToken].
*
* @see SimpleScheduleToken
*/
object SimpleScheduleTokenSerializer : KSerializer<SimpleScheduleToken> {

override val descriptor = buildClassSerialDescriptor(SimpleScheduleToken::class.java.name) {
Expand All @@ -279,6 +355,11 @@ object SimpleScheduleTokenSerializer : KSerializer<SimpleScheduleToken> {
}
}

/**
* Serializer for [QuartzScheduleToken].
*
* @see QuartzScheduleToken
*/
object QuartzScheduleTokenSerializer : KSerializer<QuartzScheduleToken> {

override val descriptor = buildClassSerialDescriptor(QuartzScheduleToken::class.java.name) {
Expand Down Expand Up @@ -334,14 +415,34 @@ abstract class ResponseTypeSerializer<R : ResponseType<*>>(kClass: KClass<R>, pr
}
}

/**
* Serializer for [InstanceResponseType].
*
* @see InstanceResponseType
*/
object InstanceResponseTypeSerializer : KSerializer<InstanceResponseType<*>>,
ResponseTypeSerializer<InstanceResponseType<*>>(InstanceResponseType::class, { InstanceResponseType(it) })

/**
* Serializer for [OptionalResponseType].
*
* @see OptionalResponseType
*/
object OptionalResponseTypeSerializer : KSerializer<OptionalResponseType<*>>,
ResponseTypeSerializer<OptionalResponseType<*>>(OptionalResponseType::class, { OptionalResponseType(it) })

/**
* Serializer for [MultipleInstancesResponseType].
*
* @see MultipleInstancesResponseType
*/
object MultipleInstancesResponseTypeSerializer : KSerializer<MultipleInstancesResponseType<*>>,
ResponseTypeSerializer<MultipleInstancesResponseType<*>>(MultipleInstancesResponseType::class, { MultipleInstancesResponseType(it) })

/**
* Serializer for [ArrayResponseType].
*
* @see ArrayResponseType
*/
object ArrayResponseTypeSerializer : KSerializer<ArrayResponseType<*>>,
ResponseTypeSerializer<ArrayResponseType<*>>(ArrayResponseType::class, { ArrayResponseType(it) })
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package org.axonframework.extensions.kotlin.serializer

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.axonframework.eventhandling.GapAwareTrackingToken
import org.axonframework.eventhandling.GlobalSequenceTrackingToken
Expand All @@ -36,7 +41,9 @@ import org.axonframework.messaging.responsetypes.ResponseType
import org.axonframework.serialization.Serializer
import org.axonframework.serialization.SimpleSerializedObject
import org.axonframework.serialization.SimpleSerializedType
import org.axonframework.serialization.json.JacksonSerializer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertInstanceOf
import org.junit.jupiter.api.Test

internal class AxonSerializersTest {
Expand Down Expand Up @@ -76,21 +83,49 @@ internal class AxonSerializersTest {
}

@Test
fun replayToken() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(15), GlobalSequenceTrackingToken(10))
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":15},"currentToken":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":10}}"""
fun `replay token with String context`() {
val token = ReplayToken.createReplayToken(
GlobalSequenceTrackingToken(15), GlobalSequenceTrackingToken(10), "someContext"
)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":15},"currentToken":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":10},"context":"someContext"}""".trimIndent()
assertEquals(json, serializer.serialize(token, String::class.java).data)
assertEquals(token, serializer.deserializeTrackingToken(token.javaClass.name, json))
}

@Test
fun `replay token with currentToken with null value`() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(5), null)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":5},"currentToken":null}"""
fun `replay token with currentToken with null value and null context`() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(5), null, null)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":5},"currentToken":null,"context":null}"""
assertEquals(json, serializer.serialize(token, String::class.java).data)
assertEquals(token, serializer.deserializeTrackingToken(token.javaClass.name, json))
}

@Test
fun `replay token deserialize without context field`() {
val token = ReplayToken.createReplayToken(GlobalSequenceTrackingToken(5), null, null)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":5},"currentToken":null}"""
assertEquals(token, serializer.deserializeTrackingToken(token.javaClass.name, json))
}

@Test
fun `replay token with complex object as String context`() {
@Serializable
data class ComplexContext(val value1: String, val value2: Int, val value3: Boolean)
val complexContext = ComplexContext("value1", 2, false)

val token = ReplayToken.createReplayToken(
GlobalSequenceTrackingToken(15),
GlobalSequenceTrackingToken(10),
Json.encodeToString(complexContext)
)
val json = """{"tokenAtReset":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":15},"currentToken":{"type":"org.axonframework.eventhandling.GlobalSequenceTrackingToken","globalIndex":10},"context":"{\"value1\":\"value1\",\"value2\":2,\"value3\":false}"}""".trimIndent()
assertEquals(json, serializer.serialize(token, String::class.java).data)
val deserializedToken = serializer.deserializeTrackingToken(token.javaClass.name, json) as ReplayToken
assertEquals(token, deserializedToken)
assertInstanceOf(String::class.java, deserializedToken.context())
assertEquals(complexContext, Json.decodeFromString<ComplexContext>(deserializedToken.context() as String))
}

@Test
fun globalSequenceTrackingToken() {
val token = GlobalSequenceTrackingToken(5)
Expand Down