Skip to content
Closed
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [Unreleased]

### Features

- **Multiple InfluxDB Database Support** — Route different measurements to different InfluxDB databases from a single Dagger job. New `getInfluxSink(databaseName, measurementName)` API on `SinkOrchestrator`. Configure multiple named databases via `SINK_INFLUX_DATABASES_CONFIG` JSON array. Fully backward compatible with existing single-database configuration.

### [v0.9.0](https://github.com/goto/dagger/releases/tag/v0.9.0) (2023-03-16)

### Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) {
private void addSink(StreamInfo streamInfo) {
    // Wire a freshly created orchestrator into the telemetry pipeline, load the
    // InfluxDB database configuration, then attach the resolved sink to the stream.
    SinkOrchestrator orchestrator = new SinkOrchestrator(telemetryExporter);
    orchestrator.addSubscriber(telemetryExporter);
    orchestrator.initInfluxConfig(configuration);
    streamInfo.getDataStream()
            .sinkTo(orchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes;
import com.gotocompany.dagger.core.sink.bigquery.BigQuerySinkBuilder;
import com.gotocompany.dagger.core.sink.influx.ErrorHandler;
import com.gotocompany.dagger.core.sink.influx.InfluxDBConfigurationParser;
import com.gotocompany.dagger.core.sink.influx.InfluxDBDatabaseConfig;
import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper;
import com.gotocompany.dagger.core.sink.influx.InfluxDBSink;
import com.gotocompany.dagger.core.utils.KafkaConfigUtil;
Expand Down Expand Up @@ -38,12 +40,21 @@
public class SinkOrchestrator implements TelemetryPublisher {
private final MetricsTelemetryExporter telemetryExporter;
private final Map<String, List<String>> metrics;
private InfluxDBConfigurationParser configParser;

/**
 * Creates a SinkOrchestrator that publishes sink telemetry through the given exporter.
 *
 * @param telemetryExporter the exporter used to report sink metrics
 */
public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) {
    this.metrics = new HashMap<>();
    this.telemetryExporter = telemetryExporter;
}

/**
 * Initializes the multi-database InfluxDB configuration parser.
 * Must be called before using {@link #getInfluxSink(Configuration, String[], String, String)}.
 *
 * @param configuration the Dagger configuration holding the InfluxDB settings
 * @throws IllegalArgumentException if {@code configuration} is null
 */
public void initInfluxConfig(Configuration configuration) {
    // Fail fast with a clear message instead of an NPE deep inside the parser.
    if (configuration == null) {
        throw new IllegalArgumentException("configuration must not be null");
    }
    this.configParser = InfluxDBConfigurationParser.parse(configuration);
}

/**
* Gets sink.
*
Expand Down Expand Up @@ -98,6 +109,27 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl
return getSink(configuration, columnNames, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName);
}

/**
 * Returns an InfluxDB sink that writes to a specific named database and measurement.
 * Requires {@link #initInfluxConfig(Configuration)} to have been called first.
 *
 * @param configuration the configuration (for general settings like useRowFieldNames)
 * @param columnNames the column names for the stream
 * @param databaseName the logical name of the target database (as defined in SINK_INFLUX_DATABASES_CONFIG)
 * @param measurementName the InfluxDB measurement name
 * @return the InfluxDB sink
 */
public Sink getInfluxSink(Configuration configuration, String[] columnNames, String databaseName, String measurementName) {
    if (configParser == null) {
        throw new IllegalStateException("InfluxDB config not initialized. Call initInfluxConfig() first.");
    }
    // Resolve the named database first so an unknown name fails before any telemetry is recorded.
    InfluxDBDatabaseConfig databaseConfig = configParser.getDatabase(databaseName);
    addMetric(TelemetryTypes.SINK_TYPE.getValue(), "influx");
    InfluxDBSink influxSink = new InfluxDBSink(
            new InfluxDBFactoryWrapper(), databaseConfig, configuration,
            columnNames, new ErrorHandler(), measurementName);
    notifySubscriber();
    return influxSink;
}

private void reportTelemetry(KafkaSerializerBuilder kafkaSchemaBuilder) {
TelemetryPublisher pub = (TelemetryPublisher) kafkaSchemaBuilder;
pub.addSubscriber(telemetryExporter);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.gotocompany.dagger.core.sink.influx;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import com.gotocompany.dagger.common.configuration.Configuration;
import com.gotocompany.dagger.core.utils.Constants;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Parses InfluxDB database configurations from either:
 * 1. The new JSON array config key ({@code SINK_INFLUX_DATABASES_CONFIG}), or
 * 2. The legacy flat keys ({@code SINK_INFLUX_URL}, {@code SINK_INFLUX_DB_NAME}, etc.)
 *    as a single "default" entry — for backward compatibility.
 */
public class InfluxDBConfigurationParser implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Gson GSON = new Gson();

    // Insertion-ordered and wrapped unmodifiable in the constructor; keyed by logical database name.
    private final Map<String, InfluxDBDatabaseConfig> configsByName;

    private InfluxDBConfigurationParser(Map<String, InfluxDBDatabaseConfig> configsByName) {
        this.configsByName = Collections.unmodifiableMap(configsByName);
    }

    /**
     * Parse InfluxDB database configurations from the given Configuration.
     * If {@code SINK_INFLUX_DATABASES_CONFIG} is set, parse it as a JSON array.
     * Otherwise, fall back to legacy flat keys and create a single "default" entry.
     *
     * @param configuration the Dagger configuration to read from
     * @return a parser holding all configured databases
     * @throws IllegalArgumentException if the JSON config is malformed, empty,
     *         contains duplicate names, or is missing required fields
     */
    public static InfluxDBConfigurationParser parse(Configuration configuration) {
        String jsonConfig = configuration.getString(
                Constants.SINK_INFLUX_DATABASES_CONFIG_KEY,
                Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT);

        if (!Strings.isNullOrEmpty(jsonConfig)) {
            return parseJsonConfig(jsonConfig);
        }
        return parseLegacyConfig(configuration);
    }

    /**
     * Parses the JSON array form of the configuration. Each entry must be a JSON object
     * with at least {@code name}, {@code url} and {@code dbName}.
     */
    private static InfluxDBConfigurationParser parseJsonConfig(String jsonConfig) {
        Map<String, InfluxDBDatabaseConfig> configs = new LinkedHashMap<>();
        try {
            JsonArray array = GSON.fromJson(jsonConfig, JsonArray.class);
            if (array == null || array.size() == 0) {
                throw new IllegalArgumentException("SINK_INFLUX_DATABASES_CONFIG is empty or not a valid JSON array");
            }
            for (JsonElement element : array) {
                // Guard explicitly: getAsJsonObject() on a non-object would throw a bare
                // IllegalStateException with no config context.
                if (!element.isJsonObject()) {
                    throw new IllegalArgumentException(
                            "Each entry in SINK_INFLUX_DATABASES_CONFIG must be a JSON object, got: " + element);
                }
                JsonObject obj = element.getAsJsonObject();
                String name = getRequiredString(obj, "name");
                String url = getRequiredString(obj, "url");
                String username = getStringOrDefault(obj, "username", "");
                String password = getStringOrDefault(obj, "password", "");
                String dbName = getRequiredString(obj, "dbName");
                String retentionPolicy = getStringOrDefault(obj, "retentionPolicy", "");
                // Fall back to the same defaults as the legacy flat-key path; a default of 0
                // would otherwise configure enableBatch(0, 0, ...) for JSON-configured databases.
                int batchSize = getIntOrDefault(obj, "batchSize", Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT);
                int flushDurationMs = getIntOrDefault(obj, "flushDurationMs", Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT);

                InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig(
                        name, url, username, password, dbName, retentionPolicy, batchSize, flushDurationMs);

                if (configs.containsKey(name)) {
                    throw new IllegalArgumentException("Duplicate database name in SINK_INFLUX_DATABASES_CONFIG: " + name);
                }
                configs.put(name, config);
            }
        } catch (JsonSyntaxException e) {
            // Preserve the cause so the original parse position/detail is not lost.
            throw new IllegalArgumentException("Invalid JSON in SINK_INFLUX_DATABASES_CONFIG: " + e.getMessage(), e);
        }
        return new InfluxDBConfigurationParser(configs);
    }

    /**
     * Builds a single "default" database entry from the legacy flat config keys,
     * keeping pre-existing single-database deployments working unchanged.
     */
    private static InfluxDBConfigurationParser parseLegacyConfig(Configuration configuration) {
        String url = configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT);
        String username = configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT);
        String password = configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT);
        String dbName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT);
        String retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT);
        int batchSize = configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT);
        int flushDurationMs = configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT);

        InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig(
                InfluxDBDatabaseConfig.DEFAULT_NAME, url, username, password,
                dbName, retentionPolicy, batchSize, flushDurationMs);

        Map<String, InfluxDBDatabaseConfig> configs = new LinkedHashMap<>();
        configs.put(InfluxDBDatabaseConfig.DEFAULT_NAME, config);
        return new InfluxDBConfigurationParser(configs);
    }

    /**
     * Get the configuration for a named database.
     *
     * @param name the logical database name
     * @return the database configuration
     * @throws IllegalArgumentException if no database with that name is configured
     */
    public InfluxDBDatabaseConfig getDatabase(String name) {
        InfluxDBDatabaseConfig config = configsByName.get(name);
        if (config == null) {
            throw new IllegalArgumentException("No InfluxDB database configured with name: " + name
                    + ". Available: " + configsByName.keySet());
        }
        return config;
    }

    /**
     * Get the default database configuration (used for backward compatibility).
     *
     * @return the default database configuration
     * @throws IllegalArgumentException if no default database is configured
     */
    public InfluxDBDatabaseConfig getDefaultDatabase() {
        return getDatabase(InfluxDBDatabaseConfig.DEFAULT_NAME);
    }

    /**
     * Check if a named database is configured.
     *
     * @param name the logical database name
     * @return true if a database with that name exists
     */
    public boolean hasDatabase(String name) {
        return configsByName.containsKey(name);
    }

    /**
     * Get all configured database configs.
     *
     * @return a fresh mutable list of all configs, in configuration order
     */
    public List<InfluxDBDatabaseConfig> getAllDatabases() {
        return new ArrayList<>(configsByName.values());
    }

    /**
     * Get the number of configured databases.
     */
    public int size() {
        return configsByName.size();
    }

    /** Reads a mandatory string field, rejecting missing, null, or empty values. */
    private static String getRequiredString(JsonObject obj, String field) {
        if (!obj.has(field) || obj.get(field).isJsonNull()) {
            throw new IllegalArgumentException("Missing required field '" + field + "' in SINK_INFLUX_DATABASES_CONFIG entry");
        }
        String value = obj.get(field).getAsString();
        if (Strings.isNullOrEmpty(value)) {
            throw new IllegalArgumentException("Field '" + field + "' must not be empty in SINK_INFLUX_DATABASES_CONFIG entry");
        }
        return value;
    }

    /** Reads an optional string field, returning {@code defaultValue} when absent or null. */
    private static String getStringOrDefault(JsonObject obj, String field, String defaultValue) {
        if (!obj.has(field) || obj.get(field).isJsonNull()) {
            return defaultValue;
        }
        return obj.get(field).getAsString();
    }

    /** Reads an optional int field, returning {@code defaultValue} when absent or null. */
    private static int getIntOrDefault(JsonObject obj, String field, int defaultValue) {
        if (!obj.has(field) || obj.get(field).isJsonNull()) {
            return defaultValue;
        }
        return obj.get(field).getAsInt();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.gotocompany.dagger.core.sink.influx;

import org.influxdb.InfluxDB;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * Manages InfluxDB connections keyed by database name.
 * Lazily creates connections on first access and caches them.
 * Connections are transient — after deserialization, they are re-created on demand.
 */
public class InfluxDBConnectionRegistry implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Map<String, InfluxDBDatabaseConfig> configsByName;
    private final InfluxDBFactoryWrapper influxDBFactory;
    // Not serialized; rebuilt lazily via ensureConnectionsMap() after deserialization.
    private transient Map<String, InfluxDB> connections;

    /**
     * Creates a registry over the given database configurations.
     * Note: if two configs share a name, the later one wins — the configuration
     * parser is expected to have rejected duplicates already.
     *
     * @param configs the database configurations to serve
     * @param influxDBFactory factory used to open InfluxDB connections
     */
    public InfluxDBConnectionRegistry(List<InfluxDBDatabaseConfig> configs, InfluxDBFactoryWrapper influxDBFactory) {
        this.influxDBFactory = influxDBFactory;
        this.configsByName = new HashMap<>();
        for (InfluxDBDatabaseConfig config : configs) {
            this.configsByName.put(config.getName(), config);
        }
    }

    /**
     * Get or create a connection for the given database name.
     * Batch mode is enabled using the database config's batch settings.
     *
     * @param databaseName the logical name of the database
     * @param exceptionHandler handler for batch write exceptions
     * @return the InfluxDB connection
     * @throws IllegalArgumentException if no database with that name is configured
     */
    public synchronized InfluxDB getConnection(String databaseName, BiConsumer<Iterable<org.influxdb.dto.Point>, Throwable> exceptionHandler) {
        ensureConnectionsMap();
        InfluxDB connection = connections.get(databaseName);
        if (connection != null) {
            return connection;
        }

        // getConfig() performs the name lookup and throws the canonical error for unknown names.
        InfluxDBDatabaseConfig config = getConfig(databaseName);

        connection = influxDBFactory.connect(config.getUrl(), config.getUsername(), config.getPassword());
        try {
            connection.enableBatch(config.getBatchSize(), config.getFlushDurationMs(),
                    TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), exceptionHandler);
        } catch (RuntimeException e) {
            // Don't leak the freshly opened connection if batch setup fails.
            closeQuietly(connection);
            throw e;
        }
        connections.put(databaseName, connection);
        return connection;
    }

    /**
     * Get the configuration for a named database.
     *
     * @param databaseName the logical name of the database
     * @return the database configuration
     * @throws IllegalArgumentException if no database with that name is configured
     */
    public InfluxDBDatabaseConfig getConfig(String databaseName) {
        InfluxDBDatabaseConfig config = configsByName.get(databaseName);
        if (config == null) {
            throw new IllegalArgumentException("No InfluxDB database configured with name: " + databaseName
                    + ". Available: " + configsByName.keySet());
        }
        return config;
    }

    /**
     * Check if a database is configured.
     */
    public boolean hasDatabase(String databaseName) {
        return configsByName.containsKey(databaseName);
    }

    /**
     * Close all open connections. Each close is best-effort; a failure to close
     * one connection does not prevent the others from being closed.
     */
    public synchronized void closeAll() {
        if (connections != null) {
            for (InfluxDB connection : connections.values()) {
                closeQuietly(connection);
            }
            connections.clear();
        }
    }

    /** Closes a connection, swallowing any exception (best-effort cleanup). */
    private static void closeQuietly(InfluxDB connection) {
        try {
            connection.close();
        } catch (Exception ignored) {
            // best-effort close
        }
    }

    /** Lazily (re)creates the connection cache — it is transient and null after deserialization. */
    private void ensureConnectionsMap() {
        if (connections == null) {
            connections = new HashMap<>();
        }
    }
}
Loading
Loading