Skip to content

Commit 68b0440

Browse files
committed
Allow other serialization libs (to support jackson 3 in future)
1 parent 7253786 commit 68b0440

22 files changed

+57
-42
lines changed

util/api/datasourcex-util.api

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,11 @@ public final class com/caplin/integration/datasourcex/util/SimpleDataSourceConfi
127127

128128
public final class com/caplin/integration/datasourcex/util/SimpleDataSourceFactory {
129129
public static final field INSTANCE Lcom/caplin/integration/datasourcex/util/SimpleDataSourceFactory;
130-
public static final fun createDataSource (Lcom/caplin/integration/datasourcex/util/SimpleDataSourceConfig;Lcom/fasterxml/jackson/databind/ObjectMapper;)Lcom/caplin/datasource/DataSource;
131-
public static synthetic fun createDataSource$default (Lcom/caplin/integration/datasourcex/util/SimpleDataSourceConfig;Lcom/fasterxml/jackson/databind/ObjectMapper;ILjava/lang/Object;)Lcom/caplin/datasource/DataSource;
132-
public final fun getDefaultObjectMapper ()Lcom/fasterxml/jackson/databind/ObjectMapper;
130+
public static final fun createDataSource (Lcom/caplin/integration/datasourcex/util/SimpleDataSourceConfig;Lcom/caplin/datasource/messaging/json/JsonHandler;)Lcom/caplin/datasource/DataSource;
131+
public static synthetic fun createDataSource$default (Lcom/caplin/integration/datasourcex/util/SimpleDataSourceConfig;Lcom/caplin/datasource/messaging/json/JsonHandler;ILjava/lang/Object;)Lcom/caplin/datasource/DataSource;
132+
public static final fun createJackson2JsonHandler (Lcom/fasterxml/jackson/databind/ObjectMapper;)Lcom/caplin/datasource/messaging/json/JacksonJsonHandler;
133+
public final fun getDefaultJackson2JsonHandler ()Lcom/caplin/datasource/messaging/json/JacksonJsonHandler;
134+
public final fun getDefaultJackson2ObjectMapper ()Lcom/fasterxml/jackson/databind/ObjectMapper;
133135
}
134136

135137
public final class com/caplin/integration/datasourcex/util/TimeoutKt {
@@ -462,13 +464,14 @@ public final class com/caplin/integration/datasourcex/util/flow/ValueOrCompletio
462464

463465
public final class com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModuleKt {
464466
public static final fun registerDataSourceSerializers (Lorg/apache/fory/Fory;)Lorg/apache/fory/Fory;
467+
public static final fun registerPersistentCollectionSerializers (Lorg/apache/fory/Fory;)Lorg/apache/fory/Fory;
465468
}
466469

467-
public final class com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule : com/fasterxml/jackson/databind/module/SimpleModule {
468-
public static final field INSTANCE Lcom/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule;
470+
public final class com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule : com/fasterxml/jackson/databind/module/SimpleModule {
471+
public static final field INSTANCE Lcom/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule;
469472
}
470473

471-
public final class com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModuleKt {
474+
public final class com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModuleKt {
472475
public static final fun registerDataSourceModule (Lcom/fasterxml/jackson/databind/ObjectMapper;)Lcom/fasterxml/jackson/databind/ObjectMapper;
473476
}
474477

util/src/main/kotlin/com/caplin/integration/datasourcex/util/SimpleDataSourceFactory.kt

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package com.caplin.integration.datasourcex.util
22

33
import com.caplin.datasource.DataSource
44
import com.caplin.datasource.messaging.json.JacksonJsonHandler
5-
import com.caplin.integration.datasourcex.util.serialization.jackson.registerDataSourceModule
5+
import com.caplin.datasource.messaging.json.JsonHandler
6+
import com.caplin.integration.datasourcex.util.SimpleDataSourceFactory.defaultJackson2JsonHandler
7+
import com.caplin.integration.datasourcex.util.SimpleDataSourceFactory.defaultJackson2ObjectMapper
8+
import com.caplin.integration.datasourcex.util.serialization.jackson2.registerDataSourceModule
69
import com.fasterxml.jackson.databind.ObjectMapper
710
import com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS
811
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
@@ -24,22 +27,35 @@ object SimpleDataSourceFactory {
2427
* The default [ObjectMapper] used for serializing and deserializing JSON payloads. It is
2528
* pre-configured with the JavaTime module and DataSource serialization extensions.
2629
*/
27-
val defaultObjectMapper: ObjectMapper =
30+
val defaultJackson2ObjectMapper: ObjectMapper =
2831
jacksonObjectMapper()
2932
.configure(WRITE_DATES_AS_TIMESTAMPS, false)
3033
.registerModule(JavaTimeModule())
3134
.registerDataSourceModule()
3235

36+
@JvmStatic
37+
fun createJackson2JsonHandler(objectMapper: ObjectMapper): JacksonJsonHandler =
38+
JacksonJsonHandler(
39+
Logger.getLogger(JacksonJsonHandler::class.qualifiedName),
40+
objectMapper,
41+
)
42+
43+
val defaultJackson2JsonHandler: JacksonJsonHandler =
44+
createJackson2JsonHandler(defaultJackson2ObjectMapper)
45+
3346
/**
3447
* Creates a data source based on the given simple configuration.
3548
*
3649
* @param simpleConfig The simple configuration for the data source.
50+
* @param jsonHandler The [JsonHandler] to use for serializing and deserializing JSON payloads.
51+
* This defaults to the Jackson 2 [defaultJackson2JsonHandler] backed by
52+
* [defaultJackson2ObjectMapper].
3753
* @return The created data source.
3854
*/
3955
@JvmStatic
4056
fun createDataSource(
4157
simpleConfig: SimpleDataSourceConfig,
42-
objectMapper: ObjectMapper = defaultObjectMapper,
58+
jsonHandler: JsonHandler<*> = defaultJackson2JsonHandler,
4359
): DataSource {
4460
val logPath =
4561
simpleConfig.logDirectory
@@ -123,12 +139,6 @@ object SimpleDataSourceFactory {
123139
.trimMargin()
124140

125141
return DataSource.fromConfigString(config, Logger.getLogger(DataSource::class.qualifiedName))
126-
.apply {
127-
extraConfiguration.jsonHandler =
128-
JacksonJsonHandler(
129-
Logger.getLogger(JacksonJsonHandler::class.qualifiedName),
130-
objectMapper,
131-
)
132-
}
142+
.apply { extraConfiguration.jsonHandler = jsonHandler }
133143
}
134144
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/Cast.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ import kotlinx.coroutines.flow.Flow
88
*
99
* @return A new [Flow] containing the cast elements of type [R].
1010
*/
11-
@Suppress("UNCHECKED_CAST") fun <R : Any?> Flow<*>.cast(): Flow<R> = this as Flow<R>
11+
@Suppress("UNCHECKED_CAST") fun <R> Flow<*>.cast(): Flow<R> = this as Flow<R>

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/CompletingSharedFlow.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import kotlinx.coroutines.flow.transformWhile
4040
* @see loading
4141
* @see CompletingSharedFlowCache
4242
*/
43-
fun interface LoadingCompletingSharedFlowCache<K : Any, T : Any?> {
43+
fun interface LoadingCompletingSharedFlowCache<K : Any, T> {
4444

4545
/**
4646
* If the requested key [K] is not found in the cache when the [CompletingSharedFlow] returned by
@@ -52,7 +52,7 @@ fun interface LoadingCompletingSharedFlowCache<K : Any, T : Any?> {
5252
}
5353

5454
/** Converts this [CompletingSharedFlowCache] to a [LoadingCompletingSharedFlowCache]. */
55-
fun <K : Any, T : Any?> CompletingSharedFlowCache<K, T>.loading(
55+
fun <K : Any, T> CompletingSharedFlowCache<K, T>.loading(
5656
supplier: (K) -> Flow<T>
5757
): LoadingCompletingSharedFlowCache<K, T> = LoadingCompletingSharedFlowCache { key ->
5858
get(key, supplier)
@@ -62,7 +62,7 @@ fun <K : Any, T : Any?> CompletingSharedFlowCache<K, T>.loading(
6262
* A cache of flows which each follow the behaviour of [shareInCompleting] - i.e. they share a
6363
* single upstream and pass completion and errors on to the downstream.
6464
*/
65-
fun interface CompletingSharedFlowCache<K : Any, T : Any?> {
65+
fun interface CompletingSharedFlowCache<K : Any, T> {
6666

6767
/**
6868
* If the requested key [K] is not found in the cache when the [CompletingSharedFlow] returned by
@@ -73,20 +73,20 @@ fun interface CompletingSharedFlowCache<K : Any, T : Any?> {
7373

7474
companion object {
7575

76-
private interface MapActorAction<K : Any, T : Any?> {
76+
private interface MapActorAction<K : Any, T> {
7777

7878
val key: K
7979

80-
data class Fetch<K : Any, T : Any?>(
80+
data class Fetch<K : Any, T>(
8181
override val key: K,
8282
val supplier: (K) -> Flow<T>,
8383
val response: CompletableDeferred<SharedFlow<Any?>> = CompletableDeferred(),
8484
) : MapActorAction<K, T>
8585

86-
data class Reset<K : Any, T : Any?>(override val key: K) : MapActorAction<K, T>
86+
data class Reset<K : Any, T>(override val key: K) : MapActorAction<K, T>
8787
}
8888

89-
operator fun <K : Any, T : Any?> invoke(
89+
operator fun <K : Any, T> invoke(
9090
scope: CoroutineScope,
9191
started: SharingStarted,
9292
replay: Int,
@@ -175,7 +175,7 @@ fun <T> CompletingSharedFlow<T>.onSubscription(
175175
* Similar to [shareIn], but completions and errors are also propagated to the downstream
176176
* subscribers.
177177
*/
178-
fun <T : Any?> Flow<T>.shareInCompleting(
178+
fun <T> Flow<T>.shareInCompleting(
179179
scope: CoroutineScope,
180180
started: SharingStarted,
181181
replay: Int = 0,

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ fun Fory.registerDataSourceSerializers(): Fory = apply {
2525
registerSerializer(SimpleMapEvent::class.java, SimpleMapEventSerializer::class.java)
2626
registerSerializer(SetEvent::class.java, SetEventSerializer::class.java)
2727
registerSerializer(ValueOrCompletion::class.java, ValueOrCompletionSerializer::class.java)
28+
}
2829

30+
fun Fory.registerPersistentCollectionSerializers(): Fory = apply {
2931
// Register concrete serializers for the internal PersistentMap implementations
3032
runCatching {
3133
Class.forName(

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule.kt renamed to util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/DataSourceModule.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.caplin.integration.datasourcex.util.serialization.jackson
1+
package com.caplin.integration.datasourcex.util.serialization.jackson2
22

33
import com.caplin.integration.datasourcex.util.flow.FlowMapStreamEvent
44
import com.caplin.integration.datasourcex.util.flow.MapEvent

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/FlowMapStreamEventDeserializer.kt renamed to util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/FlowMapStreamEventDeserializer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.caplin.integration.datasourcex.util.serialization.jackson
1+
package com.caplin.integration.datasourcex.util.serialization.jackson2
22

33
import com.caplin.integration.datasourcex.util.flow.FlowMapStreamEvent
44
import com.caplin.integration.datasourcex.util.flow.MapEvent

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/FlowMapStreamEventSerializer.kt renamed to util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/FlowMapStreamEventSerializer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.caplin.integration.datasourcex.util.serialization.jackson
1+
package com.caplin.integration.datasourcex.util.serialization.jackson2
22

33
import com.caplin.integration.datasourcex.util.flow.FlowMapStreamEvent
44
import com.fasterxml.jackson.core.JsonGenerator

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/MapEventDeserializer.kt renamed to util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/MapEventDeserializer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.caplin.integration.datasourcex.util.serialization.jackson
1+
package com.caplin.integration.datasourcex.util.serialization.jackson2
22

33
import com.caplin.integration.datasourcex.util.flow.MapEvent
44
import com.fasterxml.jackson.core.JsonParser

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/MapEventSerializer.kt renamed to util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson2/MapEventSerializer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.caplin.integration.datasourcex.util.serialization.jackson
1+
package com.caplin.integration.datasourcex.util.serialization.jackson2
22

33
import com.caplin.integration.datasourcex.util.flow.MapEvent
44
import com.fasterxml.jackson.core.JsonGenerator

0 commit comments

Comments
 (0)