Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/spanner-failure-injection-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ permissions: write-all
jobs:
spanner_failure_injection_tests:
name: Spanner Failure Injection Tests
timeout-minutes: 180
timeout-minutes: 360
runs-on: [ self-hosted, spanner-it ]
steps:
- name: Checkout Code
Expand Down
1 change: 1 addition & 0 deletions cicd/cmd/run-failure-injection-tests/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func main() {
mvnFlags := workflows.NewMavenFlags()
err := workflows.MvnCleanInstall().Run(
mvnFlags.IncludeDependencies(),
mvnFlags.RunFailureInjectionTests(),
mvnFlags.SkipDependencyAnalysis(),
mvnFlags.SkipCheckstyle(),
mvnFlags.SkipJib(),
Expand Down
4 changes: 3 additions & 1 deletion python/generate_all_dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ set -e
SCRIPTPATH=$(dirname "$0")

sh $SCRIPTPATH/generate_dependencies.sh $SCRIPTPATH/../python/src/main/python/streaming-llm/base_requirements.txt $SCRIPTPATH/../python/src/main/python/streaming-llm/requirements.txt
sh $SCRIPTPATH/generate_dependencies.sh $SCRIPTPATH/default_base_bqmonitor_requirements.txt $SCRIPTPATH/../python/src/main/python/bigquery-anomaly-detection/requirements_all.txt
# Generate a base set of dependencies to use for any templates without special dependencies
mkdir -p $SCRIPTPATH/__build__/
sh $SCRIPTPATH/generate_dependencies.sh $SCRIPTPATH/default_base_bqmonitor_requirements.txt $SCRIPTPATH/__build__/bqmonitor_requirements.txt
sh $SCRIPTPATH/generate_dependencies.sh $SCRIPTPATH/default_base_python_requirements.txt $SCRIPTPATH/__build__/default_python_requirements.txt
sh $SCRIPTPATH/generate_dependencies.sh $SCRIPTPATH/default_base_yaml_requirements.txt $SCRIPTPATH/__build__/default_yaml_requirements.txt

cp $SCRIPTPATH/__build__/bqmonitor_requirements.txt $SCRIPTPATH/../python/src/main/python/bigquery-anomaly-detection/requirements_all.txt

cp $SCRIPTPATH/__build__/default_python_requirements.txt $SCRIPTPATH/../python/src/main/python/word-count-python/requirements.txt

cp $SCRIPTPATH/__build__/default_yaml_requirements.txt $SCRIPTPATH/../python/src/main/python/yaml-template/requirements.txt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@
"[Experimental] Real-time anomaly detection on BigQuery change data (CDC). "
+ "Reads streaming APPENDS/CHANGES data from a BigQuery table, "
+ "computes a configurable windowed metric, runs anomaly detection "
+ "(ZScore, IQR, or RobustZScore), and publishes anomalies to Pub/Sub.",
+ "(ZScore, IQR, or RobustZScore), and emits anomalies to Pub/Sub "
+ "and/or a REST webhook. "
+ "Alerts to Pub/Sub and the REST webhook are rate-limited by "
+ "default: per anomaly key, after the first alert fires further "
+ "anomalies are suppressed until a 10-minute gap between "
+ "consecutive anomalies elapses. Tune or disable via "
+ "alert_cooldown_seconds (set to 0 to disable). The BigQuery "
+ "sink table is unaffected and records every anomaly.",
preview = true,
flexContainerName = "bigquery-anomaly-detection",
filesToCopy = {"main.py", "setup.py", "pyproject.toml", "requirements_all.txt", "src"},
Expand Down Expand Up @@ -71,10 +78,13 @@ public interface BigQueryAnomalyDetection {

@TemplateParameter.Text(
order = 4,
optional = true,
name = "topic",
description = "Pub/Sub Topic",
helpText =
"Pub/Sub topic for anomaly results. " + "Full path: projects/<project>/topics/<topic>.",
"Pub/Sub topic for anomaly results. "
+ "Full path: projects/<project>/topics/<topic>. "
+ "Optional: at least one of topic or webhook_spec must be set.",
regexes = {"^projects/[^/]+/topics/[^/]+$"})
String getTopic();

Expand Down Expand Up @@ -193,4 +203,41 @@ public interface BigQueryAnomalyDetection {
+ "Example: {\"job_id\": \"pipeline-123\", \"env\": \"prod\"}. "
+ "Anomaly fields take precedence on key collision.")
String getMessageMetadata();

@TemplateParameter.Text(
order = 17,
optional = true,
name = "webhook_spec",
description = "REST Webhook Specification (JSON)",
helpText =
"JSON object configuring a REST webhook for anomaly results. "
+ "Required keys: endpoint (http/https URL), body (JSON object/array). "
+ "Optional keys: method (POST/PUT/PATCH, default POST), "
+ "headers (object), scopes (list of OAuth scopes; default "
+ "cloud-platform), timeout_seconds (default 600, i.e. 10 min), "
+ "parallelism (max concurrent in-flight POSTs per worker, "
+ "default 5), callback_frequency_seconds (how often the "
+ "AsyncWrapper sweeps finished futures, default 30). "
+ "String leaves in body and headers are Python-format-substituted "
+ "against anomaly fields, message_metadata keys, and the "
+ "{anomaly_message} field (which equals message_format output, "
+ "or a default natural-language summary). "
+ "At least one of topic or webhook_spec must be set.")
String getWebhookSpec();

@TemplateParameter.Double(
order = 18,
optional = true,
name = "alert_cooldown_seconds",
description = "Alert Cooldown (seconds)",
helpText =
"Session-window gap for debouncing alerts to external systems "
+ "(Pub/Sub, webhook). Per anomaly key, the first anomaly fires "
+ "immediately; subsequent anomalies are suppressed (logged as "
+ "\"still active\") until a gap of at least this many seconds "
+ "passes between consecutive anomalies. Continuous anomalies "
+ "extend the active-alert window. The BigQuery sink table is "
+ "unaffected and records every anomaly. Set to 0 to disable "
+ "rate limiting. Default: 600 (10 minutes).")
Double getAlertCooldownSeconds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ version = "0.1.0"
description = "BigQuery anomaly monitoring pipeline (Dataflow Flex Template)"
requires-python = ">=3.11"
dependencies = [
"apache-beam[gcp]==2.72.0",
"apache-beam[gcp]>=2.73.0",
"google-cloud-bigquery-storage",
]
Loading
Loading