From 8fbb35730a40591c82fe6f458095331033abd48f Mon Sep 17 00:00:00 2001 From: Ahoo Wang Date: Sun, 7 Jun 2026 18:39:18 +0800 Subject: [PATCH 1/2] perf(benchmark): refine JMH benchmark suite --- wow-benchmarks/build.gradle.kts | 385 ++++---------- .../gradle/benchmark-reporting.gradle.kts | 477 ++++++++++++++++++ .../gradle/jmh-packaging.gradle.kts | 189 +++++++ .../BenchmarkAggregateSchedulerSupplier.kt | 38 ++ .../wow/BenchmarkGlobalIdGeneratorFactory.kt | 25 + .../BloomFilterIdempotencyCheckerBenchmark.kt | 2 +- .../kotlin/me/ahoo/wow/command/Commands.kt | 5 +- .../EventStreamFactoryBenchmark.kt | 8 +- .../me/ahoo/wow/eventsourcing/Events.kt | 9 +- .../InMemoryEventStoreBenchmark.kt | 18 +- .../eventsourcing/NoopEventStoreBenchmark.kt | 8 +- .../wow/eventsourcing/SnapshotBenchmark.kt | 13 +- .../wow/hotpath/CommandHandlingBenchmark.kt | 36 ++ .../CommandProcessingPipelineBenchmark.kt | 182 ++++++- .../ahoo/wow/hotpath/SnapshotSaveBenchmark.kt | 3 +- .../me/ahoo/wow/infra/DeepCopyBenchmark.kt | 14 +- .../kotlin/me/ahoo/wow/infra/SinkBenchmark.kt | 18 +- .../AbstractCommandDispatcherBenchmark.kt | 26 +- .../InMemoryCommandDispatcherBenchmark.kt | 24 +- ...nMemoryCommandDispatcherGrowthBenchmark.kt | 41 ++ .../NoopCommandDispatcherBenchmark.kt | 14 +- ...oopEventStoreCommandDispatcherBenchmark.kt | 12 +- .../mongo/MongoCommandDispatcherBenchmark.kt | 17 +- .../ahoo/wow/redis/RedisBenchmarkFixture.kt | 19 + .../redis/RedisCommandDispatcherBenchmark.kt | 15 +- ...RedisCommandProcessingPipelineBenchmark.kt | 165 ++++++ .../wow/redis/RedisEventStoreReadBenchmark.kt | 93 ++++ .../me.ahoo.wow.id.GlobalIdGeneratorFactory | 1 + 28 files changed, 1471 insertions(+), 386 deletions(-) create mode 100644 wow-benchmarks/gradle/benchmark-reporting.gradle.kts create mode 100644 wow-benchmarks/gradle/jmh-packaging.gradle.kts create mode 100644 wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkAggregateSchedulerSupplier.kt create mode 100644 wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkGlobalIdGeneratorFactory.kt create mode 100644 wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherGrowthBenchmark.kt create mode 100644 wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandProcessingPipelineBenchmark.kt create mode 100644 wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisEventStoreReadBenchmark.kt create mode 100644 wow-benchmarks/src/jmh/resources/META-INF/services/me.ahoo.wow.id.GlobalIdGeneratorFactory diff --git a/wow-benchmarks/build.gradle.kts b/wow-benchmarks/build.gradle.kts index dd8fa728d1f..c499382830b 100644 --- a/wow-benchmarks/build.gradle.kts +++ b/wow-benchmarks/build.gradle.kts @@ -1,5 +1,6 @@ -import java.time.LocalDate -import java.util.zip.ZipFile +import org.gradle.api.file.RegularFile +import org.gradle.api.provider.Provider +import org.gradle.api.tasks.JavaExec plugins { alias(libs.plugins.ksp) @@ -18,119 +19,7 @@ dependencies { kapt(libs.jmh.generator.annprocess) } -/** - * Merge all META-INF/wow-metadata.json from the runtime classpath into a single valid JSON. - * Without this, the JMH JAR contains duplicate entries that concatenate into invalid JSON, - * causing MetadataSearcher to fail silently at runtime. - * - * Aggregate entries are deep-merged (non-null values take precedence) to avoid the API module's - * type=null overwriting the domain module's type=full.class.Name. - */ -val mergedWowMetadata = layout.buildDirectory.file("tmp/wow-metadata-merged/META-INF/wow-metadata.json") -val mergeWowMetadata = tasks.register("mergeWowMetadata") { - description = "Merge all wow-metadata.json from classpath into a single file." - outputs.file(mergedWowMetadata) - inputs.files(configurations.jmhRuntimeClasspath) - doLast { - val mapper = com.fasterxml.jackson.databind.ObjectMapper() - val merged = mutableMapOf() - val metadataContents = mutableListOf() - - configurations.jmhRuntimeClasspath.get().resolve() - .filter { it.name.endsWith(".jar") || it.isDirectory } - .forEach { file -> - if (file.isDirectory) { - val meta = file.resolve("META-INF/wow-metadata.json") - if (meta.exists()) { - metadataContents.add(meta.readText()) - } - } else { - ZipFile(file).use { zip -> - zip.getEntry("META-INF/wow-metadata.json")?.let { entry -> - metadataContents.add(zip.getInputStream(entry).bufferedReader().readText()) - } - } - } - } - - for (text in metadataContents) { - try { - @Suppress("UNCHECKED_CAST") - val next = mapper.readValue(text, Map::class.java) as Map - @Suppress("UNCHECKED_CAST") - val contexts = next["contexts"] as? Map ?: continue - @Suppress("UNCHECKED_CAST") - val mergedContexts = merged.getOrPut("contexts") { mutableMapOf() } as MutableMap - for ((ctxName, ctxValue) in contexts) { - val existing = mergedContexts[ctxName] - if (existing == null) { - mergedContexts[ctxName] = ctxValue - } else { - @Suppress("UNCHECKED_CAST") - val existingMap = existing as MutableMap - @Suppress("UNCHECKED_CAST") - val newMap = ctxValue as Map - // Deep-merge aggregates (non-null values win) - @Suppress("UNCHECKED_CAST") - val existingAggregates = existingMap.getOrPut("aggregates") { mutableMapOf() } as MutableMap - @Suppress("UNCHECKED_CAST") - val newAggregates = newMap["aggregates"] as? Map ?: emptyMap() - for ((aggName, aggValue) in newAggregates) { - val existingAgg = existingAggregates[aggName] - if (existingAgg == null) { - existingAggregates[aggName] = aggValue - } else { - @Suppress("UNCHECKED_CAST") - val existingAggMap = existingAgg as MutableMap - @Suppress("UNCHECKED_CAST") - val newAggMap = aggValue as Map - for ((key, value) in newAggMap) { - if (value != null) { - val existingVal = existingAggMap[key] - if (existingVal == null || value is String) { - existingAggMap[key] = value - } else if (value is List<*>) { - @Suppress("UNCHECKED_CAST") - val existingList = (existingVal as? List ?: emptyList()).toMutableList() - @Suppress("UNCHECKED_CAST") - val newList = value as List - existingList.addAll(newList.filter { it !in existingList }) - existingAggMap[key] = existingList - } - } - } - } - } - // Merge scopes (union) - @Suppress("UNCHECKED_CAST") - val existingScopes = existingMap.getOrPut("scopes") { mutableListOf() } as MutableList - @Suppress("UNCHECKED_CAST") - val newScopes = newMap["scopes"] as? List ?: emptyList() - existingScopes.addAll(newScopes.filter { it !in existingScopes }) - } - } - } catch (_: Exception) { - // skip invalid metadata - } - } - - val outputFile = mergedWowMetadata.get().asFile - outputFile.parentFile.mkdirs() - mapper.writerWithDefaultPrettyPrinter().writeValue(outputFile, merged) - logger.lifecycle("Merged ${metadataContents.size} wow-metadata.json files into ${outputFile.absolutePath}") - } -} - -tasks.named("jmhJar") { - isZip64 = true - dependsOn(mergeWowMetadata) - // Exclude all wow-metadata.json from dependency JARs (they're duplicated) - exclude("META-INF/wow-metadata.json") - // Add the merged metadata file - from(mergedWowMetadata) { - into("META-INF") - } -} +apply(from = "gradle/jmh-packaging.gradle.kts") val benchmarkSmokeIncludes = listOf( "me.ahoo.wow.command.CommandFactoryBenchmark", @@ -172,6 +61,97 @@ tasks.register("benchmarkSmoke") { } } +val benchmarkInternalReport = layout.buildDirectory.file("results/jmh/internal.json") +val benchmarkExternalReport = layout.buildDirectory.file("results/jmh/external.json") +val benchmarkInternalHumanReport = layout.buildDirectory.file("reports/jmh/internal-human.txt") +val benchmarkExternalHumanReport = layout.buildDirectory.file("reports/jmh/external-human.txt") + +val benchmarkJvmArgs = listOf( + "-Xmx4g", + "-Xms4g", + "-XX:+UseG1GC", + "-XX:+UnlockDiagnosticVMOptions", + "-XX:+DebugNonSafepoints", + "-XX:+AlwaysPreTouch", +) + +fun benchmarkProfilerArgs(): List { + val asyncProfilerLib = file("/opt/async-profiler/lib/libasyncProfiler.dylib") + return buildList { + add("-prof") + add("gc") + add("-prof") + if (asyncProfilerLib.exists()) { + add("async:output=flamegraph;dir=build/profiling;event=cpu;libPath=${asyncProfilerLib.absolutePath}") + } else { + add("stack:lines=10;top=20") + } + } +} + +fun JavaExec.configureJmhBenchmarkRun( + includePattern: String, + resultsFile: Provider, + humanOutputFile: Provider, +) { + dependsOn(tasks.named("jmhJar")) + classpath(tasks.named("jmhJar").flatMap { it.archiveFile }) + mainClass.set("org.openjdk.jmh.Main") + args( + includePattern, + "-t", + "1", + "-wi", + "2", + "-w", + "5s", + "-i", + "3", + "-r", + "10s", + "-f", + "2", + "-foe", + "true", + "-rf", + "json", + "-rff", + resultsFile.get().asFile.absolutePath, + "-o", + humanOutputFile.get().asFile.absolutePath, + "-jvmArgs", + benchmarkJvmArgs.joinToString(" "), + ) + args(benchmarkProfilerArgs()) + outputs.file(resultsFile) + outputs.file(humanOutputFile) + outputs.upToDateWhen { false } + doFirst { + resultsFile.get().asFile.parentFile.mkdirs() + humanOutputFile.get().asFile.parentFile.mkdirs() + } +} + +tasks.register("benchmarkInternal") { + description = "Runs non-Mongo and non-Redis JMH benchmarks." + group = "benchmark" + configureJmhBenchmarkRun( + includePattern = """me\.ahoo\.wow\.(?!mongo\.|redis\.).*Benchmark.*""", + resultsFile = benchmarkInternalReport, + humanOutputFile = benchmarkInternalHumanReport, + ) +} + +tasks.register("benchmarkExternal") { + description = "Runs MongoDB and Redis JMH benchmarks." + group = "benchmark" + configureJmhBenchmarkRun( + includePattern = """me\.ahoo\.wow\.(mongo|redis)\..*Benchmark.*""", + resultsFile = benchmarkExternalReport, + humanOutputFile = benchmarkExternalHumanReport, + ) +} + jmh { zip64.set(true) includes.set(listOf(".*Benchmark.*")) @@ -206,177 +186,4 @@ jmh { }) } -val resultsDir = layout.projectDirectory.dir("results") -val baselineJson = resultsDir.file("baseline.json") -val latestJson = layout.buildDirectory.file("results/jmh/latest.json") -val readmeFile = layout.projectDirectory.file("README.md") - -tasks.register("generateBenchmarkReport") { - description = "Generate benchmark README.md from JMH JSON results." - group = "benchmark" - dependsOn(tasks.named("jmh")) - - doLast { - val resultsFile = latestJson.get().asFile - if (!resultsFile.exists()) { - throw GradleException("JMH results not found: ${resultsFile.absolutePath}. Run :wow-benchmarks:jmh first.") - } - - val mapper = com.fasterxml.jackson.databind.ObjectMapper() - val resultsText = resultsFile.readText() - @Suppress("UNCHECKED_CAST") - val jmhResults = mapper.readValue(resultsText, List::class.java) as List> - - val version = project.version.toString() - val date = LocalDate.now().toString() - val jvm = System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.version") - val os = System.getProperty("os.name") + " " + System.getProperty("os.arch") - - val sb = StringBuilder() - sb.appendLine("# Benchmark Report") - sb.appendLine() - sb.appendLine("## Environment") - sb.appendLine("- **Version**: $version") - sb.appendLine("- **JVM**: $jvm") - sb.appendLine("- **OS**: $os") - sb.appendLine("- **Date**: $date") - sb.appendLine("- **JMH Config**: threads=1, warmup=2×5s, measurement=3×10s, fork=2") - sb.appendLine() - sb.appendLine("## Results") - sb.appendLine() - sb.appendLine("| Benchmark | Score | Error | Unit | gc.alloc.rate.norm |") - sb.appendLine("|-----------|-------|-------|------|-------------------|") - - for (result in jmhResults) { - val benchmark = result["benchmark"] as? String ?: continue - @Suppress("UNCHECKED_CAST") - val primaryMetric = result["primaryMetric"] as? Map ?: continue - val score = primaryMetric["score"] as? Double ?: continue - val scoreError = primaryMetric["scoreError"] as? Double ?: 0.0 - val unit = primaryMetric["scoreUnit"] as? String ?: "ops/s" - - var allocRateNorm = "—" - @Suppress("UNCHECKED_CAST") - val secondaryMetrics = result["secondaryMetrics"] as? Map> - if (secondaryMetrics != null) { - val gcAlloc = secondaryMetrics["gc.alloc.rate.norm"] - allocRateNorm = String.format("%.1f B/op", gcAlloc?.get("score") as? Double ?: 0.0) - } - - val parts = benchmark.split(".") - val shortName = if (parts.size >= 2) "${parts[parts.size - 2]}.${parts.last()}" else benchmark - sb.appendLine("| $shortName | ${String.format("%.2f", score)} | ±${String.format("%.2f", scoreError)} | $unit | $allocRateNorm |") - } - - readmeFile.asFile.writeText(sb.toString()) - logger.lifecycle("Benchmark report generated: ${readmeFile.asFile.absolutePath}") - } -} - -tasks.register("benchmarkCompare") { - description = "Compare latest benchmark results against baseline." - group = "benchmark" - - doLast { - val latestFile = latestJson.get().asFile - val baselineFile = baselineJson.asFile - - if (!baselineFile.exists()) { - throw GradleException("Baseline not found: ${baselineFile.absolutePath}. Run :wow-benchmarks:updateBaseline first.") - } - if (!latestFile.exists()) { - throw GradleException("Latest results not found: ${latestFile.absolutePath}. Run :wow-benchmarks:jmh first.") - } - - val mapper = com.fasterxml.jackson.databind.ObjectMapper() - - fun parseScores(file: java.io.File): Map { - @Suppress("UNCHECKED_CAST") - val results = mapper.readValue(file, List::class.java) as List> - return results.associate { result -> - val benchmark = result["benchmark"] as String - @Suppress("UNCHECKED_CAST") - val params = result["params"] as? Map - val key = if (params != null && params.isNotEmpty()) { - "$benchmark(${params.entries.joinToString(",") { "${it.key}=${it.value}" }})" - } else { - benchmark - } - @Suppress("UNCHECKED_CAST") - val primaryMetric = result["primaryMetric"] as Map - key to (primaryMetric["score"] as Double) - } - } - - val baseline = parseScores(baselineFile) - val latest = parseScores(latestFile) - val allBenchmarks = (baseline.keys + latest.keys).sorted() - - var regressions = 0 - var improvements = 0 - - println() - println("## Benchmark Comparison") - println() - println("| Benchmark | Baseline | Current | Δ% | Status |") - println("|-----------|----------|---------|----|--------|") - - for (benchmark in allBenchmarks) { - val baseScore = baseline[benchmark] - val latestScore = latest[benchmark] - val parts = benchmark.split("(")[0].split(".") - val shortName = if (parts.size >= 2) "${parts[parts.size - 2]}.${parts.last()}" else benchmark - val paramSuffix = if ("(" in benchmark) " ${benchmark.substringAfter("(").substringBefore(")")}" else "" - - val displayName = "$shortName$paramSuffix" - - if (baseScore == null) { - println("| $displayName | — | ${String.format("%.2f", latestScore)} | NEW | 🆕 |") - continue - } - if (latestScore == null) { - println("| $displayName | ${String.format("%.2f", baseScore)} | — | REMOVED | ⚠️ |") - continue - } - - val changePercent = ((latestScore - baseScore) / baseScore) * 100 - val status = when { - changePercent < -10.0 -> { - regressions++ - "🔴 REGRESSION" - } - changePercent > 10.0 -> { - improvements++ - "🟢 IMPROVED" - } - else -> "✅" - } - - println("| $displayName | ${String.format("%.2f", baseScore)} | ${String.format("%.2f", latestScore)} | ${String.format("%+.1f%%", changePercent)} | $status |") - } - - println() - println("**Summary:** $regressions regression(s), $improvements improvement(s), ${allBenchmarks.size - regressions - improvements} stable") - - if (regressions > 0) { - throw GradleException("Benchmark regressions detected: $regressions") - } - } -} - -tasks.register("updateBaseline") { - description = "Copy latest benchmark results as the new baseline." - group = "benchmark" - - doLast { - val latestFile = latestJson.get().asFile - val baselineFile = baselineJson.asFile - - if (!latestFile.exists()) { - throw GradleException("Latest results not found: ${latestFile.absolutePath}. Run :wow-benchmarks:jmh first.") - } - - latestFile.copyTo(baselineFile, overwrite = true) - logger.lifecycle("Baseline updated: ${baselineFile.absolutePath}") - } -} +apply(from = "gradle/benchmark-reporting.gradle.kts") diff --git a/wow-benchmarks/gradle/benchmark-reporting.gradle.kts b/wow-benchmarks/gradle/benchmark-reporting.gradle.kts new file mode 100644 index 00000000000..5489523d22a --- /dev/null +++ b/wow-benchmarks/gradle/benchmark-reporting.gradle.kts @@ -0,0 +1,477 @@ +import groovy.json.JsonSlurper +import org.gradle.api.file.RegularFile +import org.gradle.api.provider.Provider +import java.time.Instant +import java.time.LocalDate +import java.util.Locale + +val resultsDir = layout.projectDirectory.dir("results") +val baselineJson = resultsDir.file("baseline.json") +val latestJson = layout.buildDirectory.file("results/jmh/latest.json") +val readmeFile = layout.projectDirectory.file("README.md") + +val benchmarkInternalReport = layout.buildDirectory.file("results/jmh/internal.json") +val benchmarkExternalReport = layout.buildDirectory.file("results/jmh/external.json") + +data class BenchmarkResultGroup( + val name: String, + val command: String, + val resultFile: Provider, + val required: Boolean = true, +) + +data class BenchmarkGroupReport( + val group: BenchmarkResultGroup, + val rows: List, + val sourceRowCount: Int = rows.size, + val unavailableReason: String? = null, +) + +data class ParsedBenchmarkResult( + val group: String, + val benchmark: String, + val displayName: String, + val score: Double, + val scoreError: Double?, + val unit: String, + val allocationBytesPerOp: Double?, + val allocationErrorBytesPerOp: Double?, +) + +fun shortBenchmarkName(benchmark: String): String { + val parts = benchmark.split(".") + return if (parts.size >= 2) { + "${parts[parts.size - 2]}.${parts.last()}" + } else { + benchmark + } +} + +fun benchmarkDisplayName(result: Map<*, *>): String { + val benchmark = result["benchmark"] as String + @Suppress("UNCHECKED_CAST") + val params = result["params"] as? Map<*, *> + if (params.isNullOrEmpty()) { + return shortBenchmarkName(benchmark) + } + val paramText = params.entries.sortedBy { it.key.toString() } + .joinToString(", ") { "${it.key}=${it.value}" } + return "${shortBenchmarkName(benchmark)} ($paramText)" +} + +fun parseMetricNumber(value: Any?): Double? { + val parsed = when (value) { + is Number -> value.toDouble() + is String -> value.toDoubleOrNull() + else -> null + } ?: return null + return parsed.takeIf { it.isFinite() } +} + +fun benchmarkResultRowException( + group: BenchmarkResultGroup, + resultFile: java.io.File, + rowIndex: Int, + message: String, +): GradleException { + return GradleException( + "Invalid JMH result row for ${group.name} at index $rowIndex in ${resultFile.absolutePath}: $message" + ) +} + +fun parseBenchmarkGroup( + parser: JsonSlurper, + group: BenchmarkResultGroup, +): BenchmarkGroupReport { + val resultFile = group.resultFile.get().asFile + if (!resultFile.exists()) { + if (!group.required) { + return BenchmarkGroupReport( + group = group, + rows = emptyList(), + unavailableReason = "Result file was not present. Run ${group.command} when the required service is available.", + ) + } + throw GradleException( + "JMH results not found for ${group.name}: ${resultFile.absolutePath}. Run ${group.command} first." + ) + } + val resultsText = resultFile.readText() + if (resultsText.isBlank()) { + throw GradleException("JMH results are empty for ${group.name}: ${resultFile.absolutePath}") + } + @Suppress("UNCHECKED_CAST") + val results = parser.parseText(resultsText) as List<*> + if (results.isEmpty()) { + throw GradleException("JMH results contain no benchmarks for ${group.name}: ${resultFile.absolutePath}") + } + val rows = results.mapIndexed { rowIndex, rawResult -> + val result = rawResult as? Map<*, *> ?: throw benchmarkResultRowException( + group = group, + resultFile = resultFile, + rowIndex = rowIndex, + message = "expected row to be a JSON object.", + ) + val benchmark = result["benchmark"] as? String ?: throw benchmarkResultRowException( + group = group, + resultFile = resultFile, + rowIndex = rowIndex, + message = "missing benchmark.", + ) + val primaryMetric = result["primaryMetric"] as? Map<*, *> ?: throw benchmarkResultRowException( + group = group, + resultFile = resultFile, + rowIndex = rowIndex, + message = "missing primaryMetric.", + ) + val score = parseMetricNumber(primaryMetric["score"]) ?: throw benchmarkResultRowException( + group = group, + resultFile = resultFile, + rowIndex = rowIndex, + message = "missing or unusable primaryMetric.score.", + ) + val scoreError = parseMetricNumber(primaryMetric["scoreError"]) + val unit = primaryMetric["scoreUnit"] as? String ?: "ops/s" + val secondaryMetrics = result["secondaryMetrics"] as? Map<*, *> + val allocationMetric = secondaryMetrics?.get("gc.alloc.rate.norm") as? Map<*, *> + val allocationBytesPerOp = parseMetricNumber(allocationMetric?.get("score")) + val allocationErrorBytesPerOp = parseMetricNumber(allocationMetric?.get("scoreError")) + ParsedBenchmarkResult( + group = group.name, + benchmark = benchmark, + displayName = benchmarkDisplayName(result), + score = score, + scoreError = scoreError, + unit = unit, + allocationBytesPerOp = allocationBytesPerOp, + allocationErrorBytesPerOp = allocationErrorBytesPerOp, + ) + } + return BenchmarkGroupReport(group = group, rows = rows, sourceRowCount = results.size) +} + +fun formatScoreError(scoreError: Double?): String { + return scoreError?.let { "+/-${String.format(Locale.US, "%.2f", it)}" } ?: "-" +} + +fun formatAllocationBytes(allocationBytesPerOp: Double?): String { + return allocationBytesPerOp?.let { String.format(Locale.US, "%.1f B/op", it) } ?: "-" +} + +fun formatAllocationError(allocationErrorBytesPerOp: Double?): String { + return allocationErrorBytesPerOp?.let { "+/-${String.format(Locale.US, "%.1f B/op", it)}" } ?: "-" +} + +fun StringBuilder.appendBenchmarkTable(rows: List) { + appendLine("| Benchmark | Score | Error | Unit | gc.alloc.rate.norm |") + appendLine("|-----------|-------|-------|------|-------------------|") + rows.sortedBy { it.displayName }.forEach { row -> + appendLine( + "| ${row.displayName} | ${String.format(Locale.US, "%.2f", row.score)} | " + + "${formatScoreError(row.scoreError)} | ${row.unit} | ${formatAllocationBytes(row.allocationBytesPerOp)} |" + ) + } +} + +fun StringBuilder.appendThroughputBottlenecks(rows: List) { + appendLine("| Benchmark | Score | Error | Unit |") + appendLine("|-----------|-------|-------|------|") + rows.filter { it.unit.contains("ops", ignoreCase = true) } + .sortedBy { it.score } + .take(10) + .forEach { row -> + appendLine( + "| ${row.group}: ${row.displayName} | ${String.format(Locale.US, "%.2f", row.score)} | " + + "${formatScoreError(row.scoreError)} | ${row.unit} |" + ) + } +} + +fun StringBuilder.appendAllocationBottlenecks(rows: List) { + appendLine("| Benchmark | Allocation | Error | Score | Unit |") + appendLine("|-----------|------------|-------|-------|------|") + rows.filter { it.allocationBytesPerOp != null } + .sortedByDescending { it.allocationBytesPerOp } + .take(10) + .forEach { row -> + appendLine( + "| ${row.group}: ${row.displayName} | " + + "${formatAllocationBytes(row.allocationBytesPerOp)} | " + + "${formatAllocationError(row.allocationErrorBytesPerOp)} | " + + "${String.format(Locale.US, "%.2f", row.score)} | ${row.unit} |" + ) + } +} + +fun renderGroupedBenchmarkReport( + groups: List, + version: String, +): String { + val parser = JsonSlurper() + val parsedGroups = groups.map { parseBenchmarkGroup(parser, it) } + val allRows = parsedGroups.flatMap { it.rows } + if (allRows.isEmpty()) { + throw GradleException("No benchmark rows were available for grouped report generation.") + } + val sb = StringBuilder() + sb.appendLine("# Grouped Benchmark Report") + sb.appendLine() + sb.appendLine("## Environment") + sb.appendLine("- **Version**: $version") + sb.appendLine("- **JVM**: ${System.getProperty("java.vm.name")} ${System.getProperty("java.vm.version")}") + sb.appendLine("- **OS**: ${System.getProperty("os.name")} ${System.getProperty("os.arch")}") + sb.appendLine("- **Date**: ${LocalDate.now()}") + sb.appendLine("- **JMH Config**: threads=1, warmup=2x5s, measurement=3x10s, fork=2") + sb.appendLine() + sb.appendLine("## Bottlenecks") + sb.appendLine() + sb.appendLine("### Lowest Throughput") + sb.appendLine() + sb.appendThroughputBottlenecks(allRows) + sb.appendLine() + sb.appendLine("### Highest Allocation") + sb.appendLine() + sb.appendAllocationBottlenecks(allRows) + sb.appendLine() + parsedGroups.filter { it.rows.isNotEmpty() }.forEach { groupReport -> + sb.appendLine("### ${groupReport.group.name} Lowest Throughput") + sb.appendLine() + sb.appendThroughputBottlenecks(groupReport.rows) + sb.appendLine() + sb.appendLine("### ${groupReport.group.name} Highest Allocation") + sb.appendLine() + sb.appendAllocationBottlenecks(groupReport.rows) + sb.appendLine() + } + parsedGroups.forEach { groupReport -> + val group = groupReport.group + val rows = groupReport.rows + val resultFile = group.resultFile.get().asFile + sb.appendLine("## ${group.name} Results") + sb.appendLine() + sb.appendLine("- **Command**: `${group.command}`") + sb.appendLine("- **Result File**: `${resultFile.absolutePath}`") + if (resultFile.exists()) { + sb.appendLine("- **Last Modified**: ${Instant.ofEpochMilli(resultFile.lastModified())}") + } + sb.appendLine("- **Source Row Count**: ${groupReport.sourceRowCount}") + sb.appendLine("- **Parsed Row Count**: ${rows.size}") + sb.appendLine() + if (groupReport.unavailableReason != null) { + sb.appendLine(groupReport.unavailableReason) + } else { + sb.appendBenchmarkTable(rows) + } + sb.appendLine() + } + return sb.toString() +} + +tasks.register("generateBenchmarkReport") { + description = "Generate benchmark README.md from JMH JSON results." + group = "benchmark" + dependsOn(tasks.named("jmh")) + + doLast { + val resultsFile = latestJson.get().asFile + if (!resultsFile.exists()) { + throw GradleException("JMH results not found: ${resultsFile.absolutePath}. Run :wow-benchmarks:jmh first.") + } + + val parser = JsonSlurper() + val resultsText = resultsFile.readText() + @Suppress("UNCHECKED_CAST") + val jmhResults = parser.parseText(resultsText) as List> + + val version = project.version.toString() + val date = LocalDate.now().toString() + val jvm = System.getProperty("java.vm.name") + " " + System.getProperty("java.vm.version") + val os = System.getProperty("os.name") + " " + System.getProperty("os.arch") + + val sb = StringBuilder() + sb.appendLine("# Benchmark Report") + sb.appendLine() + sb.appendLine("## Environment") + sb.appendLine("- **Version**: $version") + sb.appendLine("- **JVM**: $jvm") + sb.appendLine("- **OS**: $os") + sb.appendLine("- **Date**: $date") + sb.appendLine("- **JMH Config**: threads=1, warmup=2×5s, measurement=3×10s, fork=2") + sb.appendLine() + sb.appendLine("## Results") + sb.appendLine() + sb.appendLine("| Benchmark | Score | Error | Unit | gc.alloc.rate.norm |") + sb.appendLine("|-----------|-------|-------|------|-------------------|") + + for (result in jmhResults) { + val benchmark = result["benchmark"] as? String ?: continue + @Suppress("UNCHECKED_CAST") + val primaryMetric = result["primaryMetric"] as? Map ?: continue + val score = primaryMetric["score"] as? Double ?: continue + val scoreError = primaryMetric["scoreError"] as? Double ?: 0.0 + val unit = primaryMetric["scoreUnit"] as? String ?: "ops/s" + + var allocRateNorm = "—" + @Suppress("UNCHECKED_CAST") + val secondaryMetrics = result["secondaryMetrics"] as? Map> + if (secondaryMetrics != null) { + val gcAlloc = secondaryMetrics["gc.alloc.rate.norm"] + allocRateNorm = String.format(Locale.US, "%.1f B/op", gcAlloc?.get("score") as? Double ?: 0.0) + } + + val parts = benchmark.split(".") + val shortName = if (parts.size >= 2) "${parts[parts.size - 2]}.${parts.last()}" else benchmark + sb.appendLine( + "| $shortName | ${String.format(Locale.US, "%.2f", score)} | " + + "±${String.format(Locale.US, "%.2f", scoreError)} | $unit | $allocRateNorm |" + ) + } + + readmeFile.asFile.writeText(sb.toString()) + logger.lifecycle("Benchmark report generated: ${readmeFile.asFile.absolutePath}") + } +} + +val groupedBenchmarkReport = layout.buildDirectory.file("reports/jmh/grouped.md") + +tasks.register("generateGroupedBenchmarkReport") { + description = "Generate a grouped benchmark report from internal and external JMH JSON results." + group = "benchmark" + outputs.file(groupedBenchmarkReport) + outputs.upToDateWhen { false } + doLast { + val outputFile = groupedBenchmarkReport.get().asFile + outputFile.delete() + val report = renderGroupedBenchmarkReport( + groups = listOf( + BenchmarkResultGroup( + name = "Internal", + command = "./gradlew :wow-benchmarks:benchmarkInternal", + resultFile = benchmarkInternalReport, + ), + BenchmarkResultGroup( + name = "External", + command = "./gradlew :wow-benchmarks:benchmarkExternal", + resultFile = benchmarkExternalReport, + required = false, + ), + ), + version = project.version.toString(), + ) + outputFile.parentFile.mkdirs() + outputFile.writeText(report) + logger.lifecycle("Grouped benchmark report generated: ${outputFile.absolutePath}") + } +} + +tasks.register("benchmarkCompare") { + description = "Compare latest benchmark results against baseline." + group = "benchmark" + + doLast { + val latestFile = latestJson.get().asFile + val baselineFile = baselineJson.asFile + + if (!baselineFile.exists()) { + throw GradleException("Baseline not found: ${baselineFile.absolutePath}. Run :wow-benchmarks:updateBaseline first.") + } + if (!latestFile.exists()) { + throw GradleException("Latest results not found: ${latestFile.absolutePath}. Run :wow-benchmarks:jmh first.") + } + + val parser = JsonSlurper() + + fun parseScores(file: java.io.File): Map { + @Suppress("UNCHECKED_CAST") + val results = parser.parse(file) as List> + return results.associate { result -> + val benchmark = result["benchmark"] as String + @Suppress("UNCHECKED_CAST") + val params = result["params"] as? Map + val key = if (params != null && params.isNotEmpty()) { + "$benchmark(${params.entries.joinToString(",") { "${it.key}=${it.value}" }})" + } else { + benchmark + } + @Suppress("UNCHECKED_CAST") + val primaryMetric = result["primaryMetric"] as Map + key to (primaryMetric["score"] as Double) + } + } + + val baseline = parseScores(baselineFile) + val latest = parseScores(latestFile) + val allBenchmarks = (baseline.keys + latest.keys).sorted() + + var regressions = 0 + var improvements = 0 + + println() + println("## Benchmark Comparison") + println() + println("| Benchmark | Baseline | Current | Δ% | Status |") + println("|-----------|----------|---------|----|--------|") + + for (benchmark in allBenchmarks) { + val baseScore = baseline[benchmark] + val latestScore = latest[benchmark] + val parts = benchmark.split("(")[0].split(".") + val shortName = if (parts.size >= 2) "${parts[parts.size - 2]}.${parts.last()}" else benchmark + val paramSuffix = if ("(" in benchmark) " ${benchmark.substringAfter("(").substringBefore(")")}" else "" + + val displayName = "$shortName$paramSuffix" + + if (baseScore == null) { + println("| $displayName | — | ${String.format(Locale.US, "%.2f", latestScore)} | NEW | 🆕 |") + continue + } + if (latestScore == null) { + println("| $displayName | ${String.format(Locale.US, "%.2f", baseScore)} | — | REMOVED | ⚠️ |") + continue + } + + val changePercent = ((latestScore - baseScore) / baseScore) * 100 + val status = when { + changePercent < -10.0 -> { + regressions++ + "🔴 REGRESSION" + } + changePercent > 10.0 -> { + improvements++ + "🟢 IMPROVED" + } + else -> "✅" + } + + println( + "| $displayName | ${String.format(Locale.US, "%.2f", baseScore)} | " + + "${String.format(Locale.US, "%.2f", latestScore)} | " + + "${String.format(Locale.US, "%+.1f%%", changePercent)} | $status |" + ) + } + + println() + println("**Summary:** $regressions regression(s), $improvements improvement(s), ${allBenchmarks.size - regressions - improvements} stable") + + if (regressions > 0) { + throw GradleException("Benchmark regressions detected: $regressions") + } + } +} + +tasks.register("updateBaseline") { + description = "Copy latest benchmark results as the new baseline." + group = "benchmark" + + doLast { + val latestFile = latestJson.get().asFile + val baselineFile = baselineJson.asFile + + if (!latestFile.exists()) { + throw GradleException("Latest results not found: ${latestFile.absolutePath}. Run :wow-benchmarks:jmh first.") + } + + latestFile.copyTo(baselineFile, overwrite = true) + logger.lifecycle("Baseline updated: ${baselineFile.absolutePath}") + } +} diff --git a/wow-benchmarks/gradle/jmh-packaging.gradle.kts b/wow-benchmarks/gradle/jmh-packaging.gradle.kts new file mode 100644 index 00000000000..69b1710cf27 --- /dev/null +++ b/wow-benchmarks/gradle/jmh-packaging.gradle.kts @@ -0,0 +1,189 @@ +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import java.util.zip.ZipFile + +val jmhRuntimeClasspath = configurations.named("jmhRuntimeClasspath") + +/** + * Merge all META-INF/wow-metadata.json from the runtime classpath into a single valid JSON. + * Without this, the JMH JAR contains duplicate entries that concatenate into invalid JSON, + * causing MetadataSearcher to fail silently at runtime. + * + * Aggregate entries are deep-merged (non-null values take precedence) to avoid the API module's + * type=null overwriting the domain module's type=full.class.Name. + */ +val mergedWowMetadata = layout.buildDirectory.file("tmp/wow-metadata-merged/META-INF/wow-metadata.json") +val mergeWowMetadata = tasks.register("mergeWowMetadata") { + description = "Merge all wow-metadata.json from classpath into a single file." + outputs.file(mergedWowMetadata) + inputs.files(jmhRuntimeClasspath) + doLast { + val parser = JsonSlurper() + val merged = mutableMapOf() + val metadataContents = mutableListOf() + + fun deepMutable(value: Any?): Any? = when (value) { + is Map<*, *> -> value.entries.associate { entry -> + entry.key.toString() to deepMutable(entry.value) + }.toMutableMap() + + is List<*> -> value.map { deepMutable(it) }.toMutableList() + else -> value + } + + jmhRuntimeClasspath.get().resolve() + .filter { it.name.endsWith(".jar") || it.isDirectory } + .forEach { file -> + if (file.isDirectory) { + val meta = file.resolve("META-INF/wow-metadata.json") + if (meta.exists()) { + metadataContents.add(meta.readText()) + } + } else { + ZipFile(file).use { zip -> + zip.getEntry("META-INF/wow-metadata.json")?.let { entry -> + metadataContents.add(zip.getInputStream(entry).bufferedReader().readText()) + } + } + } + } + + for (text in metadataContents) { + try { + @Suppress("UNCHECKED_CAST") + val next = parser.parseText(text) as Map + @Suppress("UNCHECKED_CAST") + val contexts = next["contexts"] as? Map ?: continue + @Suppress("UNCHECKED_CAST") + val mergedContexts = merged.getOrPut("contexts") { mutableMapOf() } as MutableMap + for ((ctxName, ctxValue) in contexts) { + val existing = mergedContexts[ctxName] + if (existing == null) { + mergedContexts[ctxName] = deepMutable(ctxValue)!! + } else { + @Suppress("UNCHECKED_CAST") + val existingMap = existing as MutableMap + @Suppress("UNCHECKED_CAST") + val newMap = ctxValue as Map + // Deep-merge aggregates (non-null values win) + @Suppress("UNCHECKED_CAST") + val existingAggregates = existingMap.getOrPut("aggregates") { mutableMapOf() } as MutableMap + @Suppress("UNCHECKED_CAST") + val newAggregates = newMap["aggregates"] as? Map ?: emptyMap() + for ((aggName, aggValue) in newAggregates) { + val existingAgg = existingAggregates[aggName] + if (existingAgg == null) { + existingAggregates[aggName] = deepMutable(aggValue)!! + } else { + @Suppress("UNCHECKED_CAST") + val existingAggMap = existingAgg as MutableMap + @Suppress("UNCHECKED_CAST") + val newAggMap = aggValue as Map + for ((key, value) in newAggMap) { + if (value != null) { + val existingVal = existingAggMap[key] + if (existingVal == null || value is String) { + existingAggMap[key] = value + } else if (value is List<*>) { + @Suppress("UNCHECKED_CAST") + val existingList = (existingVal as? List ?: emptyList()).toMutableList() + @Suppress("UNCHECKED_CAST") + val newList = value as List + existingList.addAll(newList.filter { it !in existingList }) + existingAggMap[key] = existingList + } + } + } + } + } + // Merge scopes (union) + @Suppress("UNCHECKED_CAST") + val existingScopes = existingMap.getOrPut("scopes") { mutableListOf() } as MutableList + @Suppress("UNCHECKED_CAST") + val newScopes = newMap["scopes"] as? List ?: emptyList() + existingScopes.addAll(newScopes.filter { it !in existingScopes }) + } + } + } catch (_: Exception) { + // skip invalid metadata + } + } + + val outputFile = mergedWowMetadata.get().asFile + outputFile.parentFile.mkdirs() + outputFile.writeText(JsonOutput.prettyPrint(JsonOutput.toJson(merged))) + logger.lifecycle("Merged ${metadataContents.size} wow-metadata.json files into ${outputFile.absolutePath}") + } +} + +val mergedJmhServicesRoot = layout.buildDirectory.dir("tmp/jmh-services-merged") +val jmhServiceFilesToMerge = listOf( + "META-INF/services/tools.jackson.databind.JacksonModule", + "META-INF/services/me.ahoo.wow.id.GlobalIdGeneratorFactory", +) +val mergeJmhServices = tasks.register("mergeJmhServices") { + description = "Merge critical SPI service files from the JMH runtime classpath." + outputs.dir(mergedJmhServicesRoot) + inputs.files(jmhRuntimeClasspath) + inputs.files(jmhServiceFilesToMerge.map { layout.projectDirectory.file("src/jmh/resources/$it") }) + doLast { + val outputRoot = mergedJmhServicesRoot.get().asFile + outputRoot.deleteRecursively() + + fun MutableSet.addServiceProviders(text: String) { + text.lineSequence() + .map { it.substringBefore('#').trim() } + .filter { it.isNotEmpty() } + .forEach { add(it) } + } + + for (servicePath in jmhServiceFilesToMerge) { + val providers = linkedSetOf() + jmhRuntimeClasspath.get().resolve() + .filter { it.name.endsWith(".jar") || it.isDirectory } + .forEach { file -> + if (file.isDirectory) { + val serviceFile = file.resolve(servicePath) + if (serviceFile.exists()) { + providers.addServiceProviders(serviceFile.readText()) + } + } else { + ZipFile(file).use { zip -> + zip.getEntry(servicePath)?.let { entry -> + providers.addServiceProviders(zip.getInputStream(entry).bufferedReader().readText()) + } + } + } + } + + val localServiceFile = file("src/jmh/resources/$servicePath") + if (localServiceFile.exists()) { + providers.addServiceProviders(localServiceFile.readText()) + } + + if (providers.isNotEmpty()) { + val outputFile = outputRoot.resolve(servicePath) + outputFile.parentFile.mkdirs() + outputFile.writeText(providers.joinToString(System.lineSeparator(), postfix = System.lineSeparator())) + } + } + } +} + +tasks.named("jmhJar") { + isZip64 = true + dependsOn(mergeWowMetadata) + dependsOn(mergeJmhServices) + // Exclude all wow-metadata.json from dependency JARs (they're duplicated) + exclude("META-INF/wow-metadata.json") + // Add the merged metadata file + from(mergedWowMetadata) { + into("META-INF") + } + eachFile { + if (path in jmhServiceFilesToMerge && !file.absolutePath.startsWith(mergedJmhServicesRoot.get().asFile.absolutePath)) { + exclude() + } + } + from(mergedJmhServicesRoot) +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkAggregateSchedulerSupplier.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkAggregateSchedulerSupplier.kt new file mode 100644 index 00000000000..a18a786ea42 --- /dev/null +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkAggregateSchedulerSupplier.kt @@ -0,0 +1,38 @@ +/* + * Copyright [2021-present] [ahoo wang (https://github.com/Ahoo-Wang)]. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.ahoo.wow + +import me.ahoo.wow.api.modeling.NamedAggregate +import me.ahoo.wow.modeling.MaterializedNamedAggregate +import me.ahoo.wow.modeling.materialize +import me.ahoo.wow.scheduler.AggregateSchedulerSupplier +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler +import reactor.core.scheduler.Schedulers +import java.util.concurrent.ConcurrentHashMap + +class BenchmarkAggregateSchedulerSupplier : AggregateSchedulerSupplier { + private val schedulers: MutableMap = ConcurrentHashMap() + + override fun getOrInitialize(namedAggregate: NamedAggregate): Scheduler = + schedulers.computeIfAbsent(namedAggregate.materialize()) { + Schedulers.newParallel("CommandDispatcherBenchmark-${it.aggregateName}", Schedulers.DEFAULT_POOL_SIZE) + } + + override fun stopGracefully(): Mono = + Mono.fromRunnable { + schedulers.values.forEach(Scheduler::dispose) + schedulers.clear() + } +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkGlobalIdGeneratorFactory.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkGlobalIdGeneratorFactory.kt new file mode 100644 index 00000000000..3c57289c7ce --- /dev/null +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/BenchmarkGlobalIdGeneratorFactory.kt @@ -0,0 +1,25 @@ +/* + * Copyright [2021-present] [ahoo wang (https://github.com/Ahoo-Wang)]. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.ahoo.wow + +import me.ahoo.cosid.cosid.ClockSyncCosIdGenerator +import me.ahoo.cosid.cosid.CosIdGenerator +import me.ahoo.cosid.cosid.Radix62CosIdGenerator +import me.ahoo.wow.id.GlobalIdGeneratorFactory + +class BenchmarkGlobalIdGeneratorFactory : GlobalIdGeneratorFactory { + override fun create(): CosIdGenerator { + return ClockSyncCosIdGenerator(Radix62CosIdGenerator(0)) + } +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/BloomFilterIdempotencyCheckerBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/BloomFilterIdempotencyCheckerBenchmark.kt index 8bd40d5ba2a..5160a64c8d0 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/BloomFilterIdempotencyCheckerBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/BloomFilterIdempotencyCheckerBenchmark.kt @@ -35,4 +35,4 @@ open class BloomFilterIdempotencyCheckerBenchmark { val result = idempotencyChecker.check(generateGlobalId()).block() blackhole.consume(result) } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/Commands.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/Commands.kt index 7252fd5f7ce..09cec58c002 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/Commands.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/command/Commands.kt @@ -19,18 +19,17 @@ import me.ahoo.wow.api.command.CommandMessage import me.ahoo.wow.example.api.cart.AddCartItem import me.ahoo.wow.example.domain.cart.Cart import me.ahoo.wow.example.domain.cart.CartState +import me.ahoo.wow.id.generateGlobalId import me.ahoo.wow.infra.idempotency.BloomFilterIdempotencyChecker import me.ahoo.wow.modeling.MaterializedNamedAggregate import me.ahoo.wow.modeling.annotation.aggregateMetadata import java.time.Duration -import java.util.concurrent.atomic.AtomicLong val cartAggregateMetadata by lazy { aggregateMetadata() } private val benchmarkCart = MaterializedNamedAggregate("example-service", "cart") -private val benchmarkIdSequence = AtomicLong() const val FIXED_AGGREGATE_ID = "benchmark-cart-fixed-id" fun createCommandMessage(): CommandMessage { @@ -78,7 +77,7 @@ private fun createCommandMessage( ) } -private fun nextBenchmarkId(): String = "benchmark-${benchmarkIdSequence.incrementAndGet()}" +private fun nextBenchmarkId(): String = generateGlobalId() fun createBloomFilterIdempotencyChecker(): BloomFilterIdempotencyChecker { return BloomFilterIdempotencyChecker(Duration.ofMinutes(1)) { diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/EventStreamFactoryBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/EventStreamFactoryBenchmark.kt index 14c76254142..c4c22e6e3d3 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/EventStreamFactoryBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/EventStreamFactoryBenchmark.kt @@ -26,4 +26,10 @@ open class EventStreamFactoryBenchmark { val eventStream = createEventStream() blackhole.consume(eventStream) } -} \ No newline at end of file + + @Benchmark + fun createSingleEventStream(blackhole: Blackhole) { + val eventStream = createSingleEventStream() + blackhole.consume(eventStream) + } +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt index b15caebe90f..e59acd0c854 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt @@ -26,4 +26,11 @@ fun createEventStream(): DomainEventStream { return listOf(event).toDomainEventStream( upstream = GivenInitializationCommand(cartAggregateMetadata.aggregateId()), ) -} \ No newline at end of file +} + +fun createSingleEventStream(): DomainEventStream { + val event = CartItemAdded(CartItem("productId")) + return event.toDomainEventStream( + upstream = GivenInitializationCommand(cartAggregateMetadata.aggregateId()), + ) +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/InMemoryEventStoreBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/InMemoryEventStoreBenchmark.kt index 5047760a2d7..15cecb974ea 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/InMemoryEventStoreBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/InMemoryEventStoreBenchmark.kt @@ -14,30 +14,32 @@ package me.ahoo.wow.eventsourcing import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.OperationsPerInvocation import org.openjdk.jmh.annotations.Scope import org.openjdk.jmh.annotations.Setup import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.TearDown @State(Scope.Benchmark) open class InMemoryEventStoreBenchmark : AbstractEventStoreBenchmark() { + private companion object { + const val APPENDS_PER_INVOCATION = 1024 + } @Setup override fun setup() { super.setup() } - @TearDown - fun tearDown() { - setup() - } - override fun createEventStore(): EventStore { return InMemoryEventStore() } @Benchmark + @OperationsPerInvocation(APPENDS_PER_INVOCATION) override fun append() { - super.append() + repeat(APPENDS_PER_INVOCATION) { + super.append() + } + setup() } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/NoopEventStoreBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/NoopEventStoreBenchmark.kt index 345546abd28..06c0c3f8227 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/NoopEventStoreBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/NoopEventStoreBenchmark.kt @@ -15,10 +15,16 @@ package me.ahoo.wow.eventsourcing import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup import org.openjdk.jmh.annotations.State @State(Scope.Benchmark) open class NoopEventStoreBenchmark : AbstractEventStoreBenchmark() { + @Setup + override fun setup() { + super.setup() + } + override fun createEventStore(): EventStore { return NoopEventStore } @@ -28,4 +34,4 @@ open class NoopEventStoreBenchmark : AbstractEventStoreBenchmark() { override fun append() { super.append() } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt index 78379faa52c..d037e61a301 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt @@ -30,7 +30,7 @@ import org.openjdk.jmh.infra.Blackhole @State(Scope.Benchmark) open class SnapshotBenchmark { - private lateinit var snapshotRepository: SnapshotRepository + private lateinit var snapshotLoadRepository: SnapshotRepository private lateinit var snapshotStrategy: VersionOffsetSnapshotStrategy private lateinit var stateEventExchange: SimpleStateEventExchange<*> private lateinit var aggregateId: me.ahoo.wow.api.modeling.AggregateId @@ -38,21 +38,20 @@ open class SnapshotBenchmark { @Setup fun setup() { aggregateId = cartAggregateMetadata.aggregateId() - snapshotRepository = InMemorySnapshotRepository() + snapshotLoadRepository = InMemorySnapshotRepository() snapshotStrategy = VersionOffsetSnapshotStrategy( versionOffset = 5, - snapshotRepository = snapshotRepository, + snapshotRepository = InMemorySnapshotRepository(), ) val aggregate = ConstructorStateAggregateFactory.create( cartAggregateMetadata.state, aggregateId, ) - val snapshot = SimpleSnapshot(aggregate) - snapshotRepository.save(snapshot).block() - val eventStream = createEventStream() val stateEvent = eventStream.toStateEvent(aggregate) + val snapshot = SimpleSnapshot(stateEvent) + snapshotLoadRepository.save(snapshot).block() stateEventExchange = SimpleStateEventExchange(stateEvent) } @@ -64,7 +63,7 @@ open class SnapshotBenchmark { @Benchmark fun snapshotLoad(blackhole: Blackhole) { - val snapshot = snapshotRepository.load>(aggregateId).block() + val snapshot = snapshotLoadRepository.load>(aggregateId).block() blackhole.consume(snapshot) } } diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandHandlingBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandHandlingBenchmark.kt index d045389409e..0f92da3bd57 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandHandlingBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandHandlingBenchmark.kt @@ -13,6 +13,9 @@ package me.ahoo.wow.hotpath +import me.ahoo.wow.command.SimpleServerCommandExchange +import me.ahoo.wow.eventsourcing.NoopEventStore +import me.ahoo.wow.modeling.command.SimpleCommandAggregateFactory import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope @@ -21,6 +24,8 @@ import org.openjdk.jmh.infra.Blackhole @State(Scope.Benchmark) open class CommandHandlingBenchmark { + private val commandAggregateFactory = SimpleCommandAggregateFactory(NoopEventStore) + @Benchmark fun createAggregateAndHandle(blackhole: Blackhole) { val aggregate = ConstructorStateAggregateFactory.create( @@ -39,4 +44,35 @@ open class CommandHandlingBenchmark { ) blackhole.consume(aggregate) } + + @Benchmark + fun createCommandAggregate(blackhole: Blackhole) { + val commandMessage = HotPathFixture.createCommandMessage() + val stateAggregate = ConstructorStateAggregateFactory.create( + HotPathFixture.aggregateMetadata.state, + commandMessage.aggregateId, + ) + val commandAggregate = commandAggregateFactory.create( + HotPathFixture.aggregateMetadata, + stateAggregate, + ) + blackhole.consume(commandAggregate) + } + + @Benchmark + fun processCommandAggregate(blackhole: Blackhole) { + val commandMessage = HotPathFixture.createCommandMessage() + val stateAggregate = ConstructorStateAggregateFactory.create( + HotPathFixture.aggregateMetadata.state, + commandMessage.aggregateId, + ) + val commandAggregate = commandAggregateFactory.create( + HotPathFixture.aggregateMetadata, + stateAggregate, + ) + val eventStream = commandAggregate.process( + SimpleServerCommandExchange(commandMessage), + ).block() + blackhole.consume(eventStream) + } } diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandProcessingPipelineBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandProcessingPipelineBenchmark.kt index c611f3ae44c..26a5c97c0ad 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandProcessingPipelineBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/CommandProcessingPipelineBenchmark.kt @@ -13,20 +13,26 @@ package me.ahoo.wow.hotpath +import me.ahoo.wow.BenchmarkAggregateSchedulerSupplier +import me.ahoo.wow.api.modeling.AggregateId +import me.ahoo.wow.api.modeling.NamedTypedAggregate import me.ahoo.wow.command.CommandGateway import me.ahoo.wow.command.DefaultCommandGateway import me.ahoo.wow.command.InMemoryCommandBus import me.ahoo.wow.command.ServerCommandExchange +import me.ahoo.wow.command.SimpleServerCommandExchange import me.ahoo.wow.command.createBloomFilterIdempotencyChecker import me.ahoo.wow.command.validation.NoOpValidator import me.ahoo.wow.command.wait.LocalCommandWaitNotifier import me.ahoo.wow.command.wait.ProcessedNotifierFilter import me.ahoo.wow.command.wait.SimpleCommandWaitEndpoint import me.ahoo.wow.command.wait.SimpleWaitStrategyRegistrar +import me.ahoo.wow.command.wait.stage.WaitingForStage import me.ahoo.wow.event.DomainEventBus +import me.ahoo.wow.event.DomainEventStream import me.ahoo.wow.event.InMemoryDomainEventBus import me.ahoo.wow.eventsourcing.EventSourcingStateAggregateRepository -import me.ahoo.wow.eventsourcing.InMemoryEventStore +import me.ahoo.wow.eventsourcing.NoopEventStore import me.ahoo.wow.eventsourcing.snapshot.InMemorySnapshotRepository import me.ahoo.wow.eventsourcing.state.InMemoryStateEventBus import me.ahoo.wow.eventsourcing.state.SendStateEventFilter @@ -36,11 +42,17 @@ import me.ahoo.wow.infra.idempotency.DefaultAggregateIdempotencyCheckerProvider import me.ahoo.wow.ioc.SimpleServiceProvider import me.ahoo.wow.modeling.command.RetryableAggregateProcessorFactory import me.ahoo.wow.modeling.command.SimpleCommandAggregateFactory +import me.ahoo.wow.modeling.command.AggregateProcessor +import me.ahoo.wow.modeling.command.AggregateProcessorFactory +import me.ahoo.wow.modeling.command.CommandAggregateFactory import me.ahoo.wow.modeling.command.dispatcher.AggregateProcessorFilter import me.ahoo.wow.modeling.command.dispatcher.CommandDispatcher +import me.ahoo.wow.modeling.command.dispatcher.CommandHandler import me.ahoo.wow.modeling.command.dispatcher.DefaultCommandHandler import me.ahoo.wow.modeling.command.dispatcher.SendDomainEventStreamFilter +import me.ahoo.wow.modeling.metadata.AggregateMetadata import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory +import me.ahoo.wow.modeling.state.StateAggregateFactory import me.ahoo.wow.modeling.state.StateAggregateRepository import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope @@ -48,11 +60,17 @@ import org.openjdk.jmh.annotations.Setup import org.openjdk.jmh.annotations.State import org.openjdk.jmh.annotations.TearDown import org.openjdk.jmh.infra.Blackhole +import reactor.core.publisher.Mono @State(Scope.Benchmark) open class CommandProcessingPipelineBenchmark { private lateinit var commandGateway: CommandGateway private lateinit var commandDispatcher: CommandDispatcher + private lateinit var aggregateOnlyHandler: CommandHandler + private lateinit var aggregateOnlyWithoutRetryHandler: CommandHandler + private lateinit var aggregateAndDomainEventHandler: CommandHandler + private lateinit var aggregateDomainAndStateEventHandler: CommandHandler + private lateinit var aggregateDomainStateAndProcessedNotifierHandler: CommandHandler @Setup fun setup() { @@ -60,7 +78,7 @@ open class CommandProcessingPipelineBenchmark { val commandBus = InMemoryCommandBus() val domainEventBus: DomainEventBus = InMemoryDomainEventBus() val stateEventBus = InMemoryStateEventBus() - val eventStore = InMemoryEventStore() + val eventStore = NoopEventStore val snapshotRepository = InMemorySnapshotRepository() commandGateway = DefaultCommandGateway( @@ -85,6 +103,42 @@ open class CommandProcessingPipelineBenchmark { stateAggregateRepository, SimpleCommandAggregateFactory(eventStore), ) + val directAggregateProcessorFactory = DirectAggregateProcessorFactory( + ConstructorStateAggregateFactory, + stateAggregateRepository, + SimpleCommandAggregateFactory(eventStore), + ) + aggregateOnlyHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(AggregateProcessorFilter(SimpleServiceProvider(), aggregateProcessorFactory)) + .build() + ) + aggregateOnlyWithoutRetryHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(AggregateProcessorFilter(SimpleServiceProvider(), directAggregateProcessorFactory)) + .build() + ) + aggregateAndDomainEventHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(AggregateProcessorFilter(SimpleServiceProvider(), aggregateProcessorFactory)) + .addFilter(SendDomainEventStreamFilter(domainEventBus)) + .build() + ) + aggregateDomainAndStateEventHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(AggregateProcessorFilter(SimpleServiceProvider(), aggregateProcessorFactory)) + .addFilter(SendDomainEventStreamFilter(domainEventBus)) + .addFilter(SendStateEventFilter(stateEventBus)) + .build() + ) + aggregateDomainStateAndProcessedNotifierHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(AggregateProcessorFilter(SimpleServiceProvider(), aggregateProcessorFactory)) + .addFilter(SendDomainEventStreamFilter(domainEventBus)) + .addFilter(SendStateEventFilter(stateEventBus)) + .addFilter(ProcessedNotifierFilter(commandWaitNotifier)) + .build() + ) val chain = FilterChainBuilder>() .addFilter(AggregateProcessorFilter(SimpleServiceProvider(), aggregateProcessorFactory)) @@ -96,13 +150,21 @@ open class CommandProcessingPipelineBenchmark { namedAggregates = setOf(HotPathFixture.namedAggregate), commandBus = commandGateway, commandHandler = DefaultCommandHandler(chain), + schedulerSupplier = BenchmarkAggregateSchedulerSupplier(), ) commandDispatcher.start() } + private fun createServerExchange(): ServerCommandExchange<*> { + val exchange = SimpleServerCommandExchange(HotPathFixture.createCommandMessage()) + exchange.setAggregateMetadata(HotPathFixture.aggregateMetadata) + return exchange + } + @TearDown fun tearDown() { commandDispatcher.stop() + commandGateway.close() } @Benchmark @@ -117,6 +179,78 @@ open class CommandProcessingPipelineBenchmark { } } + @Benchmark + fun handleAggregateOnly(blackhole: Blackhole) { + try { + val result = aggregateOnlyHandler.handle(createServerExchange()).block() + blackhole.consume(result) + } catch (e: WowException) { + blackhole.consume(e) + } + } + + @Benchmark + fun handleAggregateOnlyWithoutRetry(blackhole: Blackhole) { + try { + val result = aggregateOnlyWithoutRetryHandler.handle(createServerExchange()).block() + blackhole.consume(result) + } catch (e: WowException) { + blackhole.consume(e) + } + } + + @Benchmark + fun handleAggregateAndDomainEvent(blackhole: Blackhole) { + try { + val result = aggregateAndDomainEventHandler.handle(createServerExchange()).block() + blackhole.consume(result) + } catch (e: WowException) { + blackhole.consume(e) + } + } + + @Benchmark + fun handleAggregateDomainAndStateEvent(blackhole: Blackhole) { + try { + val result = aggregateDomainAndStateEventHandler.handle(createServerExchange()).block() + blackhole.consume(result) + } catch (e: WowException) { + blackhole.consume(e) + } + } + + @Benchmark + fun handleAggregateDomainStateAndProcessedNotifierWithoutWait(blackhole: Blackhole) { + try { + val result = aggregateDomainStateAndProcessedNotifierHandler.handle(createServerExchange()).block() + blackhole.consume(result) + } catch (e: WowException) { + blackhole.consume(e) + } + } + + @Benchmark + fun handleAggregateDomainStateAndProcessedNotifierWithLocalWait(blackhole: Blackhole) { + try { + val commandMessage = HotPathFixture.createCommandMessage() + val waitStrategy = WaitingForStage.processed(commandMessage.commandId) + waitStrategy.propagate("", commandMessage.header) + SimpleWaitStrategyRegistrar.register(waitStrategy) + waitStrategy.onFinally { + SimpleWaitStrategyRegistrar.unregister(waitStrategy.waitCommandId) + } + val exchange = SimpleServerCommandExchange(commandMessage) + exchange.setAggregateMetadata(HotPathFixture.aggregateMetadata) + val result = aggregateDomainStateAndProcessedNotifierHandler + .handle(exchange) + .then(waitStrategy.waitingLast()) + .block() + blackhole.consume(result) + } catch (e: WowException) { + blackhole.consume(e) + } + } + @Benchmark fun sendFireAndForget(blackhole: Blackhole) { try { @@ -129,3 +263,47 @@ open class CommandProcessingPipelineBenchmark { } } } + +private class DirectAggregateProcessorFactory( + private val stateAggregateFactory: StateAggregateFactory, + private val stateAggregateRepository: StateAggregateRepository, + private val commandAggregateFactory: CommandAggregateFactory +) : AggregateProcessorFactory { + override fun create( + aggregateId: AggregateId, + aggregateMetadata: AggregateMetadata + ): AggregateProcessor = + DirectAggregateProcessor( + aggregateId = aggregateId, + aggregateMetadata = aggregateMetadata, + stateAggregateFactory = stateAggregateFactory, + stateAggregateRepository = stateAggregateRepository, + commandAggregateFactory = commandAggregateFactory, + ) +} + +private class DirectAggregateProcessor( + override val aggregateId: AggregateId, + private val aggregateMetadata: AggregateMetadata, + private val stateAggregateFactory: StateAggregateFactory, + private val stateAggregateRepository: StateAggregateRepository, + private val commandAggregateFactory: CommandAggregateFactory +) : AggregateProcessor, NamedTypedAggregate by aggregateMetadata.command { + override val processorName: String = DirectAggregateProcessor::class.simpleName!! + + override fun process(exchange: ServerCommandExchange<*>): Mono { + val stateAggregateMono = if (exchange.message.isCreate) { + stateAggregateFactory.createAsMono(aggregateMetadata.state, exchange.message.aggregateId) + } else { + stateAggregateRepository.load(aggregateId, aggregateMetadata.state) + } + return stateAggregateMono + .map { + commandAggregateFactory.create(aggregateMetadata, it) + } + .flatMap { + exchange.clearError() + it.process(exchange) + } + } +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/SnapshotSaveBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/SnapshotSaveBenchmark.kt index 2df7c7a36cd..c9934ec5357 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/SnapshotSaveBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/hotpath/SnapshotSaveBenchmark.kt @@ -15,6 +15,7 @@ package me.ahoo.wow.hotpath import me.ahoo.wow.eventsourcing.snapshot.InMemorySnapshotRepository import me.ahoo.wow.eventsourcing.snapshot.SimpleSnapshot +import me.ahoo.wow.eventsourcing.state.StateEvent.Companion.toStateEvent import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope @@ -34,7 +35,7 @@ open class SnapshotSaveBenchmark { HotPathFixture.aggregateMetadata.state, HotPathFixture.aggregateId, ) - snapshot = SimpleSnapshot(aggregate) + snapshot = SimpleSnapshot(HotPathFixture.createEventStream().toStateEvent(aggregate)) } @Benchmark diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/DeepCopyBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/DeepCopyBenchmark.kt index 8c1c409ce89..099d5728936 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/DeepCopyBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/DeepCopyBenchmark.kt @@ -15,14 +15,12 @@ package me.ahoo.wow.infra import me.ahoo.wow.event.DomainEventStream import me.ahoo.wow.eventsourcing.createEventStream -import me.ahoo.wow.serialization.deepCopy -import me.ahoo.wow.serialization.toJsonNode import me.ahoo.wow.serialization.toLinkedHashMap +import me.ahoo.wow.serialization.toJsonString import me.ahoo.wow.serialization.toObject import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope import org.openjdk.jmh.annotations.State -import tools.jackson.databind.node.ObjectNode @State(Scope.Benchmark) open class DeepCopyBenchmark { @@ -30,13 +28,13 @@ open class DeepCopyBenchmark { private val eventStream: DomainEventStream = createEventStream() @Benchmark - fun toJsonNodeToObject(): DomainEventStream { - return eventStream.toJsonNode().toObject() + fun jsonRoundTrip(): DomainEventStream { + return eventStream.toJsonString().toObject() } @Benchmark - fun convertValue(): DomainEventStream { - return eventStream.deepCopy(DomainEventStream::class.java) + fun domainCopy(): DomainEventStream { + return eventStream.copy() } @Benchmark @@ -44,4 +42,4 @@ open class DeepCopyBenchmark { return eventStream.toLinkedHashMap() } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/SinkBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/SinkBenchmark.kt index 09f3119770e..66dd30da369 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/SinkBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/infra/SinkBenchmark.kt @@ -19,6 +19,8 @@ import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope import org.openjdk.jmh.annotations.Setup import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.TearDown +import reactor.core.Disposable import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler @@ -29,13 +31,25 @@ open class SinkBenchmark { private lateinit var sink: Sinks.Many private lateinit var concurrentManySink: ConcurrentManySink private lateinit var emitScheduler: Scheduler + private lateinit var sinkSubscription: Disposable + private lateinit var concurrentManySinkSubscription: Disposable @Setup fun setup() { sink = Sinks.many().unicast().onBackpressureBuffer() - sink.asFlux().subscribe() + sinkSubscription = sink.asFlux().subscribe() emitScheduler = Schedulers.newSingle("emit-scheduler") concurrentManySink = Sinks.unsafe().many().unicast().onBackpressureBuffer().concurrent() + concurrentManySinkSubscription = concurrentManySink.asFlux().subscribe() + } + + @TearDown + fun tearDown() { + emitScheduler.dispose() + sink.tryEmitComplete() + concurrentManySink.tryEmitComplete() + sinkSubscription.dispose() + concurrentManySinkSubscription.dispose() } @Benchmark @@ -69,4 +83,4 @@ open class SinkBenchmark { }.subscribeOn(emitScheduler).block() } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/AbstractCommandDispatcherBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/AbstractCommandDispatcherBenchmark.kt index 907f09b2c6a..aa3074f0250 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/AbstractCommandDispatcherBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/AbstractCommandDispatcherBenchmark.kt @@ -13,6 +13,8 @@ package me.ahoo.wow.modeling +import me.ahoo.wow.api.command.CommandMessage +import me.ahoo.wow.BenchmarkAggregateSchedulerSupplier import me.ahoo.wow.command.CommandBus import me.ahoo.wow.command.CommandGateway import me.ahoo.wow.command.DefaultCommandGateway @@ -20,6 +22,7 @@ import me.ahoo.wow.command.InMemoryCommandBus import me.ahoo.wow.command.ServerCommandExchange import me.ahoo.wow.command.createBloomFilterIdempotencyChecker import me.ahoo.wow.command.createCommandMessage +import me.ahoo.wow.example.api.cart.AddCartItem import me.ahoo.wow.command.wait.CommandWaitNotifier import me.ahoo.wow.command.wait.LocalCommandWaitNotifier import me.ahoo.wow.command.wait.ProcessedNotifierFilter @@ -49,6 +52,7 @@ import me.ahoo.wow.modeling.command.dispatcher.DefaultCommandHandler import me.ahoo.wow.modeling.command.dispatcher.SendDomainEventStreamFilter import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory import me.ahoo.wow.modeling.state.StateAggregateRepository +import me.ahoo.wow.scheduler.AggregateSchedulerSupplier import me.ahoo.wow.test.validation.TestValidator import org.openjdk.jmh.infra.Blackhole @@ -94,7 +98,8 @@ abstract class AbstractCommandDispatcherBenchmark { .build() commandDispatcher = CommandDispatcher( commandBus = commandGateway, - commandHandler = DefaultCommandHandler(chain) + commandHandler = DefaultCommandHandler(chain), + schedulerSupplier = createSchedulerSupplier() ) commandDispatcher.start() } @@ -125,8 +130,17 @@ abstract class AbstractCommandDispatcherBenchmark { return InMemorySnapshotRepository() } + open fun createSchedulerSupplier(): AggregateSchedulerSupplier { + return BenchmarkAggregateSchedulerSupplier() + } + + open fun createBenchmarkCommandMessage(): CommandMessage { + return createCommandMessage() + } + open fun destroy() { commandDispatcher.stop() + commandGateway.close() } inline fun run(blackHole: Blackhole, block: () -> Any?) { @@ -138,22 +152,24 @@ abstract class AbstractCommandDispatcherBenchmark { } } + // Sent-only helpers are intentionally not JMH benchmarks: tight loops can + // outpace dispatcher processing and measure backlog pressure instead. open fun send(blackHole: Blackhole) { run(blackHole) { - commandGateway.send(createCommandMessage()).block() + commandGateway.send(createBenchmarkCommandMessage()).block() } } open fun sendAndWaitForSent(blackHole: Blackhole) { run(blackHole) { - commandGateway.sendAndWaitForSent(createCommandMessage()).block() + commandGateway.sendAndWaitForSent(createBenchmarkCommandMessage()).block() } } open fun sendAndWaitForProcessed(blackHole: Blackhole) { run(blackHole) { - commandGateway.sendAndWaitForProcessed(createCommandMessage()).block() + commandGateway.sendAndWaitForProcessed(createBenchmarkCommandMessage()).block() } } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherBenchmark.kt index 8da35afd53f..ba029b1c7f6 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherBenchmark.kt @@ -13,7 +13,11 @@ package me.ahoo.wow.modeling +import me.ahoo.wow.api.command.CommandMessage +import me.ahoo.wow.command.createCommandMessageForNewAggregate +import me.ahoo.wow.example.api.cart.AddCartItem import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Level import org.openjdk.jmh.annotations.Scope import org.openjdk.jmh.annotations.Setup import org.openjdk.jmh.annotations.State @@ -23,30 +27,22 @@ import org.openjdk.jmh.infra.Blackhole @State(Scope.Benchmark) open class InMemoryCommandDispatcherBenchmark : AbstractCommandDispatcherBenchmark() { - @Setup + @Setup(Level.Iteration) override fun setup() { super.setup() } - @TearDown + @TearDown(Level.Iteration) override fun destroy() { super.destroy() } - @Benchmark - override fun send(blackHole: Blackhole) { - super.send(blackHole) - } - - @Benchmark - override fun sendAndWaitForSent(blackHole: Blackhole) { - super.sendAndWaitForSent(blackHole) - + override fun createBenchmarkCommandMessage(): CommandMessage { + return createCommandMessageForNewAggregate() } @Benchmark - override fun sendAndWaitForProcessed(blackHole: Blackhole) { + fun sendAndWaitForProcessedForNewAggregate(blackHole: Blackhole) { super.sendAndWaitForProcessed(blackHole) - } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherGrowthBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherGrowthBenchmark.kt new file mode 100644 index 00000000000..fcd6fe1dac5 --- /dev/null +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/InMemoryCommandDispatcherGrowthBenchmark.kt @@ -0,0 +1,41 @@ +/* + * Copyright [2021-present] [ahoo wang (https://github.com/Ahoo-Wang)]. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.ahoo.wow.modeling + +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Level +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.TearDown +import org.openjdk.jmh.infra.Blackhole + +@State(Scope.Benchmark) +open class InMemoryCommandDispatcherGrowthBenchmark : AbstractCommandDispatcherBenchmark() { + + @Setup(Level.Iteration) + override fun setup() { + super.setup() + } + + @TearDown(Level.Iteration) + override fun destroy() { + super.destroy() + } + + @Benchmark + fun sendAndWaitForProcessedWithGrowingStream(blackHole: Blackhole) { + super.sendAndWaitForProcessed(blackHole) + } +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopCommandDispatcherBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopCommandDispatcherBenchmark.kt index 5127f8ed6d1..7fab1fac5f2 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopCommandDispatcherBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopCommandDispatcherBenchmark.kt @@ -48,20 +48,8 @@ open class NoopCommandDispatcherBenchmark : AbstractCommandDispatcherBenchmark() super.destroy() } - @Benchmark - override fun send(blackHole: Blackhole) { - super.send(blackHole) - } - - @Benchmark - override fun sendAndWaitForSent(blackHole: Blackhole) { - super.sendAndWaitForSent(blackHole) - - } - @Benchmark override fun sendAndWaitForProcessed(blackHole: Blackhole) { super.sendAndWaitForProcessed(blackHole) - } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopEventStoreCommandDispatcherBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopEventStoreCommandDispatcherBenchmark.kt index f9b418bd10c..ec67d725b56 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopEventStoreCommandDispatcherBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/modeling/NoopEventStoreCommandDispatcherBenchmark.kt @@ -39,18 +39,8 @@ open class NoopEventStoreCommandDispatcherBenchmark : AbstractCommandDispatcherB super.destroy() } - @Benchmark - override fun send(blackHole: Blackhole) { - super.send(blackHole) - } - - @Benchmark - override fun sendAndWaitForSent(blackHole: Blackhole) { - super.sendAndWaitForSent(blackHole) - } - @Benchmark override fun sendAndWaitForProcessed(blackHole: Blackhole) { super.sendAndWaitForProcessed(blackHole) } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/mongo/MongoCommandDispatcherBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/mongo/MongoCommandDispatcherBenchmark.kt index 4794668549f..7db5a828652 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/mongo/MongoCommandDispatcherBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/mongo/MongoCommandDispatcherBenchmark.kt @@ -13,7 +13,10 @@ package me.ahoo.wow.mongo +import me.ahoo.wow.api.command.CommandMessage +import me.ahoo.wow.command.createCommandMessageForNewAggregate import me.ahoo.wow.eventsourcing.EventStore +import me.ahoo.wow.example.api.cart.AddCartItem import me.ahoo.wow.modeling.AbstractCommandDispatcherBenchmark import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Scope @@ -42,18 +45,12 @@ open class MongoCommandDispatcherBenchmark : AbstractCommandDispatcherBenchmark( return MongoEventStore(mongoInitializer.database) } - @Benchmark - override fun send(blackHole: Blackhole) { - super.send(blackHole) - } - - @Benchmark - override fun sendAndWaitForSent(blackHole: Blackhole) { - super.sendAndWaitForSent(blackHole) + override fun createBenchmarkCommandMessage(): CommandMessage { + return createCommandMessageForNewAggregate() } @Benchmark - override fun sendAndWaitForProcessed(blackHole: Blackhole) { + fun sendAndWaitForProcessedForNewAggregate(blackHole: Blackhole) { super.sendAndWaitForProcessed(blackHole) } -} \ No newline at end of file +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt index 4c751ae13de..69105905ad3 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt @@ -14,11 +14,17 @@ package me.ahoo.wow.redis import org.springframework.data.redis.connection.RedisStandaloneConfiguration +import org.springframework.data.redis.connection.ReactiveRedisConnection import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory import org.springframework.data.redis.core.ReactiveStringRedisTemplate +import java.time.Duration class RedisBenchmarkFixture : AutoCloseable { + companion object { + private val FLUSH_TIMEOUT: Duration = Duration.ofSeconds(30) + } + val connectionFactory: LettuceConnectionFactory val redisTemplate: ReactiveStringRedisTemplate @@ -30,9 +36,22 @@ class RedisBenchmarkFixture : AutoCloseable { connectionFactory = LettuceConnectionFactory(redisConfig, lettuceClientConfiguration) connectionFactory.afterPropertiesSet() redisTemplate = ReactiveStringRedisTemplate(connectionFactory) + flushDb() } override fun close() { + runCatching { + flushDb() + } connectionFactory.destroy() } + + private fun flushDb() { + val connection: ReactiveRedisConnection = connectionFactory.reactiveConnection + try { + connection.serverCommands().flushDb().block(FLUSH_TIMEOUT) + } finally { + connection.close() + } + } } diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandDispatcherBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandDispatcherBenchmark.kt index 477c7f55813..b552b62543e 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandDispatcherBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandDispatcherBenchmark.kt @@ -13,7 +13,10 @@ package me.ahoo.wow.redis +import me.ahoo.wow.api.command.CommandMessage +import me.ahoo.wow.command.createCommandMessageForNewAggregate import me.ahoo.wow.eventsourcing.EventStore +import me.ahoo.wow.example.api.cart.AddCartItem import me.ahoo.wow.modeling.AbstractCommandDispatcherBenchmark import me.ahoo.wow.redis.eventsourcing.RedisEventStore import org.openjdk.jmh.annotations.Benchmark @@ -43,18 +46,12 @@ open class RedisCommandDispatcherBenchmark : AbstractCommandDispatcherBenchmark( return RedisEventStore(redis.redisTemplate) } - @Benchmark - override fun send(blackHole: Blackhole) { - super.send(blackHole) - } - - @Benchmark - override fun sendAndWaitForSent(blackHole: Blackhole) { - super.sendAndWaitForSent(blackHole) + override fun createBenchmarkCommandMessage(): CommandMessage { + return createCommandMessageForNewAggregate() } @Benchmark - override fun sendAndWaitForProcessed(blackHole: Blackhole) { + fun sendAndWaitForProcessedForNewAggregate(blackHole: Blackhole) { super.sendAndWaitForProcessed(blackHole) } } diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandProcessingPipelineBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandProcessingPipelineBenchmark.kt new file mode 100644 index 00000000000..6946d54d896 --- /dev/null +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisCommandProcessingPipelineBenchmark.kt @@ -0,0 +1,165 @@ +/* + * Copyright [2021-present] [ahoo wang (https://github.com/Ahoo-Wang)]. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.ahoo.wow.redis + +import me.ahoo.wow.command.ServerCommandExchange +import me.ahoo.wow.command.SimpleServerCommandExchange +import me.ahoo.wow.command.cartAggregateMetadata +import me.ahoo.wow.command.createCommandMessageForNewAggregate +import me.ahoo.wow.command.wait.LocalCommandWaitNotifier +import me.ahoo.wow.command.wait.ProcessedNotifierFilter +import me.ahoo.wow.command.wait.SimpleWaitStrategyRegistrar +import me.ahoo.wow.command.wait.stage.WaitingForStage +import me.ahoo.wow.event.InMemoryDomainEventBus +import me.ahoo.wow.eventsourcing.EventSourcingStateAggregateRepository +import me.ahoo.wow.eventsourcing.EventStore +import me.ahoo.wow.eventsourcing.snapshot.InMemorySnapshotRepository +import me.ahoo.wow.eventsourcing.state.InMemoryStateEventBus +import me.ahoo.wow.eventsourcing.state.SendStateEventFilter +import me.ahoo.wow.exception.WowException +import me.ahoo.wow.filter.FilterChainBuilder +import me.ahoo.wow.ioc.SimpleServiceProvider +import me.ahoo.wow.modeling.command.RetryableAggregateProcessorFactory +import me.ahoo.wow.modeling.command.SimpleCommandAggregateFactory +import me.ahoo.wow.modeling.command.dispatcher.AggregateProcessorFilter +import me.ahoo.wow.modeling.command.dispatcher.CommandHandler +import me.ahoo.wow.modeling.command.dispatcher.DefaultCommandHandler +import me.ahoo.wow.modeling.command.dispatcher.SendDomainEventStreamFilter +import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory +import me.ahoo.wow.redis.eventsourcing.RedisEventStore +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Fork +import org.openjdk.jmh.annotations.Measurement +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.TearDown +import org.openjdk.jmh.annotations.Threads +import org.openjdk.jmh.annotations.Warmup +import org.openjdk.jmh.infra.Blackhole + +@Warmup(iterations = 1) +@Measurement(iterations = 2) +@Fork(value = 2) +@Threads(5) +@State(Scope.Benchmark) +open class RedisCommandProcessingPipelineBenchmark { + private lateinit var redis: RedisBenchmarkFixture + private lateinit var aggregateOnlyHandler: CommandHandler + private lateinit var aggregateDomainAndStateEventHandler: CommandHandler + private lateinit var aggregateDomainStateAndProcessedNotifierHandler: CommandHandler + + @Setup + fun setup() { + redis = RedisBenchmarkFixture() + val eventStore = RedisEventStore(redis.redisTemplate) + val commandWaitNotifier = LocalCommandWaitNotifier(SimpleWaitStrategyRegistrar) + val aggregateProcessorFilter = AggregateProcessorFilter( + serviceProvider = SimpleServiceProvider(), + aggregateProcessorFactory = createAggregateProcessorFactory(eventStore), + ) + aggregateOnlyHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(aggregateProcessorFilter) + .build() + ) + aggregateDomainAndStateEventHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(aggregateProcessorFilter) + .addFilter(SendDomainEventStreamFilter(InMemoryDomainEventBus())) + .addFilter(SendStateEventFilter(InMemoryStateEventBus())) + .build() + ) + aggregateDomainStateAndProcessedNotifierHandler = DefaultCommandHandler( + FilterChainBuilder>() + .addFilter(aggregateProcessorFilter) + .addFilter(SendDomainEventStreamFilter(InMemoryDomainEventBus())) + .addFilter(SendStateEventFilter(InMemoryStateEventBus())) + .addFilter(ProcessedNotifierFilter(commandWaitNotifier)) + .build() + ) + } + + @TearDown + fun tearDown() { + redis.close() + } + + private fun createAggregateProcessorFactory(eventStore: EventStore): RetryableAggregateProcessorFactory { + val stateAggregateRepository = EventSourcingStateAggregateRepository( + ConstructorStateAggregateFactory, + InMemorySnapshotRepository(), + eventStore, + ) + return RetryableAggregateProcessorFactory( + ConstructorStateAggregateFactory, + stateAggregateRepository, + SimpleCommandAggregateFactory(eventStore), + ) + } + + private fun createServerExchange(): ServerCommandExchange<*> { + val exchange = SimpleServerCommandExchange(createCommandMessageForNewAggregate()) + exchange.setAggregateMetadata(cartAggregateMetadata) + return exchange + } + + private fun run(blackHole: Blackhole, block: () -> Any?) { + try { + blackHole.consume(block()) + } catch (wowException: WowException) { + blackHole.consume(wowException) + } + } + + @Benchmark + fun handleAggregateOnly(blackHole: Blackhole) { + run(blackHole) { + aggregateOnlyHandler.handle(createServerExchange()).block() + } + } + + @Benchmark + fun handleAggregateDomainAndStateEvent(blackHole: Blackhole) { + run(blackHole) { + aggregateDomainAndStateEventHandler.handle(createServerExchange()).block() + } + } + + @Benchmark + fun handleAggregateDomainStateAndProcessedNotifierWithoutWait(blackHole: Blackhole) { + run(blackHole) { + aggregateDomainStateAndProcessedNotifierHandler.handle(createServerExchange()).block() + } + } + + @Benchmark + fun handleAggregateDomainStateAndProcessedNotifierWithLocalWait(blackHole: Blackhole) { + run(blackHole) { + val commandMessage = createCommandMessageForNewAggregate() + val waitStrategy = WaitingForStage.processed(commandMessage.commandId) + waitStrategy.propagate("", commandMessage.header) + SimpleWaitStrategyRegistrar.register(waitStrategy) + waitStrategy.onFinally { + SimpleWaitStrategyRegistrar.unregister(waitStrategy.waitCommandId) + } + val exchange = SimpleServerCommandExchange(commandMessage) + exchange.setAggregateMetadata(cartAggregateMetadata) + aggregateDomainStateAndProcessedNotifierHandler + .handle(exchange) + .then(waitStrategy.waitingLast()) + .block() + } + } +} diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisEventStoreReadBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisEventStoreReadBenchmark.kt new file mode 100644 index 00000000000..9aac5d32c23 --- /dev/null +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisEventStoreReadBenchmark.kt @@ -0,0 +1,93 @@ +/* + * Copyright [2021-present] [ahoo wang (https://github.com/Ahoo-Wang)]. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.ahoo.wow.redis + +import me.ahoo.wow.api.modeling.AggregateId +import me.ahoo.wow.command.cartAggregateMetadata +import me.ahoo.wow.event.DomainEventStream +import me.ahoo.wow.event.toDomainEventStream +import me.ahoo.wow.example.api.cart.CartItem +import me.ahoo.wow.example.api.cart.CartItemAdded +import me.ahoo.wow.eventsourcing.EventStore +import me.ahoo.wow.modeling.aggregateId +import me.ahoo.wow.redis.eventsourcing.RedisEventStore +import me.ahoo.wow.test.aggregate.GivenInitializationCommand +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Fork +import org.openjdk.jmh.annotations.Measurement +import org.openjdk.jmh.annotations.Param +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.TearDown +import org.openjdk.jmh.annotations.Threads +import org.openjdk.jmh.annotations.Warmup +import org.openjdk.jmh.infra.Blackhole + +@Warmup(iterations = 1) +@Measurement(iterations = 2) +@Fork(value = 2) +@Threads(5) +@State(Scope.Benchmark) +open class RedisEventStoreReadBenchmark { + @Param("10", "100") + var eventCount: Int = 10 + + private lateinit var redis: RedisBenchmarkFixture + private lateinit var eventStore: EventStore + private lateinit var aggregateId: AggregateId + + @Setup + fun setup() { + redis = RedisBenchmarkFixture() + eventStore = RedisEventStore(redis.redisTemplate) + aggregateId = cartAggregateMetadata.aggregateId() + for (eventStream in createEventStreams()) { + eventStore.append(eventStream).block() + } + } + + @TearDown + fun tearDown() { + redis.close() + } + + private fun createEventStreams(): List { + return (1..eventCount).map { version -> + val event = CartItemAdded(CartItem("product-$version", version)) + listOf(event).toDomainEventStream( + upstream = GivenInitializationCommand(aggregateId), + aggregateVersion = version - 1, + ) + } + } + + @Benchmark + fun loadAll(blackHole: Blackhole) { + val eventStreams = eventStore.load(aggregateId, 1, eventCount).collectList().block() + blackHole.consume(eventStreams) + } + + @Benchmark + fun single(blackHole: Blackhole) { + val eventStream = eventStore.single(aggregateId, eventCount).block() + blackHole.consume(eventStream) + } + + @Benchmark + fun last(blackHole: Blackhole) { + val eventStream = eventStore.last(aggregateId).block() + blackHole.consume(eventStream) + } +} diff --git a/wow-benchmarks/src/jmh/resources/META-INF/services/me.ahoo.wow.id.GlobalIdGeneratorFactory b/wow-benchmarks/src/jmh/resources/META-INF/services/me.ahoo.wow.id.GlobalIdGeneratorFactory new file mode 100644 index 00000000000..a06d5312d48 --- /dev/null +++ b/wow-benchmarks/src/jmh/resources/META-INF/services/me.ahoo.wow.id.GlobalIdGeneratorFactory @@ -0,0 +1 @@ +me.ahoo.wow.BenchmarkGlobalIdGeneratorFactory From b33739d717ab4059790b205c0f97b41fc517a5ea Mon Sep 17 00:00:00 2001 From: Ahoo Wang Date: Sun, 7 Jun 2026 19:59:28 +0800 Subject: [PATCH 2/2] fix(benchmark): address benchmark review feedback --- .../gradle/benchmark-reporting.gradle.kts | 15 +++++++++------ .../kotlin/me/ahoo/wow/eventsourcing/Events.kt | 9 +++++---- .../wow/eventsourcing/SnapshotBenchmark.kt | 6 ++++-- .../me/ahoo/wow/redis/RedisBenchmarkFixture.kt | 18 +++++++++++++++++- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/wow-benchmarks/gradle/benchmark-reporting.gradle.kts b/wow-benchmarks/gradle/benchmark-reporting.gradle.kts index 5489523d22a..5e629f68b01 100644 --- a/wow-benchmarks/gradle/benchmark-reporting.gradle.kts +++ b/wow-benchmarks/gradle/benchmark-reporting.gradle.kts @@ -307,8 +307,8 @@ tasks.register("generateBenchmarkReport") { val benchmark = result["benchmark"] as? String ?: continue @Suppress("UNCHECKED_CAST") val primaryMetric = result["primaryMetric"] as? Map ?: continue - val score = primaryMetric["score"] as? Double ?: continue - val scoreError = primaryMetric["scoreError"] as? Double ?: 0.0 + val score = parseMetricNumber(primaryMetric["score"]) ?: continue + val scoreError = parseMetricNumber(primaryMetric["scoreError"]) ?: 0.0 val unit = primaryMetric["scoreUnit"] as? String ?: "ops/s" var allocRateNorm = "—" @@ -316,7 +316,7 @@ tasks.register("generateBenchmarkReport") { val secondaryMetrics = result["secondaryMetrics"] as? Map> if (secondaryMetrics != null) { val gcAlloc = secondaryMetrics["gc.alloc.rate.norm"] - allocRateNorm = String.format(Locale.US, "%.1f B/op", gcAlloc?.get("score") as? Double ?: 0.0) + allocRateNorm = String.format(Locale.US, "%.1f B/op", parseMetricNumber(gcAlloc?.get("score")) ?: 0.0) } val parts = benchmark.split(".") @@ -384,7 +384,7 @@ tasks.register("benchmarkCompare") { fun parseScores(file: java.io.File): Map { @Suppress("UNCHECKED_CAST") val results = parser.parse(file) as List> - return results.associate { result -> + return results.mapIndexed { rowIndex, result -> val benchmark = result["benchmark"] as String @Suppress("UNCHECKED_CAST") val params = result["params"] as? Map @@ -395,8 +395,11 @@ tasks.register("benchmarkCompare") { } @Suppress("UNCHECKED_CAST") val primaryMetric = result["primaryMetric"] as Map - key to (primaryMetric["score"] as Double) - } + val score = parseMetricNumber(primaryMetric["score"]) ?: throw GradleException( + "Invalid JMH score at index $rowIndex in ${file.absolutePath}: primaryMetric.score must be numeric." + ) + key to score + }.toMap() } val baseline = parseScores(baselineFile) diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt index e59acd0c854..85affe6d90b 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/Events.kt @@ -13,6 +13,7 @@ package me.ahoo.wow.eventsourcing +import me.ahoo.wow.api.modeling.AggregateId import me.ahoo.wow.command.cartAggregateMetadata import me.ahoo.wow.event.DomainEventStream import me.ahoo.wow.event.toDomainEventStream @@ -21,16 +22,16 @@ import me.ahoo.wow.example.api.cart.CartItemAdded import me.ahoo.wow.modeling.aggregateId import me.ahoo.wow.test.aggregate.GivenInitializationCommand -fun createEventStream(): DomainEventStream { +fun createEventStream(aggregateId: AggregateId = cartAggregateMetadata.aggregateId()): DomainEventStream { val event = CartItemAdded(CartItem("productId")) return listOf(event).toDomainEventStream( - upstream = GivenInitializationCommand(cartAggregateMetadata.aggregateId()), + upstream = GivenInitializationCommand(aggregateId), ) } -fun createSingleEventStream(): DomainEventStream { +fun createSingleEventStream(aggregateId: AggregateId = cartAggregateMetadata.aggregateId()): DomainEventStream { val event = CartItemAdded(CartItem("productId")) return event.toDomainEventStream( - upstream = GivenInitializationCommand(cartAggregateMetadata.aggregateId()), + upstream = GivenInitializationCommand(aggregateId), ) } diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt index d037e61a301..62bc9420a0e 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/eventsourcing/SnapshotBenchmark.kt @@ -48,7 +48,7 @@ open class SnapshotBenchmark { cartAggregateMetadata.state, aggregateId, ) - val eventStream = createEventStream() + val eventStream = createEventStream(aggregateId) val stateEvent = eventStream.toStateEvent(aggregate) val snapshot = SimpleSnapshot(stateEvent) snapshotLoadRepository.save(snapshot).block() @@ -63,7 +63,9 @@ open class SnapshotBenchmark { @Benchmark fun snapshotLoad(blackhole: Blackhole) { - val snapshot = snapshotLoadRepository.load>(aggregateId).block() + val snapshot = checkNotNull( + snapshotLoadRepository.load>(aggregateId).block() + ) blackhole.consume(snapshot) } } diff --git a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt index 69105905ad3..5d54a3263f5 100644 --- a/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt +++ b/wow-benchmarks/src/jmh/kotlin/me/ahoo/wow/redis/RedisBenchmarkFixture.kt @@ -23,6 +23,20 @@ import java.time.Duration class RedisBenchmarkFixture : AutoCloseable { companion object { private val FLUSH_TIMEOUT: Duration = Duration.ofSeconds(30) + private const val DEFAULT_BENCHMARK_DATABASE = 15 + private const val REDIS_DATABASE_PROPERTY = "wow.benchmark.redis.database" + + private fun benchmarkDatabase(): Int { + val configuredDatabase = System.getProperty(REDIS_DATABASE_PROPERTY) + val database = configuredDatabase?.toIntOrNull() ?: DEFAULT_BENCHMARK_DATABASE + require(configuredDatabase == null || configuredDatabase.toIntOrNull() != null) { + "$REDIS_DATABASE_PROPERTY must be an integer." + } + require(database >= 0) { + "$REDIS_DATABASE_PROPERTY must be greater than or equal to 0." + } + return database + } } val connectionFactory: LettuceConnectionFactory @@ -32,7 +46,9 @@ class RedisBenchmarkFixture : AutoCloseable { val lettuceClientConfiguration = LettuceClientConfiguration .builder() .build() - val redisConfig = RedisStandaloneConfiguration() + val redisConfig = RedisStandaloneConfiguration().apply { + database = benchmarkDatabase() + } connectionFactory = LettuceConnectionFactory(redisConfig, lettuceClientConfiguration) connectionFactory.afterPropertiesSet() redisTemplate = ReactiveStringRedisTemplate(connectionFactory)