diff --git a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java index 03f4593fed..b0aa627d9c 100644 --- a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java +++ b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJson.java @@ -243,7 +243,19 @@ private JsonNode getPrimaryKeys(JsonNode record) { return null; } - return md.get("primary_keys"); + // Try primary_keys first (MySQL, Oracle, PostgreSQL) + JsonNode primaryKeys = md.get("primary_keys"); + if (primaryKeys != null && !primaryKeys.isNull()) { + return primaryKeys; + } + + // Fallback to replication_index for SQL Server + JsonNode replicationIndex = md.get("replication_index"); + if (replicationIndex != null && !replicationIndex.isNull()) { + return replicationIndex; + } + + return null; } private Long getSourceMetadataAsLong(JsonNode record, String fieldName) { diff --git a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java index f611818707..d9bfd97d44 100644 --- a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java +++ b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java @@ -285,13 +285,23 @@ private String getMetadataChangeType(GenericRecord record) { private JsonNode getPrimaryKeys(GenericRecord record) { GenericRecord sourceMetadata = (GenericRecord) record.get("source_metadata"); - if (sourceMetadata.getSchema().getField("primary_keys") == null - || sourceMetadata.get("primary_keys") == null) { + if (sourceMetadata == null) { return null; } - JsonNode dataInput = getSourceMetadataJson(record); - return dataInput.get("primary_keys"); + // Try primary_keys first (MySQL, Oracle, PostgreSQL) + if (sourceMetadata.getSchema().getField("primary_keys") != null + && sourceMetadata.get("primary_keys") != null) { + return getSourceMetadataJson(record).get("primary_keys"); + } + + // Fallback to replication_index for SQL Server + if (sourceMetadata.getSchema().getField("replication_index") != null + && sourceMetadata.get("replication_index") != null) { + return getSourceMetadataJson(record).get("replication_index"); + } + + return null; } private String getUUID() { diff --git a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java index 07192df408..394434f04a 100644 --- a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java +++ b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamJsonToJsonTest.java @@ -105,6 +105,55 @@ public class FormatDatastreamJsonToJsonTest { + " Relations" + " Representative\",\"MIN_SALARY\":4500,\"MAX_SALARY\":10500,\"rowid\":1019670290924988842,\"_metadata_source\":{\"schema\":\"HR\",\"table\":\"JOBS\",\"database\":\"XE\",\"row_id\":\"AAAEARAAEAAAAC9AAS\",\"scn\":1706664,\"is_deleted\":false,\"change_type\":\"INSERT\",\"ssn\":0,\"rs_id\":\"\",\"tx_id\":null,\"log_file\":\"\",\"primary_keys\":[\"JOB_ID\"]}}"; + private static final String SQLSERVER_JSON_WITH_REPLICATION_INDEX = + "{\"uuid\":\"test-uuid-ri\"," + + "\"read_timestamp\":\"2021-12-25 05:42:04.408\"," + + "\"source_timestamp\":\"2021-12-25T05:42:04.408\"," + + "\"object\":\"dbo_ORDERS\"," + + "\"read_method\":\"sqlserver-cdc\"," + + "\"stream_name\":\"projects/123/locations/us-central1/streams/s\"," + + "\"source_metadata\":{" + + "\"schema\":\"dbo\"," + + "\"table\":\"ORDERS\"," + + "\"database\":\"mydb\"," + + "\"is_deleted\":false," + + "\"change_type\":\"INSERT\"," + + "\"replication_index\":[\"order_id\"]}," + + "\"payload\":{\"ORDER_ID\":1}}"; + + private static final String ORACLE_JSON_WITH_NO_KEYS = + "{\"uuid\":\"test-uuid-none\"," + + "\"read_timestamp\":\"2021-12-25 05:42:04.408\"," + + "\"source_timestamp\":\"2021-12-25T05:42:04.408\"," + + "\"object\":\"HR_EMPLOYEES\"," + + "\"read_method\":\"oracle-backfill\"," + + "\"stream_name\":\"projects/123/locations/us-central1/streams/s\"," + + "\"sort_keys\":[1640410924408,0,\"\",0]," + + "\"source_metadata\":{" + + "\"schema\":\"HR\"," + + "\"table\":\"EMPLOYEES\"," + + "\"database\":\"XE\"," + + "\"is_deleted\":false," + + "\"change_type\":\"INSERT\"}," + + "\"payload\":{\"EMP_ID\":1}}"; + + private static final String JSON_WITH_BOTH_KEYS = + "{\"uuid\":\"test-uuid-both\"," + + "\"read_timestamp\":\"2021-12-25 05:42:04.408\"," + + "\"source_timestamp\":\"2021-12-25T05:42:04.408\"," + + "\"object\":\"dbo_ITEMS\"," + + "\"read_method\":\"sqlserver-cdc\"," + + "\"stream_name\":\"projects/123/locations/us-central1/streams/s\"," + + "\"source_metadata\":{" + + "\"schema\":\"dbo\"," + + "\"table\":\"ITEMS\"," + + "\"database\":\"mydb\"," + + "\"is_deleted\":false," + + "\"change_type\":\"INSERT\"," + + "\"primary_keys\":[\"id\"]," + + "\"replication_index\":[\"repl_col\"]}," + + "\"payload\":{\"ID\":1}}"; + @Test public void testProcessElement_validJson() { Map renameColumns = ImmutableMap.of("_metadata_row_id", "rowid"); @@ -194,6 +243,72 @@ public void testProcessElement_sqlServer() { pipeline.run(); } + @Test + public void testGetPrimaryKeys_sqlServerReplicationIndexFallback() { + FailsafeElementCoder coder = + FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + PCollection primaryKeys = + pipeline + .apply("CreateInput", Create.of(SQLSERVER_JSON_WITH_REPLICATION_INDEX)) + .apply( + "Format", + ParDo.of( + (FormatDatastreamJsonToJson) + FormatDatastreamJsonToJson.create() + .withStreamName("my-stream") + .withLowercaseSourceColumns(false))) + .setCoder(coder) + .apply("ExtractKeys", ParDo.of(new ExtractPrimaryKeysFn())); + + PAssert.that(primaryKeys).containsInAnyOrder("[\"order_id\"]"); + pipeline.run(); + } + + @Test + public void testGetPrimaryKeys_nullWhenNeitherFieldPresent() { + FailsafeElementCoder coder = + FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + PCollection primaryKeys = + pipeline + .apply("CreateInput", Create.of(ORACLE_JSON_WITH_NO_KEYS)) + .apply( + "Format", + ParDo.of( + (FormatDatastreamJsonToJson) + FormatDatastreamJsonToJson.create() + .withStreamName("my-stream") + .withLowercaseSourceColumns(false))) + .setCoder(coder) + .apply("ExtractKeys", ParDo.of(new ExtractPrimaryKeysFn())); + + PAssert.that(primaryKeys).containsInAnyOrder("null"); + pipeline.run(); + } + + @Test + public void testGetPrimaryKeys_prefersPrimaryKeysWhenBothPresent() { + FailsafeElementCoder coder = + FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + PCollection primaryKeys = + pipeline + .apply("CreateInput", Create.of(JSON_WITH_BOTH_KEYS)) + .apply( + "Format", + ParDo.of( + (FormatDatastreamJsonToJson) + FormatDatastreamJsonToJson.create() + .withStreamName("my-stream") + .withLowercaseSourceColumns(false))) + .setCoder(coder) + .apply("ExtractKeys", ParDo.of(new ExtractPrimaryKeysFn())); + + PAssert.that(primaryKeys).containsInAnyOrder("[\"id\"]"); + pipeline.run(); + } + // Static nested DoFn class to remove timestamp property static class RemoveTimestampPropertyFn extends DoFn, FailsafeElement> { @@ -211,4 +326,15 @@ public void processElement( out.output(FailsafeElement.of(changeEvent.toString(), changeEvent.toString())); } } + + static class ExtractPrimaryKeysFn extends DoFn, String> { + @ProcessElement + public void processElement( + @Element FailsafeElement element, OutputReceiver out) + throws JsonProcessingException { + JsonNode changeEvent = new ObjectMapper().readTree(element.getPayload()); + JsonNode primaryKeys = changeEvent.get("_metadata_primary_keys"); + out.output(primaryKeys != null ? primaryKeys.toString() : "null"); + } + } } diff --git a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java index 3d4b6fa47c..9b06541856 100644 --- a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java +++ b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; +import java.util.List; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -315,6 +316,177 @@ public void testIntervalNano() throws JsonProcessingException { assertEquals(expected, new ObjectMapper().writeValueAsString(objectNode)); } + @Test + public void testGetPrimaryKeys_primaryKeysField() throws IOException { + Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema sourceMetadataSchema = + SchemaBuilder.record("source_metadata_pk") + .fields() + .name("schema") + .type() + .stringType() + .noDefault() + .name("database") + .type() + .stringType() + .noDefault() + .name("table") + .type() + .stringType() + .noDefault() + .name("change_type") + .type() + .stringType() + .noDefault() + .name("is_deleted") + .type() + .booleanType() + .noDefault() + .name("primary_keys") + .type(arraySchema) + .noDefault() + .endRecord(); + GenericRecord sourceMetadata = new GenericData.Record(sourceMetadataSchema); + sourceMetadata.put("schema", "mydb"); + sourceMetadata.put("database", "mydb"); + sourceMetadata.put("table", "users"); + sourceMetadata.put("change_type", "INSERT"); + sourceMetadata.put("is_deleted", false); + sourceMetadata.put("primary_keys", new GenericData.Array<>(arraySchema, List.of("id"))); + + GenericRecord record = buildOuterRecord(sourceMetadata, "mysql-cdc-binlog"); + String json = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload(); + JsonNode output = new ObjectMapper().readTree(json); + + assertEquals("[\"id\"]", output.get("_metadata_primary_keys").toString()); + } + + @Test + public void testGetPrimaryKeys_sqlServerReplicationIndexFallback() throws IOException { + Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema sourceMetadataSchema = + SchemaBuilder.record("source_metadata_ri") + .fields() + .name("schema") + .type() + .stringType() + .noDefault() + .name("table") + .type() + .stringType() + .noDefault() + .name("change_type") + .type() + .stringType() + .noDefault() + .name("is_deleted") + .type() + .booleanType() + .noDefault() + .name("replication_index") + .type(arraySchema) + .noDefault() + .endRecord(); + GenericRecord sourceMetadata = new GenericData.Record(sourceMetadataSchema); + sourceMetadata.put("schema", "dbo"); + sourceMetadata.put("table", "orders"); + sourceMetadata.put("change_type", "INSERT"); + sourceMetadata.put("is_deleted", false); + sourceMetadata.put( + "replication_index", new GenericData.Array<>(arraySchema, List.of("order_id"))); + + GenericRecord record = buildOuterRecord(sourceMetadata, "sqlserver-cdc-logbased"); + String json = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload(); + JsonNode output = new ObjectMapper().readTree(json); + + assertEquals("[\"order_id\"]", output.get("_metadata_primary_keys").toString()); + } + + @Test + public void testGetPrimaryKeys_nullWhenNeitherFieldPresent() throws IOException { + Schema sourceMetadataSchema = + SchemaBuilder.record("source_metadata_none") + .fields() + .name("schema") + .type() + .stringType() + .noDefault() + .name("table") + .type() + .stringType() + .noDefault() + .name("change_type") + .type() + .stringType() + .noDefault() + .name("is_deleted") + .type() + .booleanType() + .noDefault() + .endRecord(); + GenericRecord sourceMetadata = new GenericData.Record(sourceMetadataSchema); + sourceMetadata.put("schema", "HR"); + sourceMetadata.put("table", "employees"); + sourceMetadata.put("change_type", "INSERT"); + sourceMetadata.put("is_deleted", false); + + GenericRecord record = buildOuterRecord(sourceMetadata, "oracle-dump"); + String json = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload(); + JsonNode output = new ObjectMapper().readTree(json); + + assertTrue(output.get("_metadata_primary_keys").isNull()); + } + + @Test + public void testGetPrimaryKeys_prefersPrimaryKeysWhenBothPresent() throws IOException { + Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING)); + Schema sourceMetadataSchema = + SchemaBuilder.record("source_metadata_both") + .fields() + .name("schema") + .type() + .stringType() + .noDefault() + .name("database") + .type() + .stringType() + .noDefault() + .name("table") + .type() + .stringType() + .noDefault() + .name("change_type") + .type() + .stringType() + .noDefault() + .name("is_deleted") + .type() + .booleanType() + .noDefault() + .name("primary_keys") + .type(arraySchema) + .noDefault() + .name("replication_index") + .type(arraySchema) + .noDefault() + .endRecord(); + GenericRecord sourceMetadata = new GenericData.Record(sourceMetadataSchema); + sourceMetadata.put("schema", "dbo"); + sourceMetadata.put("database", "dbo"); + sourceMetadata.put("table", "items"); + sourceMetadata.put("change_type", "UPDATE"); + sourceMetadata.put("is_deleted", false); + sourceMetadata.put("primary_keys", new GenericData.Array<>(arraySchema, List.of("id"))); + sourceMetadata.put( + "replication_index", new GenericData.Array<>(arraySchema, List.of("repl_col"))); + + GenericRecord record = buildOuterRecord(sourceMetadata, "mysql-cdc-binlog"); + String json = FormatDatastreamRecordToJson.create().apply(record).getOriginalPayload(); + JsonNode output = new ObjectMapper().readTree(json); + + assertEquals("[\"id\"]", output.get("_metadata_primary_keys").toString()); + } + private GenericRecord generateIntervalNanosRecord( Long years, Long months, Long days, Long hours, Long minutes, Long seconds, Long nanos) { @@ -357,4 +529,45 @@ private Schema generateIntervalNanosSchema() { .withDefault(0L) .endRecord(); } + + private GenericRecord buildOuterRecord(GenericRecord sourceMetadata, String readMethod) { + Schema payloadSchema = SchemaBuilder.record("payload").fields().endRecord(); + GenericRecord payload = new GenericData.Record(payloadSchema); + + Schema outerSchema = + SchemaBuilder.record("test_record") + .fields() + .name("stream_name") + .type() + .stringType() + .noDefault() + .name("read_timestamp") + .type() + .longType() + .noDefault() + .name("source_timestamp") + .type() + .longType() + .noDefault() + .name("read_method") + .type() + .stringType() + .noDefault() + .name("source_metadata") + .type(sourceMetadata.getSchema()) + .noDefault() + .name("payload") + .type(payloadSchema) + .noDefault() + .endRecord(); + + GenericRecord record = new GenericData.Record(outerSchema); + record.put("stream_name", "test-stream"); + record.put("read_timestamp", 1000000L); + record.put("source_timestamp", 1000000L); + record.put("read_method", readMethod); + record.put("source_metadata", sourceMetadata); + record.put("payload", payload); + return record; + } }