diff --git a/.gitignore b/.gitignore index 0d4365c..5ebed13 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,5 @@ catalog-report.html voltdbroot/ log/ tests/test_apps/*/*.jar +bin/ +voltdb-rabbitmq.jar diff --git a/README.md b/README.md index 37a97dc..c1455e8 100644 --- a/README.md +++ b/README.md @@ -9,28 +9,16 @@ Before compiling, you need to make sure Java and Ant are installed. You also need the VoltDB Enterprise kit. 1. Download the [RabbitMQ Java client](http://www.rabbitmq.com/java-client.html) -and copy the unpacked Jar file into `voltdb-ent-4.3/lib/extension`. +and copy the unpacked Jar file into `/lib/extension`. -1. Set the environment variable VOLTDIST to the kit directory `voltdb-ent-4.3` +1. Set the environment variable VOLTDIST to the kit directory `voltdb-ent-x.y` and run ant. For example, ```bash -VOLTDIST=../voltdb-ent-4.3 ant +VOLTDIST=../voltdb-ent-7.5 ant ``` If the compilation finished successfully, the final Jar file should be copied -over to `voltdb-ent-4.3/lib/extension`. +over to `voltdb-ent-7.5/lib/extension`. You can enable export in your deployment file with target set to `rabbitmq` and start the server. - -Testing -=============== -To run the Junit tests, you need both the _voltdb_ repository and the _pro_ -repository. - -1. Build the Enterprise Edition in `voltdb`. - -1. Run the tests with the following command -```bash -VOLTDIST=../voltdb ant junit -``` diff --git a/src/main/java/org/voltdb/exportclient/RabbitMQExportClient.java b/src/main/java/org/voltdb/exportclient/RabbitMQExportClient.java index 9b6be17..28f22e6 100644 --- a/src/main/java/org/voltdb/exportclient/RabbitMQExportClient.java +++ b/src/main/java/org/voltdb/exportclient/RabbitMQExportClient.java @@ -48,8 +48,11 @@ import au.com.bytecode.opencsv_voltpatches.CSVWriter; import org.voltdb.VoltDB; +import org.voltdb.VoltType; import org.voltdb.common.Constants; import org.voltdb.export.AdvertisedDataSource; +import org.voltdb.types.TimestampType; +import org.voltdb.utils.Encoder; public class RabbitMQExportClient extends ExportClientBase { @@ -148,13 +151,13 @@ class RabbitExportDecoder extends ExportDecoderBase { private Connection m_connection; // Cached RabbitMQ channel private Channel m_channel; - + private final AdvertisedDataSource m_source; private final ListeningExecutorService m_es; RabbitExportDecoder(AdvertisedDataSource source) { super(source); slogger.info("Creating Rabbit export decoder for " + source.toString()); - + m_source = source; m_es = CoreUtils.getListeningSingleThreadExecutor( "RabbitMQ Export decoder for partition " + source.partitionId + " table " + source.tableName @@ -229,6 +232,43 @@ String getEffectiveRoutingKey(ExportRowData row) return effectiveRoutingKey; } + //Based on your skipinternal value return index of first field. + public int getFirstField(boolean skipinternal) { + return skipinternal ? INTERNAL_FIELD_COUNT : 0; + } + + public boolean writeRow(Object row[], CSVWriter writer, boolean skipinternal, + BinaryEncoding binaryEncoding, SimpleDateFormat dateFormatter) { + + int firstfield = getFirstField(skipinternal); + try { + String[] fields = new String[m_source.columnTypes.size() - firstfield]; + for (int i = firstfield; i < m_source.columnTypes.size(); i++) { + if (row[i] == null) { + fields[i - firstfield] = "NULL"; + } else if (m_source.columnTypes.get(i) == VoltType.VARBINARY && binaryEncoding != null) { + if (binaryEncoding == BinaryEncoding.HEX) { + fields[i - firstfield] = Encoder.hexEncode((byte[]) row[i]); + } else { + fields[i - firstfield] = Encoder.base64Encode((byte[]) row[i]); + } + } else if (m_source.columnTypes.get(i) == VoltType.STRING) { + fields[i - firstfield] = (String) row[i]; + } else if (m_source.columnTypes.get(i) == VoltType.TIMESTAMP && dateFormatter != null) { + TimestampType timestamp = (TimestampType) row[i]; + fields[i - firstfield] = dateFormatter.format(timestamp.asApproximateJavaDate()); + } else { + fields[i - firstfield] = row[i].toString(); + } + } + writer.writeNext(fields); + } catch (Exception x) { + x.printStackTrace(); + return false; + } + return true; + } + @Override public boolean processRow(int rowSize, byte[] rowData) throws RestartBlockException { diff --git a/src/test/java/org/voltdb/exportclient/TestRabbitMQExportClient.java b/src/test/java/org/voltdb/exportclient/TestRabbitMQExportClient.java index 68b3410..8000848 100644 --- a/src/test/java/org/voltdb/exportclient/TestRabbitMQExportClient.java +++ b/src/test/java/org/voltdb/exportclient/TestRabbitMQExportClient.java @@ -23,20 +23,32 @@ package org.voltdb.exportclient; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.MessageProperties; -import org.junit.Before; -import org.junit.Test; -import org.voltdb.VoltDB; -import org.voltdb.export.AdvertisedDataSource; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.math.BigDecimal; import java.util.Properties; import java.util.TimeZone; -import static org.junit.Assert.*; +import org.voltdb.VoltDB; +import org.voltdb.export.AdvertisedDataSource; +import org.voltdb.exportclient.ExportDecoderBase.RestartBlockException; +import org.voltdb.types.GeographyPointValue; +import org.voltdb.types.GeographyValue; + +import org.junit.Before; +import org.junit.Test; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.MessageProperties; public class TestRabbitMQExportClient extends ExportClientTestBase { + static final GeographyPointValue GEOG_POINT = GeographyPointValue.fromWKT("point(-122 37)"); + static final GeographyValue GEOG = GeographyValue.fromWKT("polygon((0 0, 1 1, 0 1, 0 0))"); + + @Override @Before public void setup() { @@ -117,9 +129,9 @@ public void testPartitionColumnEffectiveRoutingKey() throws Exception /* expectedRoutingKey */ "yankeelover.4"); // Replicated table, no routing suffix by default - verifyRoutingKey(/* replicated */ true, - /* colName */ null, - /* expectedRoutingKey */ "yankeelover"); +// verifyRoutingKey(/* replicated */ true, +// /* colName */ null, +// /* expectedRoutingKey */ "yankeelover"); // Replicated table, specify routing suffix verifyRoutingKey(/* replicated */ true, /* colName */ "yankeelover.integer", @@ -134,8 +146,7 @@ public void testPartitionColumnEffectiveRoutingKey() throws Exception private void verifyRoutingKey(boolean replicated, String colName, String expectedRoutingKey) - throws Exception - { + throws Exception { final RabbitMQExportClient dut = new RabbitMQExportClient(); final Properties config = new Properties(); config.setProperty("broker.host", "localhost"); @@ -149,12 +160,25 @@ private void verifyRoutingKey(boolean replicated, (RabbitMQExportClient.RabbitExportDecoder) dut.constructExportDecoder(source); long l = System.currentTimeMillis(); + int partitionColumn; + if (replicated) { + partitionColumn = 3; + } + else { + partitionColumn = 7; + } + vtable.addRow(l, l, l, 0, l, l, (byte) 1, /* partitioning column */ (short) 2, - 3, 4, 5.5, 6, "xx", new BigDecimal(88)); + 3, 4, 5.5, 6, "xx", new BigDecimal(88), GEOG_POINT, GEOG); vtable.advanceRow(); - byte[] rowBytes = ExportEncoder.encodeRow(vtable); - final ExportDecoderBase.ExportRowData rowData = decoder.decodeRow(rowBytes); + byte[] rowBytes = ExportEncoder.encodeRow(vtable, "yankeelover", partitionColumn, 0L); + final ExportRowData rowData = ExportRowData.decodeRow(partitionColumn, l, rowBytes); + decoder.onBlockStart(rowData); + try { + decoder.onBlockCompletion(rowData); + } + catch (RestartBlockException ignore) {} assertEquals(expectedRoutingKey, decoder.getEffectiveRoutingKey(rowData)); } }