diff --git a/pom.xml b/pom.xml index 2a48cf6465..51d2636924 100644 --- a/pom.xml +++ b/pom.xml @@ -51,8 +51,8 @@ 0.8.13 - 2.73.0 - 2.73.0 + 2.74.0 + 2.74.0 diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java index d92075fb06..df6d61ee88 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java @@ -247,4 +247,15 @@ public interface SpannerChangeStreamsToBigQueryOptions Boolean getDisableDlqRetries(); void setDisableDlqRetries(Boolean value); + + @TemplateParameter.Text( + order = 20, + optional = true, + groupName = "Source", + description = "Colon separated list of change streams TVF names to query and union", + helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java index 11adfdd2bf..ef6cbdf93a 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java @@ -186,4 +186,15 @@ public interface SpannerChangeStreamsToGcsOptions RpcPriority getRpcPriority(); void setRpcPriority(RpcPriority rpcPriority); + + @TemplateParameter.Text( + order = 15, + optional = true, + groupName = "Source", + description = "Colon separated list of change streams TVF names to query and union", + helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java index a28ab475ad..210c6ae97c 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java @@ -250,4 +250,15 @@ public interface SpannerChangeStreamsToPubSubOptions extends DataflowPipelineOpt Boolean getUseSpannerEmulatorHost(); void setUseSpannerEmulatorHost(Boolean value); + + @TemplateParameter.Text( + order = 20, + optional = true, + groupName = "Source", + description = "Colon separated list of change streams TVF names to query and union", + helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java index 99e8ae23f9..24982c64d6 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java @@ -24,6 +24,7 @@ import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams; import com.google.cloud.teleport.v2.utils.DurationUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -137,6 +138,12 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { ? null : options.getSpannerMetadataTableName(); + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + List tvfNameList = null; + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + tvfNameList = Arrays.asList(tvfNameListString.split(":")); + } + final RpcPriority rpcPriority = options.getRpcPriority(); SpannerConfig spannerConfig = SpannerConfig.create() @@ -162,7 +169,8 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(rpcPriority) - .withMetadataTable(metadataTableName)) + .withMetadataTable(metadataTableName) + .withTvfNameList(tvfNameList)) .apply( "Creating " + options.getWindowDuration() + " Window", Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java index 412f75851d..379469216c 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java @@ -24,6 +24,7 @@ import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -155,6 +156,12 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { ? null : options.getSpannerMetadataTableName(); + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + List tvfNameList = null; + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + tvfNameList = Arrays.asList(tvfNameListString.split(":")); + } + final RpcPriority rpcPriority = options.getRpcPriority(); SpannerConfig spannerConfig = SpannerConfig.create() @@ -187,7 +194,8 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(rpcPriority) - .withMetadataTable(metadataTableName)) + .withMetadataTable(metadataTableName) + .withTvfNameList(tvfNameList)) .apply( "Convert each record to a PubsubMessage", FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder() diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java index 222aef3d52..4a68f9ccd4 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -324,6 +325,14 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName); } + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + List tvfNameList = Arrays.asList(tvfNameListString.split(":")); + if (!tvfNameList.isEmpty()) { + readChangeStream = readChangeStream.withTvfNameList(tvfNameList); + } + } + PCollection dataChangeRecord = pipeline .apply("Read from Spanner Change Streams", readChangeStream)