diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 92bb7517492..a406ac0c97f 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -17,6 +17,8 @@ * limitations under the License. */ +#include + #include #include #include @@ -27,12 +29,37 @@ #include #include +#include #include #include #include "kafka_config.h" #include "kafka_topic.h" +#define FLB_OTEL_LOGS_SCHEMA_KEY "schema" +#define FLB_OTEL_LOGS_SCHEMA_OTLP "otlp" +#define FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES 10 + +struct otlp_logs_resource_partition { + int64_t resource_id; + uint64_t resource_hash; + int has_key; + char key[17]; + msgpack_sbuffer buffer; +}; + +static const char *default_logs_body_keys[] = {"log", "message"}; + +static void init_otlp_logs_options(struct flb_opentelemetry_otlp_logs_options *options) +{ + memset(options, 0, sizeof(*options)); + options->logs_require_otel_metadata = FLB_FALSE; + options->logs_body_keys = default_logs_body_keys; + options->logs_body_key_count = sizeof(default_logs_body_keys) / + sizeof(default_logs_body_keys[0]); + options->logs_body_key_attributes = FLB_FALSE; +} + void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { @@ -506,11 +533,17 @@ int produce_message(struct flb_time *tm, msgpack_object *map, return FLB_OK; } -static int produce_raw_payload(const void *payload, size_t payload_size, - struct flb_out_kafka *ctx) +static int produce_raw_payload_with_key_retry_control(const void *payload, + size_t payload_size, + char *key, + size_t key_len, + int use_default_key, + int allow_engine_retry, + struct flb_out_kafka *ctx) { int ret; int queue_full_retries; + int queue_full_retry_limit; char *message_key; size_t message_key_len; struct flb_kafka_topic *topic; @@ -520,8 +553,24 @@ static int produce_raw_payload(const void *payload, size_t payload_size, } queue_full_retries = 0; - message_key = ctx->message_key; - message_key_len = ctx->message_key_len; + queue_full_retry_limit = ctx->queue_full_retries; + if (key != NULL) { + message_key = key; + message_key_len = key_len; + } + else if (use_default_key == FLB_TRUE) { + message_key = ctx->message_key; + message_key_len = ctx->message_key_len; + } + else { + message_key = NULL; + message_key_len = 0; + } + + if (queue_full_retry_limit <= 0 && allow_engine_retry == FLB_FALSE) { + queue_full_retry_limit = FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES; + } + topic = flb_kafka_topic_default(ctx); if (topic == NULL) { @@ -530,10 +579,19 @@ static int produce_raw_payload(const void *payload, size_t payload_size, } retry: - if (ctx->queue_full_retries > 0 && - queue_full_retries >= ctx->queue_full_retries) { + if (queue_full_retry_limit > 0 && + queue_full_retries >= queue_full_retry_limit) { ctx->blocked = FLB_FALSE; - return FLB_RETRY; + if (allow_engine_retry == FLB_TRUE) { + return FLB_RETRY; + } + + flb_plg_error(ctx->ins, + "failed to produce partitioned OTLP payload to topic %s: " + "internal queue is full after %d retries", + rd_kafka_topic_name(topic->tp), + queue_full_retries); + return FLB_ERROR; } ret = rd_kafka_produce(topic->tp, @@ -567,22 +625,564 @@ static int produce_raw_payload(const void *payload, size_t payload_size, return FLB_OK; } +static int produce_raw_payload_with_key(const void *payload, size_t payload_size, + char *key, size_t key_len, + struct flb_out_kafka *ctx) +{ + return produce_raw_payload_with_key_retry_control(payload, + payload_size, + key, + key_len, + FLB_TRUE, + FLB_TRUE, + ctx); +} + +static int produce_raw_payload(const void *payload, size_t payload_size, + struct flb_out_kafka *ctx) +{ + return produce_raw_payload_with_key(payload, payload_size, NULL, 0, ctx); +} + +static msgpack_object *msgpack_map_get_object(msgpack_object_map *map, + const char *key) +{ + size_t index; + size_t key_length; + msgpack_object_kv *entry; + + if (map == NULL || key == NULL) { + return NULL; + } + + key_length = strlen(key); + + for (index = 0; index < map->size; index++) { + entry = &map->ptr[index]; + + if (entry->key.type != MSGPACK_OBJECT_STR) { + continue; + } + + if (entry->key.via.str.size != key_length) { + continue; + } + + if (strncmp(entry->key.via.str.ptr, key, key_length) == 0) { + return &entry->val; + } + } + + return NULL; +} + +static int msgpack_map_entry_is_string(msgpack_object_map *map, + const char *key, + const char *expected) +{ + msgpack_object *value; + + value = msgpack_map_get_object(map, key); + if (value == NULL || value->type != MSGPACK_OBJECT_STR) { + return FLB_FALSE; + } + + if (value->via.str.size != strlen(expected)) { + return FLB_FALSE; + } + + if (strncmp(value->via.str.ptr, expected, value->via.str.size) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int msgpack_map_get_int64(msgpack_object_map *map, + const char *key, + int64_t *output) +{ + msgpack_object *value; + + value = msgpack_map_get_object(map, key); + if (value == NULL) { + return -1; + } + + if (value->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + *output = (int64_t) value->via.u64; + return 0; + } + else if (value->type == MSGPACK_OBJECT_NEGATIVE_INTEGER) { + *output = value->via.i64; + return 0; + } + + return -1; +} + +static uint64_t msgpack_object_hash(msgpack_object *object) +{ + uint64_t hash; + msgpack_sbuffer buffer; + msgpack_packer packer; + + if (object == NULL) { + return cfl_hash_64bits("null", 4); + } + + msgpack_sbuffer_init(&buffer); + msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + + if (msgpack_pack_object(&packer, *object) != 0) { + msgpack_sbuffer_destroy(&buffer); + return 0; + } + + hash = cfl_hash_64bits(buffer.data, buffer.size); + msgpack_sbuffer_destroy(&buffer); + + return hash; +} + +static uint64_t msgpack_object_pair_hash(msgpack_object *left, + msgpack_object *right) +{ + uint64_t hash; + msgpack_sbuffer buffer; + msgpack_packer packer; + + msgpack_sbuffer_init(&buffer); + msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + + if (msgpack_pack_array(&packer, 2) != 0) { + msgpack_sbuffer_destroy(&buffer); + return 0; + } + + if (left == NULL) { + msgpack_pack_nil(&packer); + } + else if (msgpack_pack_object(&packer, *left) != 0) { + msgpack_sbuffer_destroy(&buffer); + return 0; + } + + if (right == NULL) { + msgpack_pack_nil(&packer); + } + else if (msgpack_pack_object(&packer, *right) != 0) { + msgpack_sbuffer_destroy(&buffer); + return 0; + } + + hash = cfl_hash_64bits(buffer.data, buffer.size); + msgpack_sbuffer_destroy(&buffer); + + return hash; +} + +static msgpack_object *resource_schema_url_object(msgpack_object *resource_object, + msgpack_object *resource_body) +{ + msgpack_object *schema_url; + + if (resource_body != NULL && resource_body->type == MSGPACK_OBJECT_MAP) { + schema_url = msgpack_map_get_object(&resource_body->via.map, "schema_url"); + if (schema_url != NULL) { + return schema_url; + } + } + + if (resource_object != NULL && resource_object->type == MSGPACK_OBJECT_MAP) { + schema_url = msgpack_map_get_object(&resource_object->via.map, "schema_url"); + if (schema_url != NULL) { + return schema_url; + } + } + + return NULL; +} + +static uint64_t resource_identity_hash(msgpack_object *resource_object, + msgpack_object *resource_body) +{ + msgpack_object *schema_url; + + schema_url = resource_schema_url_object(resource_object, resource_body); + + return msgpack_object_pair_hash(resource_object, schema_url); +} + +static uint64_t resource_attributes_hash(msgpack_object *resource_object) +{ + msgpack_object *attributes; + + if (resource_object == NULL || resource_object->type != MSGPACK_OBJECT_MAP) { + return msgpack_object_hash(NULL); + } + + attributes = msgpack_map_get_object(&resource_object->via.map, "attributes"); + + return msgpack_object_hash(attributes); +} + +static void destroy_otlp_logs_partitions( + struct otlp_logs_resource_partition *partitions, + size_t count) +{ + size_t index; + + if (partitions == NULL) { + return; + } + + for (index = 0; index < count; index++) { + msgpack_sbuffer_destroy(&partitions[index].buffer); + } + + flb_free(partitions); +} + +static struct otlp_logs_resource_partition *find_otlp_logs_partition( + struct otlp_logs_resource_partition *partitions, + size_t count, + int64_t resource_id, + uint64_t resource_hash, + int has_key) +{ + size_t index; + + for (index = 0; index < count; index++) { + if (partitions[index].resource_id == resource_id && + partitions[index].resource_hash == resource_hash && + partitions[index].has_key == has_key) { + return &partitions[index]; + } + } + + return NULL; +} + +static struct otlp_logs_resource_partition *get_otlp_logs_partition( + struct otlp_logs_resource_partition **partitions, + size_t *count, + int64_t resource_id, + uint64_t resource_hash, + uint64_t key_hash, + int has_key) +{ + struct otlp_logs_resource_partition *partition; + struct otlp_logs_resource_partition *tmp; + + partition = find_otlp_logs_partition(*partitions, + *count, + resource_id, + resource_hash, + has_key); + if (partition != NULL) { + return partition; + } + + tmp = flb_realloc(*partitions, + sizeof(struct otlp_logs_resource_partition) * (*count + 1)); + if (tmp == NULL) { + flb_errno(); + return NULL; + } + + *partitions = tmp; + partition = &(*partitions)[*count]; + memset(partition, 0, sizeof(struct otlp_logs_resource_partition)); + + partition->resource_id = resource_id; + partition->resource_hash = resource_hash; + partition->has_key = has_key; + if (has_key == FLB_TRUE) { + snprintf(partition->key, sizeof(partition->key), "%016" PRIx64, key_hash); + } + + msgpack_sbuffer_init(&partition->buffer); + (*count)++; + + return partition; +} + +static int append_partition_record( + struct otlp_logs_resource_partition *partition, + struct flb_log_event_decoder *decoder) +{ + if (partition == NULL || decoder->record_base == NULL || + decoder->record_length == 0) { + return 0; + } + + return msgpack_sbuffer_write(&partition->buffer, + decoder->record_base, + decoder->record_length); +} + +static int get_otlp_group_resource(msgpack_object *group_metadata, + msgpack_object *group_body, + int64_t *resource_id, + msgpack_object **resource_object) +{ + int ret; + int64_t scope_id; + + if (group_metadata == NULL || + group_metadata->type != MSGPACK_OBJECT_MAP || + msgpack_map_entry_is_string(&group_metadata->via.map, + FLB_OTEL_LOGS_SCHEMA_KEY, + FLB_OTEL_LOGS_SCHEMA_OTLP) != FLB_TRUE || + msgpack_map_get_int64(&group_metadata->via.map, + "resource_id", + resource_id) != 0) { + return -1; + } + + if (group_body != NULL && group_body->type == MSGPACK_OBJECT_MAP) { + *resource_object = msgpack_map_get_object(&group_body->via.map, + "resource"); + } + else { + *resource_object = NULL; + } + + ret = msgpack_map_get_int64(&group_metadata->via.map, "scope_id", &scope_id); + if (ret != 0) { + return -1; + } + (void) scope_id; + + return 0; +} + +static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx, + struct flb_event_chunk *event_chunk, + int format) +{ + int ret; + int result; + int32_t record_type; + int64_t resource_id; + uint64_t resource_hash; + uint64_t key_hash; + flb_sds_t payload; + char *key; + size_t key_len; + size_t index; + size_t partition_count; + msgpack_object *group_body; + msgpack_object *group_metadata; + msgpack_object *resource_object; + struct flb_log_event event; + struct flb_log_event_decoder decoder; + struct otlp_logs_resource_partition *partition; + struct otlp_logs_resource_partition *current_partition; + struct otlp_logs_resource_partition *partitions; + struct flb_opentelemetry_otlp_logs_options options; + size_t produced_count; + + partitions = NULL; + partition_count = 0; + current_partition = NULL; + produced_count = 0; + + ret = flb_log_event_decoder_init(&decoder, + (char *) event_chunk->data, + event_chunk->size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "could not decode OTLP log chunk for partitioning: %d", + ret); + return FLB_ERROR; + } + + flb_log_event_decoder_read_groups(&decoder, FLB_TRUE); + + while ((ret = flb_log_event_decoder_next(&decoder, &event)) == + FLB_EVENT_DECODER_SUCCESS) { + ret = flb_log_event_decoder_get_record_type(&event, &record_type); + if (ret != 0) { + flb_plg_error(ctx->ins, + "could not read OTLP log record type for partitioning"); + ret = FLB_ERROR; + goto cleanup; + } + + if (record_type == FLB_LOG_EVENT_GROUP_START) { + group_metadata = event.group_metadata != NULL ? event.group_metadata : event.metadata; + group_body = event.body; + resource_object = NULL; + + ret = get_otlp_group_resource(group_metadata, + group_body, + &resource_id, + &resource_object); + if (ret == 0) { + resource_hash = resource_identity_hash(resource_object, group_body); + key_hash = resource_attributes_hash(resource_object); + current_partition = get_otlp_logs_partition(&partitions, + &partition_count, + resource_id, + resource_hash, + key_hash, + FLB_TRUE); + } + else { + current_partition = get_otlp_logs_partition(&partitions, + &partition_count, + -1, + 0, + 0, + FLB_FALSE); + } + + if (current_partition == NULL) { + ret = FLB_ERROR; + goto cleanup; + } + + ret = append_partition_record(current_partition, &decoder); + if (ret != 0) { + ret = FLB_ERROR; + goto cleanup; + } + continue; + } + else if (record_type == FLB_LOG_EVENT_GROUP_END) { + if (current_partition != NULL) { + ret = append_partition_record(current_partition, &decoder); + if (ret != 0) { + ret = FLB_ERROR; + goto cleanup; + } + } + current_partition = NULL; + continue; + } + + if (current_partition == NULL) { + current_partition = get_otlp_logs_partition(&partitions, + &partition_count, + -1, + 0, + 0, + FLB_FALSE); + if (current_partition == NULL) { + ret = FLB_ERROR; + goto cleanup; + } + } + + ret = append_partition_record(current_partition, &decoder); + if (ret != 0) { + ret = FLB_ERROR; + goto cleanup; + } + } + + ret = flb_log_event_decoder_get_last_result(&decoder); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "could not decode OTLP log chunk for partitioning: %d", + ret); + ret = FLB_ERROR; + goto cleanup; + } + + init_otlp_logs_options(&options); + + for (index = 0; index < partition_count; index++) { + partition = &partitions[index]; + payload = NULL; + + if (format == FLB_KAFKA_FMT_OTLP_JSON) { + payload = flb_opentelemetry_logs_to_otlp_json(partition->buffer.data, + partition->buffer.size, + &options, + &result); + } + else { + payload = flb_opentelemetry_logs_to_otlp_proto(partition->buffer.data, + partition->buffer.size, + &options, + &result); + } + + if (payload == NULL) { + flb_plg_error(ctx->ins, + "could not convert partitioned OTLP logs: %d", + result); + ret = FLB_ERROR; + goto cleanup; + } + + if (partition->has_key == FLB_TRUE) { + key = partition->key; + key_len = strlen(partition->key); + } + else { + key = NULL; + key_len = 0; + } + + /* + * Partitioned OTLP log sends are at-most-once after the first partition + * is accepted by librdkafka. If a later partition fails, return FLB_ERROR + * instead of FLB_RETRY so engine replay does not duplicate partitions + * already enqueued. Under sustained back-pressure this can partially + * deliver the original chunk; disable this option or reduce Kafka + * back-pressure when chunk-level retry durability is required. + */ + ret = produce_raw_payload_with_key_retry_control(payload, + flb_sds_len(payload), + key, + key_len, + FLB_FALSE, + produced_count == 0, + ctx); + + if (format == FLB_KAFKA_FMT_OTLP_JSON) { + flb_sds_destroy(payload); + } + else { + flb_opentelemetry_logs_proto_destroy(payload); + } + + if (ret != FLB_OK) { + goto cleanup; + } + produced_count++; + } + + ret = FLB_OK; + +cleanup: + flb_log_event_decoder_destroy(&decoder); + destroy_otlp_logs_partitions(partitions, partition_count); + + return ret; +} + static int produce_otlp_json(struct flb_out_kafka *ctx, struct flb_event_chunk *event_chunk) { int result; flb_sds_t payload; struct flb_opentelemetry_otlp_logs_options options; - static const char *default_logs_body_keys[] = {"log", "message"}; payload = NULL; if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { - memset(&options, 0, sizeof(options)); - options.logs_require_otel_metadata = FLB_FALSE; - options.logs_body_keys = default_logs_body_keys; - options.logs_body_key_count = 2; - options.logs_body_key_attributes = FLB_FALSE; + if (ctx->otlp_logs_partition_by_resource == FLB_TRUE) { + return produce_partitioned_otlp_logs(ctx, + event_chunk, + FLB_KAFKA_FMT_OTLP_JSON); + } + + init_otlp_logs_options(&options); payload = flb_opentelemetry_logs_to_otlp_json(event_chunk->data, event_chunk->size, @@ -629,14 +1229,15 @@ static int produce_otlp_proto(struct flb_out_kafka *ctx, struct ctrace *ctr; flb_sds_t payload; struct flb_opentelemetry_otlp_logs_options options; - static const char *default_logs_body_keys[] = {"log", "message"}; if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { - memset(&options, 0, sizeof(options)); - options.logs_require_otel_metadata = FLB_FALSE; - options.logs_body_keys = default_logs_body_keys; - options.logs_body_key_count = 2; - options.logs_body_key_attributes = FLB_FALSE; + if (ctx->otlp_logs_partition_by_resource == FLB_TRUE) { + return produce_partitioned_otlp_logs(ctx, + event_chunk, + FLB_KAFKA_FMT_OTLP_PROTO); + } + + init_otlp_logs_options(&options); payload = flb_opentelemetry_logs_to_otlp_proto(event_chunk->data, event_chunk->size, @@ -823,6 +1424,17 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_out_kafka, format_str), "Set the record output format. Supported values include json, msgpack, gelf, raw, otlp_json and otlp_proto." }, + { + FLB_CONFIG_MAP_BOOL, "otlp_logs_partition_by_resource", "false", + 0, FLB_TRUE, offsetof(struct flb_out_kafka, otlp_logs_partition_by_resource), + "When using format otlp_json or otlp_proto, split OTLP log payloads by " + "resource and use a hash of the resource attributes as the Kafka message key. " + "This supersedes message_key and message_key_field for those chunks; logs " + "without resource information are unkeyed. After partial partition delivery, " + "later produce failures are not retried by the engine to avoid duplicates; " + "disable this option or reduce Kafka back-pressure for chunk-level retry " + "durability." + }, { FLB_CONFIG_MAP_STR, "message_key", (char *)NULL, 0, FLB_TRUE, offsetof(struct flb_out_kafka, message_key), diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 1f33d4c794d..7a9c1e57775 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -87,6 +87,8 @@ struct flb_out_kafka { int raw_log_key_len; char *raw_log_key; + int otlp_logs_partition_by_resource; + /* Gelf Keys */ struct flb_gelf_fields gelf_fields; diff --git a/tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml b/tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml new file mode 100644 index 00000000000..ff325961435 --- /dev/null +++ b/tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml @@ -0,0 +1,22 @@ +service: + flush: 5 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: opentelemetry + port: ${FLUENT_BIT_TEST_LISTENER_PORT} + + outputs: + - name: kafka + match: "*" + brokers: 127.0.0.1:${TEST_SUITE_KAFKA_PORT} + topics: otlp-topic + format: otlp_json + otlp_logs_partition_by_resource: true + message_key: static-otlp-key + queue_full_retries: 1 + rdkafka.api.version.request: false + rdkafka.broker.version.fallback: 0.8.2.0 diff --git a/tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml b/tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml new file mode 100644 index 00000000000..ffd48c10874 --- /dev/null +++ b/tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml @@ -0,0 +1,22 @@ +service: + flush: 5 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: opentelemetry + port: ${FLUENT_BIT_TEST_LISTENER_PORT} + + outputs: + - name: kafka + match: "*" + brokers: 127.0.0.1:${TEST_SUITE_KAFKA_PORT} + topics: otlp-topic + format: otlp_proto + otlp_logs_partition_by_resource: true + message_key: static-otlp-key + queue_full_retries: 1 + rdkafka.api.version.request: false + rdkafka.broker.version.fallback: 0.8.2.0 diff --git a/tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py b/tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py index abe2f93a284..84bceef31df 100644 --- a/tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py +++ b/tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py @@ -645,3 +645,57 @@ def test_out_kafka_otlp_logs_preserve_resource_schema_urls_across_requests( assert body_to_schema_url["event-a"] == "schema-a" assert body_to_schema_url["event-b"] == "schema-b" assert len(resources) == 2 + + +@pytest.mark.parametrize( + "format_name,config_file", + [ + ("otlp_json", "out_kafka_otlp_json_partition_by_resource.yaml"), + ("otlp_proto", "out_kafka_otlp_proto_partition_by_resource.yaml"), + ], +) +def test_out_kafka_otlp_logs_partition_by_resource(format_name, config_file): + service = Service(config_file) + service.start() + service.send_payload_dict( + _build_resource_collision_payload("user-a", "event-a"), + "logs", + ) + service.send_payload_dict( + _build_resource_collision_payload("user-b", "event-b"), + "logs", + ) + + messages = service.wait_for_messages(2, timeout=10) + service.stop() + + assert len(messages) == 2 + + keys = {message["key"] for message in messages} + assert len(keys) == 2 + assert b"static-otlp-key" not in keys + + body_to_user = {} + for message in messages: + assert message["topic"] == "otlp-topic" + assert message["key"] + + payload = _decode_kafka_payload(message, format_name, "logs") + resources = payload["resourceLogs"] + assert len(resources) == 1 + + resource = resources[0] + user_id = next( + attribute["value"]["stringValue"] + for attribute in resource["resource"]["attributes"] + if attribute["key"] == "user.id" + ) + + for scope in resource["scopeLogs"]: + for record in scope["logRecords"]: + body_to_user[record["body"]["stringValue"]] = user_id + + assert body_to_user == { + "event-a": "user-a", + "event-b": "user-b", + }