diff --git a/CHANGELOG.md b/CHANGELOG.md index 723b7163e..e8bb6fffa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [Unreleased] + +### Features + +- **Multiple InfluxDB Database Support** — Route different measurements to different InfluxDB databases from a single Dagger job. New `getInfluxSink(configuration, columnNames, databaseName, measurementName)` API on `SinkOrchestrator`; call `initInfluxConfig(configuration)` once before using it. Configure multiple named databases via the `SINK_INFLUX_DATABASES_CONFIG` JSON array. Fully backward compatible with the existing single-database configuration. + ### [v0.9.0](https://github.com/goto/dagger/releases/tag/v0.9.0) (2023-03-16) ### Features diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java index 991ff4ac1..8743d8b79 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/DaggerSqlJobBuilder.java @@ -230,6 +230,7 @@ private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName) { private void addSink(StreamInfo streamInfo) { SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter); sinkOrchestrator.addSubscriber(telemetryExporter); + sinkOrchestrator.initInfluxConfig(configuration); streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter)); } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java index 49ba96b19..8cdd51157 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java +++ 
b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/SinkOrchestrator.java @@ -6,6 +6,8 @@ import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes; import com.gotocompany.dagger.core.sink.bigquery.BigQuerySinkBuilder; import com.gotocompany.dagger.core.sink.influx.ErrorHandler; +import com.gotocompany.dagger.core.sink.influx.InfluxDBConfigurationParser; +import com.gotocompany.dagger.core.sink.influx.InfluxDBDatabaseConfig; import com.gotocompany.dagger.core.sink.influx.InfluxDBFactoryWrapper; import com.gotocompany.dagger.core.sink.influx.InfluxDBSink; import com.gotocompany.dagger.core.utils.KafkaConfigUtil; @@ -38,12 +40,21 @@ public class SinkOrchestrator implements TelemetryPublisher { private final MetricsTelemetryExporter telemetryExporter; private final Map> metrics; + private InfluxDBConfigurationParser configParser; public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) { this.telemetryExporter = telemetryExporter; this.metrics = new HashMap<>(); } + /** + * Initialize the multi-database InfluxDB configuration parser. + * Must be called before using {@link #getInfluxSink(Configuration, String[], String, String)}. + */ + public void initInfluxConfig(Configuration configuration) { + this.configParser = InfluxDBConfigurationParser.parse(configuration); + } + /** * Gets sink. * @@ -98,6 +109,27 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl return getSink(configuration, columnNames, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); } + /** + * Gets an InfluxDB sink targeting a specific named database and measurement. + * Requires {@link #initInfluxConfig(Configuration)} to have been called first. 
+ * + * @param configuration the configuration (for general settings like useRowFieldNames) + * @param columnNames the column names for the stream + * @param databaseName the logical name of the target database (as defined in SINK_INFLUX_DATABASES_CONFIG) + * @param measurementName the InfluxDB measurement name + * @return the InfluxDB sink + */ + public Sink getInfluxSink(Configuration configuration, String[] columnNames, String databaseName, String measurementName) { + if (configParser == null) { + throw new IllegalStateException("InfluxDB config not initialized. Call initInfluxConfig() first."); + } + InfluxDBDatabaseConfig dbConfig = configParser.getDatabase(databaseName); + addMetric(TelemetryTypes.SINK_TYPE.getValue(), "influx"); + Sink sink = new InfluxDBSink(new InfluxDBFactoryWrapper(), dbConfig, configuration, columnNames, new ErrorHandler(), measurementName); + notifySubscriber(); + return sink; + } + private void reportTelemetry(KafkaSerializerBuilder kafkaSchemaBuilder) { TelemetryPublisher pub = (TelemetryPublisher) kafkaSchemaBuilder; pub.addSubscriber(telemetryExporter); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConfigurationParser.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConfigurationParser.java new file mode 100644 index 000000000..bd7b893ca --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConfigurationParser.java @@ -0,0 +1,172 @@ +package com.gotocompany.dagger.core.sink.influx; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.utils.Constants; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import 
java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Parses InfluxDB database configurations from either: + * 1. The new JSON array config key ({@code SINK_INFLUX_DATABASES_CONFIG}), or + * 2. The legacy flat keys ({@code SINK_INFLUX_URL}, {@code SINK_INFLUX_DB_NAME}, etc.) + * as a single "default" entry — for backward compatibility. + */ +public class InfluxDBConfigurationParser implements Serializable { + private static final long serialVersionUID = 1L; + private static final Gson GSON = new Gson(); + + private final Map configsByName; + + private InfluxDBConfigurationParser(Map configsByName) { + this.configsByName = Collections.unmodifiableMap(configsByName); + } + + /** + * Parse InfluxDB database configurations from the given Configuration. + * If {@code SINK_INFLUX_DATABASES_CONFIG} is set, parse it as a JSON array. + * Otherwise, fall back to legacy flat keys and create a single "default" entry. + */ + public static InfluxDBConfigurationParser parse(Configuration configuration) { + String jsonConfig = configuration.getString( + Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, + Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT); + + if (!Strings.isNullOrEmpty(jsonConfig)) { + return parseJsonConfig(jsonConfig); + } + return parseLegacyConfig(configuration); + } + + private static InfluxDBConfigurationParser parseJsonConfig(String jsonConfig) { + Map configs = new LinkedHashMap<>(); + try { + JsonArray array = GSON.fromJson(jsonConfig, JsonArray.class); + if (array == null || array.size() == 0) { + throw new IllegalArgumentException("SINK_INFLUX_DATABASES_CONFIG is empty or not a valid JSON array"); + } + for (JsonElement element : array) { + JsonObject obj = element.getAsJsonObject(); + String name = getRequiredString(obj, "name"); + String url = getRequiredString(obj, "url"); + String username = getStringOrDefault(obj, "username", ""); + String password = getStringOrDefault(obj, "password", ""); + String dbName = 
getRequiredString(obj, "dbName"); + String retentionPolicy = getStringOrDefault(obj, "retentionPolicy", ""); + int batchSize = getIntOrDefault(obj, "batchSize", 0); + int flushDurationMs = getIntOrDefault(obj, "flushDurationMs", 0); + + InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig( + name, url, username, password, dbName, retentionPolicy, batchSize, flushDurationMs); + + if (configs.containsKey(name)) { + throw new IllegalArgumentException("Duplicate database name in SINK_INFLUX_DATABASES_CONFIG: " + name); + } + configs.put(name, config); + } + } catch (JsonSyntaxException e) { + throw new IllegalArgumentException("Invalid JSON in SINK_INFLUX_DATABASES_CONFIG: " + e.getMessage(), e); + } + return new InfluxDBConfigurationParser(configs); + } + + private static InfluxDBConfigurationParser parseLegacyConfig(Configuration configuration) { + String url = configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT); + String username = configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT); + String password = configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT); + String dbName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT); + String retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT); + int batchSize = configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT); + int flushDurationMs = configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT); + + InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig( + InfluxDBDatabaseConfig.DEFAULT_NAME, url, username, password, + dbName, retentionPolicy, batchSize, flushDurationMs); + + Map configs = new LinkedHashMap<>(); + 
configs.put(InfluxDBDatabaseConfig.DEFAULT_NAME, config); + return new InfluxDBConfigurationParser(configs); + } + + /** + * Get the configuration for a named database. + * + * @param name the logical database name + * @return the database configuration + * @throws IllegalArgumentException if no database with that name is configured + */ + public InfluxDBDatabaseConfig getDatabase(String name) { + InfluxDBDatabaseConfig config = configsByName.get(name); + if (config == null) { + throw new IllegalArgumentException("No InfluxDB database configured with name: " + name + + ". Available: " + configsByName.keySet()); + } + return config; + } + + /** + * Get the default database configuration (used for backward compatibility). + * + * @return the default database configuration + * @throws IllegalArgumentException if no default database is configured + */ + public InfluxDBDatabaseConfig getDefaultDatabase() { + return getDatabase(InfluxDBDatabaseConfig.DEFAULT_NAME); + } + + /** + * Check if a named database is configured. + */ + public boolean hasDatabase(String name) { + return configsByName.containsKey(name); + } + + /** + * Get all configured database configs. + */ + public List getAllDatabases() { + return new ArrayList<>(configsByName.values()); + } + + /** + * Get the number of configured databases. 
+ */ + public int size() { + return configsByName.size(); + } + + private static String getRequiredString(JsonObject obj, String field) { + if (!obj.has(field) || obj.get(field).isJsonNull()) { + throw new IllegalArgumentException("Missing required field '" + field + "' in SINK_INFLUX_DATABASES_CONFIG entry"); + } + String value = obj.get(field).getAsString(); + if (Strings.isNullOrEmpty(value)) { + throw new IllegalArgumentException("Field '" + field + "' must not be empty in SINK_INFLUX_DATABASES_CONFIG entry"); + } + return value; + } + + private static String getStringOrDefault(JsonObject obj, String field, String defaultValue) { + if (!obj.has(field) || obj.get(field).isJsonNull()) { + return defaultValue; + } + return obj.get(field).getAsString(); + } + + private static int getIntOrDefault(JsonObject obj, String field, int defaultValue) { + if (!obj.has(field) || obj.get(field).isJsonNull()) { + return defaultValue; + } + return obj.get(field).getAsInt(); + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConnectionRegistry.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConnectionRegistry.java new file mode 100644 index 000000000..83bc7d298 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConnectionRegistry.java @@ -0,0 +1,101 @@ +package com.gotocompany.dagger.core.sink.influx; + +import org.influxdb.InfluxDB; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +/** + * Manages InfluxDB connections keyed by database name. + * Lazily creates connections on first access and caches them. + * Connections are transient — after deserialization, they are re-created on demand. 
+ */ +public class InfluxDBConnectionRegistry implements Serializable { + private static final long serialVersionUID = 1L; + + private final Map configsByName; + private final InfluxDBFactoryWrapper influxDBFactory; + private transient Map connections; + + public InfluxDBConnectionRegistry(List configs, InfluxDBFactoryWrapper influxDBFactory) { + this.influxDBFactory = influxDBFactory; + this.configsByName = new HashMap<>(); + for (InfluxDBDatabaseConfig config : configs) { + this.configsByName.put(config.getName(), config); + } + } + + /** + * Get or create a connection for the given database name. + * Batch mode is enabled using the database config's batch settings. + * + * @param databaseName the logical name of the database + * @param exceptionHandler handler for batch write exceptions + * @return the InfluxDB connection + */ + public synchronized InfluxDB getConnection(String databaseName, BiConsumer, Throwable> exceptionHandler) { + ensureConnectionsMap(); + InfluxDB connection = connections.get(databaseName); + if (connection != null) { + return connection; + } + + InfluxDBDatabaseConfig config = configsByName.get(databaseName); + if (config == null) { + throw new IllegalArgumentException("No InfluxDB database configured with name: " + databaseName + + ". Available: " + configsByName.keySet()); + } + + connection = influxDBFactory.connect(config.getUrl(), config.getUsername(), config.getPassword()); + connection.enableBatch(config.getBatchSize(), config.getFlushDurationMs(), + TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), exceptionHandler); + connections.put(databaseName, connection); + return connection; + } + + /** + * Get the configuration for a named database. + */ + public InfluxDBDatabaseConfig getConfig(String databaseName) { + InfluxDBDatabaseConfig config = configsByName.get(databaseName); + if (config == null) { + throw new IllegalArgumentException("No InfluxDB database configured with name: " + databaseName + + ". 
Available: " + configsByName.keySet()); + } + return config; + } + + /** + * Check if a database is configured. + */ + public boolean hasDatabase(String databaseName) { + return configsByName.containsKey(databaseName); + } + + /** + * Close all open connections. + */ + public synchronized void closeAll() { + if (connections != null) { + for (InfluxDB connection : connections.values()) { + try { + connection.close(); + } catch (Exception e) { + // best-effort close + } + } + connections.clear(); + } + } + + private void ensureConnectionsMap() { + if (connections == null) { + connections = new HashMap<>(); + } + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBDatabaseConfig.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBDatabaseConfig.java new file mode 100644 index 000000000..9fd819ce6 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBDatabaseConfig.java @@ -0,0 +1,99 @@ +package com.gotocompany.dagger.core.sink.influx; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Configuration for a single InfluxDB database target. + * Each instance represents a named database that sinks can write to. 
+ */ +public class InfluxDBDatabaseConfig implements Serializable { + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_NAME = "default"; + + private final String name; + private final String url; + private final String username; + private final String password; + private final String dbName; + private final String retentionPolicy; + private final int batchSize; + private final int flushDurationMs; + + public InfluxDBDatabaseConfig(String name, String url, String username, String password, + String dbName, String retentionPolicy, int batchSize, int flushDurationMs) { + this.name = Objects.requireNonNull(name, "name must not be null"); + this.url = Objects.requireNonNull(url, "url must not be null"); + this.username = Objects.requireNonNull(username, "username must not be null"); + this.password = Objects.requireNonNull(password, "password must not be null"); + this.dbName = Objects.requireNonNull(dbName, "dbName must not be null"); + this.retentionPolicy = Objects.requireNonNull(retentionPolicy, "retentionPolicy must not be null"); + this.batchSize = batchSize; + this.flushDurationMs = flushDurationMs; + } + + public String getName() { + return name; + } + + public String getUrl() { + return url; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getDbName() { + return dbName; + } + + public String getRetentionPolicy() { + return retentionPolicy; + } + + public int getBatchSize() { + return batchSize; + } + + public int getFlushDurationMs() { + return flushDurationMs; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + InfluxDBDatabaseConfig that = (InfluxDBDatabaseConfig) o; + return batchSize == that.batchSize + && flushDurationMs == that.flushDurationMs + && Objects.equals(name, that.name) + && Objects.equals(url, that.url) + && 
Objects.equals(username, that.username) + && Objects.equals(password, that.password) + && Objects.equals(dbName, that.dbName) + && Objects.equals(retentionPolicy, that.retentionPolicy); + } + + @Override + public int hashCode() { + return Objects.hash(name, url, username, password, dbName, retentionPolicy, batchSize, flushDurationMs); + } + + @Override + public String toString() { + return "InfluxDBDatabaseConfig{" + + "name='" + name + '\'' + + ", url='" + url + '\'' + + ", dbName='" + dbName + '\'' + + ", retentionPolicy='" + retentionPolicy + '\'' + + ", batchSize=" + batchSize + + ", flushDurationMs=" + flushDurationMs + + '}'; + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java index d5307b005..928ba9b19 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBSink.java @@ -26,7 +26,11 @@ public class InfluxDBSink implements Sink { private ErrorHandler errorHandler; private ErrorReporter errorReporter; private final String influxMeasurementOverrideName; + private final InfluxDBDatabaseConfig databaseConfig; + /** + * Legacy constructor — reads connection settings from flat config keys. + */ public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, Configuration configuration, String[] columnNames, ErrorHandler errorHandler, String influxMeasurementOverrideName) { this.influxDBFactory = influxDBFactory; @@ -34,22 +38,45 @@ public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, Configuration config this.columnNames = columnNames; this.errorHandler = errorHandler; this.influxMeasurementOverrideName = influxMeasurementOverrideName; + this.databaseConfig = null; + } + + /** + * Multi-DB constructor — uses explicit database config for connection and write settings. 
+ */ + public InfluxDBSink(InfluxDBFactoryWrapper influxDBFactory, InfluxDBDatabaseConfig databaseConfig, + Configuration configuration, String[] columnNames, + ErrorHandler errorHandler, String influxMeasurementOverrideName) { + this.influxDBFactory = influxDBFactory; + this.configuration = configuration; + this.columnNames = columnNames; + this.errorHandler = errorHandler; + this.influxMeasurementOverrideName = influxMeasurementOverrideName; + this.databaseConfig = databaseConfig; } @Override public SinkWriter createWriter(InitContext context, List states) throws IOException { - InfluxDB influxDB = influxDBFactory.connect(configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT), - configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT), - configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT)); - errorHandler.init(context); - influxDB.enableBatch(configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT), - configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT), - TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), errorHandler.getExceptionHandler()); + InfluxDB influxDB; + if (databaseConfig != null) { + influxDB = influxDBFactory.connect(databaseConfig.getUrl(), databaseConfig.getUsername(), databaseConfig.getPassword()); + errorHandler.init(context); + influxDB.enableBatch(databaseConfig.getBatchSize(), databaseConfig.getFlushDurationMs(), + TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), errorHandler.getExceptionHandler()); + } else { + influxDB = influxDBFactory.connect(configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT), + configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT), + 
configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT)); + errorHandler.init(context); + influxDB.enableBatch(configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT), + configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT), + TimeUnit.MILLISECONDS, Executors.defaultThreadFactory(), errorHandler.getExceptionHandler()); + } if (errorReporter == null) { errorReporter = ErrorReporterFactory.getErrorReporter(context.metricGroup(), configuration); } - InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, influxDB, columnNames, errorHandler, errorReporter, influxMeasurementOverrideName); + InfluxDBWriter influxDBWriter = new InfluxDBWriter(configuration, databaseConfig, influxDB, columnNames, errorHandler, errorReporter, influxMeasurementOverrideName); return influxDBWriter; } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java index 578e06ea4..878d882b4 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java @@ -36,10 +36,15 @@ public class InfluxDBWriter implements SinkWriter { private ErrorReporter errorReporter; private boolean useRowFieldNames; - public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler, - ErrorReporter errorReporter, String influxMeasurementOverrideName) { - databaseName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT); - retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT); + public InfluxDBWriter(Configuration configuration, 
InfluxDBDatabaseConfig databaseConfig, InfluxDB influxDB, String[] columnNames, + ErrorHandler errorHandler, ErrorReporter errorReporter, String influxMeasurementOverrideName) { + if (databaseConfig != null) { + databaseName = databaseConfig.getDbName(); + retentionPolicy = databaseConfig.getRetentionPolicy(); + } else { + databaseName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT); + retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT); + } if (Strings.isNullOrEmpty(influxMeasurementOverrideName)) { measurementName = configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT); } else { @@ -52,6 +57,14 @@ public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] c this.errorReporter = errorReporter; } + /** + * Legacy constructor — reads database/retention from flat config keys. + */ + public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler, + ErrorReporter errorReporter, String influxMeasurementOverrideName) { + this(configuration, null, influxDB, columnNames, errorHandler, errorReporter, influxMeasurementOverrideName); + } + @Override public void write(Row row, Context context) throws IOException, InterruptedException { LOGGER.info("row to influx: " + row); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index c92a66b6e..b59d86e35 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -163,6 +163,15 @@ public class Constants { // If measurement names need to change, they can now be updated through configuration without changing the code. 
public static final String SINK_INFLUX_MEASUREMENTS_LIST_KEY = "SINK_INFLUX_MEASUREMENTS_LIST"; + // Comma-separated list of database names (from SINK_INFLUX_DATABASES_CONFIG) corresponding + // to each entry in SINK_INFLUX_MEASUREMENTS_LIST. Defaults to "default" for all entries. + public static final String SINK_INFLUX_DB_NAMES_LIST_KEY = "SINK_INFLUX_DB_NAMES_LIST"; + + // JSON array of InfluxDB database configurations for multi-database support. + // When absent, falls back to the legacy flat SINK_INFLUX_* keys. + public static final String SINK_INFLUX_DATABASES_CONFIG_KEY = "SINK_INFLUX_DATABASES_CONFIG"; + public static final String SINK_INFLUX_DATABASES_CONFIG_DEFAULT = ""; + public static final String SINK_INFLUX_URL_KEY = "SINK_INFLUX_URL"; public static final String SINK_INFLUX_URL_DEFAULT = ""; public static final String SINK_INFLUX_USERNAME_KEY = "SINK_INFLUX_USERNAME"; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java index 277aac960..7068bba16 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/SinkOrchestratorTest.java @@ -120,4 +120,29 @@ public void shouldReturnBigQuerySink() { Sink sinkFunction = sinkOrchestrator.getSink(configuration, new String[]{}, stencilClientOrchestrator, daggerStatsDReporter, influxMeasurementOverrideName); assertThat(sinkFunction, instanceOf(BigQuerySink.class)); } + + @Test + public void shouldReturnInfluxSinkForNamedDatabase() { + String json = "[{\"name\":\"ht-db\",\"url\":\"http://host1:8086\",\"dbName\":\"metrics_ht\"}]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + sinkOrchestrator.initInfluxConfig(configuration); + Sink sink = sinkOrchestrator.getInfluxSink(configuration, new 
String[]{}, "ht-db", "my-measurement"); + + assertThat(sink, instanceOf(InfluxDBSink.class)); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowWhenGettingInfluxSinkForUnknownDatabase() { + String json = "[{\"name\":\"ht-db\",\"url\":\"http://host1:8086\",\"dbName\":\"metrics_ht\"}]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + sinkOrchestrator.initInfluxConfig(configuration); + sinkOrchestrator.getInfluxSink(configuration, new String[]{}, "nonexistent", "my-measurement"); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowWhenInfluxConfigNotInitialized() { + sinkOrchestrator.getInfluxSink(configuration, new String[]{}, "ht-db", "my-measurement"); + } } diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConfigurationParserTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConfigurationParserTest.java new file mode 100644 index 000000000..0b2182672 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConfigurationParserTest.java @@ -0,0 +1,226 @@ +package com.gotocompany.dagger.core.sink.influx; + +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.utils.Constants; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class InfluxDBConfigurationParserTest { + + @Mock + private Configuration configuration; + + @Before + public void setUp() { + initMocks(this); + } + + // --- Legacy fallback tests --- + + @Test + public void shouldFallBackToLegacyKeysWhenDatabasesConfigIsAbsent() { + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, 
Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT)).thenReturn("http://localhost:8086"); + when(configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT)).thenReturn("admin"); + when(configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT)).thenReturn("secret"); + when(configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT)).thenReturn("mydb"); + when(configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT)).thenReturn("autogen"); + when(configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT)).thenReturn(100); + when(configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT)).thenReturn(500); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + + assertEquals(1, parser.size()); + assertTrue(parser.hasDatabase("default")); + + InfluxDBDatabaseConfig config = parser.getDefaultDatabase(); + assertEquals("default", config.getName()); + assertEquals("http://localhost:8086", config.getUrl()); + assertEquals("admin", config.getUsername()); + assertEquals("secret", config.getPassword()); + assertEquals("mydb", config.getDbName()); + assertEquals("autogen", config.getRetentionPolicy()); + assertEquals(100, config.getBatchSize()); + assertEquals(500, config.getFlushDurationMs()); + } + + @Test + public void shouldFallBackToLegacyKeysWithDefaults() { + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT)).thenReturn(""); + 
when(configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT)).thenReturn(""); + when(configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT)).thenReturn(0); + when(configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT)).thenReturn(0); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + + assertEquals(1, parser.size()); + InfluxDBDatabaseConfig config = parser.getDefaultDatabase(); + assertEquals("", config.getUrl()); + assertEquals("", config.getDbName()); + assertEquals(0, config.getBatchSize()); + } + + // --- JSON config tests --- + + @Test + public void shouldParseSingleDatabaseFromJson() { + String json = "[{\"name\":\"db1\",\"url\":\"http://host1:8086\",\"username\":\"u1\",\"password\":\"p1\"," + + "\"dbName\":\"metrics1\",\"retentionPolicy\":\"autogen\",\"batchSize\":200,\"flushDurationMs\":1000}]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + + assertEquals(1, parser.size()); + assertTrue(parser.hasDatabase("db1")); + + InfluxDBDatabaseConfig config = parser.getDatabase("db1"); + assertEquals("db1", config.getName()); + assertEquals("http://host1:8086", config.getUrl()); + assertEquals("u1", config.getUsername()); + assertEquals("p1", config.getPassword()); + assertEquals("metrics1", 
config.getDbName()); + assertEquals("autogen", config.getRetentionPolicy()); + assertEquals(200, config.getBatchSize()); + assertEquals(1000, config.getFlushDurationMs()); + } + + @Test + public void shouldParseMultipleDatabasesFromJson() { + String json = "[" + + "{\"name\":\"ht-db\",\"url\":\"http://host1:8086\",\"username\":\"u1\",\"password\":\"p1\"," + + "\"dbName\":\"metrics_ht\",\"retentionPolicy\":\"rp1\",\"batchSize\":1000,\"flushDurationMs\":500}," + + "{\"name\":\"lt-db\",\"url\":\"http://host2:8086\",\"username\":\"u2\",\"password\":\"p2\"," + + "\"dbName\":\"metrics_lt\",\"retentionPolicy\":\"rp2\",\"batchSize\":100,\"flushDurationMs\":2000}" + + "]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + + assertEquals(2, parser.size()); + assertTrue(parser.hasDatabase("ht-db")); + assertTrue(parser.hasDatabase("lt-db")); + + assertEquals("http://host1:8086", parser.getDatabase("ht-db").getUrl()); + assertEquals("metrics_ht", parser.getDatabase("ht-db").getDbName()); + assertEquals("http://host2:8086", parser.getDatabase("lt-db").getUrl()); + assertEquals("metrics_lt", parser.getDatabase("lt-db").getDbName()); + } + + @Test + public void shouldUseDefaultsForOptionalJsonFields() { + String json = "[{\"name\":\"db1\",\"url\":\"http://host1:8086\",\"dbName\":\"mydb\"}]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + InfluxDBDatabaseConfig config = parser.getDatabase("db1"); + + assertEquals("", config.getUsername()); + assertEquals("", config.getPassword()); + assertEquals("", config.getRetentionPolicy()); + assertEquals(0, config.getBatchSize()); + assertEquals(0, 
config.getFlushDurationMs()); + } + + @Test + public void shouldReturnAllDatabases() { + String json = "[" + + "{\"name\":\"db1\",\"url\":\"http://h1:8086\",\"dbName\":\"d1\"}," + + "{\"name\":\"db2\",\"url\":\"http://h2:8086\",\"dbName\":\"d2\"}" + + "]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + + assertEquals(2, parser.getAllDatabases().size()); + } + + // --- Error cases --- + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnDuplicateDatabaseNames() { + String json = "[" + + "{\"name\":\"db1\",\"url\":\"http://h1:8086\",\"dbName\":\"d1\"}," + + "{\"name\":\"db1\",\"url\":\"http://h2:8086\",\"dbName\":\"d2\"}" + + "]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnMissingRequiredNameField() { + String json = "[{\"url\":\"http://h1:8086\",\"dbName\":\"d1\"}]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnMissingRequiredUrlField() { + String json = "[{\"name\":\"db1\",\"dbName\":\"d1\"}]"; + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnMissingRequiredDbNameField() { + String json = "[{\"name\":\"db1\",\"url\":\"http://h1:8086\"}]"; + 
when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(json); + + InfluxDBConfigurationParser.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnInvalidJson() { + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn("not-json"); + + InfluxDBConfigurationParser.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnEmptyJsonArray() { + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn("[]"); + + InfluxDBConfigurationParser.parse(configuration); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowWhenGettingNonExistentDatabase() { + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT)).thenReturn("http://localhost:8086"); + when(configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT)).thenReturn("mydb"); + when(configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT)).thenReturn(""); + when(configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT)).thenReturn(0); + when(configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT)).thenReturn(0); + + 
InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + parser.getDatabase("nonexistent"); + } + + @Test + public void shouldReportHasDatabaseFalseForNonExistent() { + when(configuration.getString(Constants.SINK_INFLUX_DATABASES_CONFIG_KEY, Constants.SINK_INFLUX_DATABASES_CONFIG_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_URL_KEY, Constants.SINK_INFLUX_URL_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_USERNAME_KEY, Constants.SINK_INFLUX_USERNAME_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_PASSWORD_KEY, Constants.SINK_INFLUX_PASSWORD_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT)).thenReturn(""); + when(configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT)).thenReturn(""); + when(configuration.getInteger(Constants.SINK_INFLUX_BATCH_SIZE_KEY, Constants.SINK_INFLUX_BATCH_SIZE_DEFAULT)).thenReturn(0); + when(configuration.getInteger(Constants.SINK_INFLUX_FLUSH_DURATION_MS_KEY, Constants.SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT)).thenReturn(0); + + InfluxDBConfigurationParser parser = InfluxDBConfigurationParser.parse(configuration); + assertFalse(parser.hasDatabase("nonexistent")); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConnectionRegistryTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConnectionRegistryTest.java new file mode 100644 index 000000000..1654fec27 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBConnectionRegistryTest.java @@ -0,0 +1,154 @@ +package com.gotocompany.dagger.core.sink.influx; + +import org.influxdb.InfluxDB; +import org.influxdb.dto.Point; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import 
java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class InfluxDBConnectionRegistryTest { + + @Mock + private InfluxDBFactoryWrapper influxDBFactory; + + @Mock + private InfluxDB influxDB1; + + @Mock + private InfluxDB influxDB2; + + private BiConsumer<Iterable<Point>, Throwable> exceptionHandler; + + private InfluxDBDatabaseConfig config1; + private InfluxDBDatabaseConfig config2; + + @Before + public void setUp() { + initMocks(this); + exceptionHandler = (points, throwable) -> { }; + + config1 = new InfluxDBDatabaseConfig("db1", "http://host1:8086", "u1", "p1", + "metrics1", "autogen", 100, 500); + config2 = new InfluxDBDatabaseConfig("db2", "http://host2:8086", "u2", "p2", + "metrics2", "rp2", 200, 1000); + + when(influxDBFactory.connect("http://host1:8086", "u1", "p1")).thenReturn(influxDB1); + when(influxDBFactory.connect("http://host2:8086", "u2", "p2")).thenReturn(influxDB2); + } + + @Test + public void shouldCreateConnectionOnFirstAccess() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + InfluxDB conn = registry.getConnection("db1", exceptionHandler); + + assertSame(influxDB1, conn); + verify(influxDBFactory).connect("http://host1:8086", "u1", "p1"); + } + + @Test + public void shouldEnableBatchModeOnConnection() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + registry.getConnection("db1", exceptionHandler); + + verify(influxDB1).enableBatch(eq(100), eq(500), eq(TimeUnit.MILLISECONDS), + any(ThreadFactory.class), eq(exceptionHandler)); + } + + @Test + public void
shouldReuseConnectionOnSubsequentAccess() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + InfluxDB first = registry.getConnection("db1", exceptionHandler); + InfluxDB second = registry.getConnection("db1", exceptionHandler); + + assertSame(first, second); + verify(influxDBFactory, times(1)).connect(anyString(), anyString(), anyString()); + } + + @Test + public void shouldCreateSeparateConnectionsForDifferentDatabases() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Arrays.asList(config1, config2), influxDBFactory); + + InfluxDB conn1 = registry.getConnection("db1", exceptionHandler); + InfluxDB conn2 = registry.getConnection("db2", exceptionHandler); + + assertSame(influxDB1, conn1); + assertSame(influxDB2, conn2); + assertNotSame(conn1, conn2); + verify(influxDBFactory, times(2)).connect(anyString(), anyString(), anyString()); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowWhenGettingConnectionForUnknownDatabase() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + registry.getConnection("nonexistent", exceptionHandler); + } + + @Test + public void shouldCloseAllConnections() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Arrays.asList(config1, config2), influxDBFactory); + + registry.getConnection("db1", exceptionHandler); + registry.getConnection("db2", exceptionHandler); + + registry.closeAll(); + + verify(influxDB1).close(); + verify(influxDB2).close(); + } + + @Test + public void shouldHandleCloseAllWhenNoConnectionsOpen() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + // Should not throw + registry.closeAll(); + } + + @Test + public void shouldReturnConfigForDatabase() { + InfluxDBConnectionRegistry registry = new 
InfluxDBConnectionRegistry( + Arrays.asList(config1, config2), influxDBFactory); + + assertEquals(config1, registry.getConfig("db1")); + assertEquals(config2, registry.getConfig("db2")); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowWhenGettingConfigForUnknownDatabase() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + registry.getConfig("nonexistent"); + } + + @Test + public void shouldReportHasDatabase() { + InfluxDBConnectionRegistry registry = new InfluxDBConnectionRegistry( + Collections.singletonList(config1), influxDBFactory); + + assertTrue(registry.hasDatabase("db1")); + assertFalse(registry.hasDatabase("db2")); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBDatabaseConfigTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBDatabaseConfigTest.java new file mode 100644 index 000000000..eecb447a9 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBDatabaseConfigTest.java @@ -0,0 +1,117 @@ +package com.gotocompany.dagger.core.sink.influx; + +import org.junit.Test; + +import java.io.*; + +import static org.junit.Assert.*; + +public class InfluxDBDatabaseConfigTest { + + @Test + public void shouldCreateConfigWithAllFields() { + InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig( + "mydb", "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 500); + + assertEquals("mydb", config.getName()); + assertEquals("http://localhost:8086", config.getUrl()); + assertEquals("user", config.getUsername()); + assertEquals("pass", config.getPassword()); + assertEquals("metrics", config.getDbName()); + assertEquals("autogen", config.getRetentionPolicy()); + assertEquals(100, config.getBatchSize()); + assertEquals(500, config.getFlushDurationMs()); + } + + @Test(expected = NullPointerException.class) + public void 
shouldThrowWhenNameIsNull() { + new InfluxDBDatabaseConfig(null, "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 500); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowWhenUrlIsNull() { + new InfluxDBDatabaseConfig("mydb", null, "user", "pass", + "metrics", "autogen", 100, 500); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowWhenDbNameIsNull() { + new InfluxDBDatabaseConfig("mydb", "http://localhost:8086", "user", "pass", + null, "autogen", 100, 500); + } + + @Test + public void shouldBeEqualForSameValues() { + InfluxDBDatabaseConfig config1 = new InfluxDBDatabaseConfig( + "mydb", "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 500); + InfluxDBDatabaseConfig config2 = new InfluxDBDatabaseConfig( + "mydb", "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 500); + + assertEquals(config1, config2); + assertEquals(config1.hashCode(), config2.hashCode()); + } + + @Test + public void shouldNotBeEqualForDifferentNames() { + InfluxDBDatabaseConfig config1 = new InfluxDBDatabaseConfig( + "mydb1", "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 500); + InfluxDBDatabaseConfig config2 = new InfluxDBDatabaseConfig( + "mydb2", "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 500); + + assertNotEquals(config1, config2); + } + + @Test + public void shouldNotBeEqualForDifferentUrls() { + InfluxDBDatabaseConfig config1 = new InfluxDBDatabaseConfig( + "mydb", "http://host1:8086", "user", "pass", + "metrics", "autogen", 100, 500); + InfluxDBDatabaseConfig config2 = new InfluxDBDatabaseConfig( + "mydb", "http://host2:8086", "user", "pass", + "metrics", "autogen", 100, 500); + + assertNotEquals(config1, config2); + } + + @Test + public void shouldBeSerializable() throws Exception { + InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig( + "mydb", "http://localhost:8086", "user", "pass", + "metrics", "autogen", 100, 
500); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(config); + oos.close(); + + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); + InfluxDBDatabaseConfig deserialized = (InfluxDBDatabaseConfig) ois.readObject(); + ois.close(); + + assertEquals(config, deserialized); + } + + @Test + public void shouldHaveDefaultNameConstant() { + assertEquals("default", InfluxDBDatabaseConfig.DEFAULT_NAME); + } + + @Test + public void shouldNotIncludeCredentialsInToString() { + InfluxDBDatabaseConfig config = new InfluxDBDatabaseConfig( + "mydb", "http://localhost:8086", "secretuser", "secretpass", + "metrics", "autogen", 100, 500); + + String str = config.toString(); + assertTrue(str.contains("mydb")); + assertTrue(str.contains("http://localhost:8086")); + assertFalse(str.contains("secretuser")); + assertFalse(str.contains("secretpass")); + } +} diff --git a/docs/docs/guides/multi_influx_databases.md b/docs/docs/guides/multi_influx_databases.md new file mode 100644 index 000000000..369a9f6f3 --- /dev/null +++ b/docs/docs/guides/multi_influx_databases.md @@ -0,0 +1,93 @@ +# Multiple InfluxDB Databases + +This guide explains how to configure a Dagger job to write metrics to **multiple InfluxDB databases** from a single job. + +## Use Case + +In a real-time metrics job, different metrics may have vastly different throughput levels. For example, live status metrics may generate millions of points per minute, while daily statistics aggregate to a few thousand. Writing all metrics to a single InfluxDB database can cause storage and performance issues. + +With multi-database support, you can route high-throughput metrics to a dedicated database and low-throughput metrics to a shared database, each with its own connection settings and retention policies. 
+ +## Configuration + +### Step 1: Define Named Databases + +Set the `SINK_INFLUX_DATABASES_CONFIG` environment variable to a JSON array of database configurations: + +```json +[ + { + "name": "high-throughput-db", + "url": "http://influx-ht:8086", + "username": "admin", + "password": "secret", + "dbName": "metrics_live", + "retentionPolicy": "autogen", + "batchSize": 2000, + "flushDurationMs": 500 + }, + { + "name": "low-throughput-db", + "url": "http://influx-lt:8086", + "username": "admin", + "password": "secret", + "dbName": "metrics_daily", + "retentionPolicy": "autogen", + "batchSize": 500, + "flushDurationMs": 1000 + } +] +``` + +Each entry has the following fields: + +| Field | Required | Default | Description | +|-------|----------|---------|-------------| +| `name` | Yes | - | Logical identifier used in the API | +| `url` | Yes | - | InfluxDB HTTP endpoint | +| `dbName` | Yes | - | InfluxDB database name | +| `username` | No | `""` | Authentication username | +| `password` | No | `""` | Authentication password | +| `retentionPolicy` | No | `""` | InfluxDB retention policy | +| `batchSize` | No | `0` | Number of points per batch write | +| `flushDurationMs` | No | `0` | Flush interval in milliseconds | + +### Step 2: Map Measurements to Databases (Custom Jobs) + +For custom job builders like `DriversMetricJobBuilder`, set `SINK_INFLUX_DB_NAMES_LIST` as a comma-separated list of database names that map 1:1 to `SINK_INFLUX_MEASUREMENTS_LIST`: + +``` +SINK_INFLUX_MEASUREMENTS_LIST=rt-supply-lstatus-01,rt-supply-lpref-01,rt-supply-dstats-01,rt-supply-hmap-01 +SINK_INFLUX_DB_NAMES_LIST=high-throughput-db,high-throughput-db,low-throughput-db,high-throughput-db +``` + +This routes `rt-supply-dstats-01` to `low-throughput-db` while all live metrics go to `high-throughput-db`.
+ +## Developer API + +Use the `getInfluxSink` method on `SinkOrchestrator` to target a specific database: + +```java +SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter); +sinkOrchestrator.initInfluxConfig(configuration); + +// Route to a specific database +dataStream.sinkTo(sinkOrchestrator.getInfluxSink( + configuration, columnNames, "high-throughput-db", "measurement-name")); +``` + +The existing `getSink()` method continues to work unchanged for backward compatibility. + +## Backward Compatibility + +If `SINK_INFLUX_DATABASES_CONFIG` is **not set**, the system automatically falls back to the legacy flat configuration keys: + +- `SINK_INFLUX_URL` +- `SINK_INFLUX_USERNAME` +- `SINK_INFLUX_PASSWORD` +- `SINK_INFLUX_DB_NAME` +- `SINK_INFLUX_RETENTION_POLICY` +- `SINK_INFLUX_BATCH_SIZE` +- `SINK_INFLUX_FLUSH_DURATION_MS` + +These are used to create a single database entry named `"default"`. Existing deployments require no configuration changes. diff --git a/docs/docs/guides/overview.md b/docs/docs/guides/overview.md index 710890e09..3fb97a913 100644 --- a/docs/docs/guides/overview.md +++ b/docs/docs/guides/overview.md @@ -16,6 +16,10 @@ different available sources. Dagger currently supports InfluxDB, Kafka and BigQuery as supported sinks. This section explains how you can create a dagger and configure different settings. +### [Multiple InfluxDB Databases](./multi_influx_databases.md) + +Route different measurements to different InfluxDB databases from a single Dagger job. Useful for separating high-throughput and low-throughput metrics. + ### [Deploying Dagger](./deployment.md) Dagger runs inside a Flink cluster which can be set up in some distributed resource managers like YARN, VMs or containers in a fully managed runtime environment like Kubernetes. This section contains guides, best practices and advice related to deploying Dagger in production. 
diff --git a/docs/sidebars.js b/docs/sidebars.js index de03d1e93..02dbb0785 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -10,6 +10,7 @@ module.exports = { "guides/quickstart", "guides/choose_source", "guides/create_dagger", + "guides/multi_influx_databases", "guides/query_examples", "guides/use_transformer", "guides/use_udf",