Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,19 @@ private JsonNode getPrimaryKeys(JsonNode record) {
return null;
}

return md.get("primary_keys");
// Try primary_keys first (MySQL, Oracle, PostgreSQL)
JsonNode primaryKeys = md.get("primary_keys");
if (primaryKeys != null && !primaryKeys.isNull()) {
return primaryKeys;
}

// Fallback to replication_index for SQL Server
JsonNode replicationIndex = md.get("replication_index");
if (replicationIndex != null && !replicationIndex.isNull()) {
return replicationIndex;
}

return null;
}

private Long getSourceMetadataAsLong(JsonNode record, String fieldName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,23 @@ private String getMetadataChangeType(GenericRecord record) {

private JsonNode getPrimaryKeys(GenericRecord record) {
GenericRecord sourceMetadata = (GenericRecord) record.get("source_metadata");
if (sourceMetadata.getSchema().getField("primary_keys") == null
|| sourceMetadata.get("primary_keys") == null) {
if (sourceMetadata == null) {
return null;
}

JsonNode dataInput = getSourceMetadataJson(record);
return dataInput.get("primary_keys");
// Try primary_keys first (MySQL, Oracle, PostgreSQL)
if (sourceMetadata.getSchema().getField("primary_keys") != null
&& sourceMetadata.get("primary_keys") != null) {
return getSourceMetadataJson(record).get("primary_keys");
}

// Fallback to replication_index for SQL Server
if (sourceMetadata.getSchema().getField("replication_index") != null
&& sourceMetadata.get("replication_index") != null) {
return getSourceMetadataJson(record).get("replication_index");
}

return null;
Comment thread
sandesh-jagdale-tp marked this conversation as resolved.
}

private String getUUID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,55 @@ public class FormatDatastreamJsonToJsonTest {
+ " Relations"
+ " Representative\",\"MIN_SALARY\":4500,\"MAX_SALARY\":10500,\"rowid\":1019670290924988842,\"_metadata_source\":{\"schema\":\"HR\",\"table\":\"JOBS\",\"database\":\"XE\",\"row_id\":\"AAAEARAAEAAAAC9AAS\",\"scn\":1706664,\"is_deleted\":false,\"change_type\":\"INSERT\",\"ssn\":0,\"rs_id\":\"\",\"tx_id\":null,\"log_file\":\"\",\"primary_keys\":[\"JOB_ID\"]}}";

private static final String SQLSERVER_JSON_WITH_REPLICATION_INDEX =
"{\"uuid\":\"test-uuid-ri\","
+ "\"read_timestamp\":\"2021-12-25 05:42:04.408\","
+ "\"source_timestamp\":\"2021-12-25T05:42:04.408\","
+ "\"object\":\"dbo_ORDERS\","
+ "\"read_method\":\"sqlserver-cdc\","
+ "\"stream_name\":\"projects/123/locations/us-central1/streams/s\","
+ "\"source_metadata\":{"
+ "\"schema\":\"dbo\","
+ "\"table\":\"ORDERS\","
+ "\"database\":\"mydb\","
+ "\"is_deleted\":false,"
+ "\"change_type\":\"INSERT\","
+ "\"replication_index\":[\"order_id\"]},"
+ "\"payload\":{\"ORDER_ID\":1}}";

private static final String ORACLE_JSON_WITH_NO_KEYS =
"{\"uuid\":\"test-uuid-none\","
+ "\"read_timestamp\":\"2021-12-25 05:42:04.408\","
+ "\"source_timestamp\":\"2021-12-25T05:42:04.408\","
+ "\"object\":\"HR_EMPLOYEES\","
+ "\"read_method\":\"oracle-backfill\","
+ "\"stream_name\":\"projects/123/locations/us-central1/streams/s\","
+ "\"sort_keys\":[1640410924408,0,\"\",0],"
+ "\"source_metadata\":{"
+ "\"schema\":\"HR\","
+ "\"table\":\"EMPLOYEES\","
+ "\"database\":\"XE\","
+ "\"is_deleted\":false,"
+ "\"change_type\":\"INSERT\"},"
+ "\"payload\":{\"EMP_ID\":1}}";

private static final String JSON_WITH_BOTH_KEYS =
"{\"uuid\":\"test-uuid-both\","
+ "\"read_timestamp\":\"2021-12-25 05:42:04.408\","
+ "\"source_timestamp\":\"2021-12-25T05:42:04.408\","
+ "\"object\":\"dbo_ITEMS\","
+ "\"read_method\":\"sqlserver-cdc\","
+ "\"stream_name\":\"projects/123/locations/us-central1/streams/s\","
+ "\"source_metadata\":{"
+ "\"schema\":\"dbo\","
+ "\"table\":\"ITEMS\","
+ "\"database\":\"mydb\","
+ "\"is_deleted\":false,"
+ "\"change_type\":\"INSERT\","
+ "\"primary_keys\":[\"id\"],"
+ "\"replication_index\":[\"repl_col\"]},"
+ "\"payload\":{\"ID\":1}}";

@Test
public void testProcessElement_validJson() {
Map<String, String> renameColumns = ImmutableMap.of("_metadata_row_id", "rowid");
Expand Down Expand Up @@ -194,6 +243,72 @@ public void testProcessElement_sqlServer() {
pipeline.run();
}

@Test
public void testGetPrimaryKeys_sqlServerReplicationIndexFallback() {
FailsafeElementCoder<String, String> coder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

PCollection<String> primaryKeys =
pipeline
.apply("CreateInput", Create.of(SQLSERVER_JSON_WITH_REPLICATION_INDEX))
.apply(
"Format",
ParDo.of(
(FormatDatastreamJsonToJson)
FormatDatastreamJsonToJson.create()
.withStreamName("my-stream")
.withLowercaseSourceColumns(false)))
.setCoder(coder)
.apply("ExtractKeys", ParDo.of(new ExtractPrimaryKeysFn()));

PAssert.that(primaryKeys).containsInAnyOrder("[\"order_id\"]");
pipeline.run();
}

@Test
public void testGetPrimaryKeys_nullWhenNeitherFieldPresent() {
FailsafeElementCoder<String, String> coder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

PCollection<String> primaryKeys =
pipeline
.apply("CreateInput", Create.of(ORACLE_JSON_WITH_NO_KEYS))
.apply(
"Format",
ParDo.of(
(FormatDatastreamJsonToJson)
FormatDatastreamJsonToJson.create()
.withStreamName("my-stream")
.withLowercaseSourceColumns(false)))
.setCoder(coder)
.apply("ExtractKeys", ParDo.of(new ExtractPrimaryKeysFn()));

PAssert.that(primaryKeys).containsInAnyOrder("null");
pipeline.run();
}

@Test
public void testGetPrimaryKeys_prefersPrimaryKeysWhenBothPresent() {
FailsafeElementCoder<String, String> coder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

PCollection<String> primaryKeys =
pipeline
.apply("CreateInput", Create.of(JSON_WITH_BOTH_KEYS))
.apply(
"Format",
ParDo.of(
(FormatDatastreamJsonToJson)
FormatDatastreamJsonToJson.create()
.withStreamName("my-stream")
.withLowercaseSourceColumns(false)))
.setCoder(coder)
.apply("ExtractKeys", ParDo.of(new ExtractPrimaryKeysFn()));

PAssert.that(primaryKeys).containsInAnyOrder("[\"id\"]");
pipeline.run();
}

// Static nested DoFn class to remove timestamp property
static class RemoveTimestampPropertyFn
extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {
Expand All @@ -211,4 +326,15 @@ public void processElement(
out.output(FailsafeElement.of(changeEvent.toString(), changeEvent.toString()));
}
}

static class ExtractPrimaryKeysFn extends DoFn<FailsafeElement<String, String>, String> {
@ProcessElement
public void processElement(
@Element FailsafeElement<String, String> element, OutputReceiver<String> out)
throws JsonProcessingException {
JsonNode changeEvent = new ObjectMapper().readTree(element.getPayload());
JsonNode primaryKeys = changeEvent.get("_metadata_primary_keys");
out.output(primaryKeys != null ? primaryKeys.toString() : "null");
}
}
}
Loading