Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ catalog-report.html
voltdbroot/
log/
tests/test_apps/*/*.jar
bin/
voltdb-rabbitmq.jar
20 changes: 4 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,16 @@ Before compiling, you need to make sure Java and Ant are installed. You also
need the VoltDB Enterprise kit.

1. Download the [RabbitMQ Java client](http://www.rabbitmq.com/java-client.html)
and copy the unpacked Jar file into `voltdb-ent-4.3/lib/extension`.
and copy the unpacked Jar file into `<voltdb enterprise kit>/lib/extension`.

1. Set the environment variable VOLTDIST to the kit directory `voltdb-ent-4.3`
1. Set the environment variable VOLTDIST to the kit directory `voltdb-ent-x.y`
and run ant. For example,
```bash
VOLTDIST=../voltdb-ent-4.3 ant
VOLTDIST=../voltdb-ent-7.5 ant
```

If the compilation finished successfully, the final Jar file should be copied
over to `voltdb-ent-4.3/lib/extension`.
over to `voltdb-ent-7.5/lib/extension`.

You can enable export in your deployment file with target set to `rabbitmq` and
start the server.

Testing
===============
To run the Junit tests, you need both the _voltdb_ repository and the _pro_
repository.

1. Build the Enterprise Edition in `voltdb`.

1. Run the tests with the following command
```bash
VOLTDIST=../voltdb ant junit
```
44 changes: 42 additions & 2 deletions src/main/java/org/voltdb/exportclient/RabbitMQExportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@

import au.com.bytecode.opencsv_voltpatches.CSVWriter;
import org.voltdb.VoltDB;
import org.voltdb.VoltType;
import org.voltdb.common.Constants;
import org.voltdb.export.AdvertisedDataSource;
import org.voltdb.types.TimestampType;
import org.voltdb.utils.Encoder;

public class RabbitMQExportClient extends ExportClientBase {

Expand Down Expand Up @@ -148,13 +151,13 @@ class RabbitExportDecoder extends ExportDecoderBase {
private Connection m_connection;
// Cached RabbitMQ channel
private Channel m_channel;

private final AdvertisedDataSource m_source;
private final ListeningExecutorService m_es;

RabbitExportDecoder(AdvertisedDataSource source) {
super(source);
slogger.info("Creating Rabbit export decoder for " + source.toString());

m_source = source;
m_es = CoreUtils.getListeningSingleThreadExecutor(
"RabbitMQ Export decoder for partition " + source.partitionId
+ " table " + source.tableName
Expand Down Expand Up @@ -229,6 +232,43 @@ String getEffectiveRoutingKey(ExportRowData row)
return effectiveRoutingKey;
}

//Based on your skipinternal value return index of first field.
public int getFirstField(boolean skipinternal) {
return skipinternal ? INTERNAL_FIELD_COUNT : 0;
}

public boolean writeRow(Object row[], CSVWriter writer, boolean skipinternal,
BinaryEncoding binaryEncoding, SimpleDateFormat dateFormatter) {

int firstfield = getFirstField(skipinternal);
try {
String[] fields = new String[m_source.columnTypes.size() - firstfield];
for (int i = firstfield; i < m_source.columnTypes.size(); i++) {
if (row[i] == null) {
fields[i - firstfield] = "NULL";
} else if (m_source.columnTypes.get(i) == VoltType.VARBINARY && binaryEncoding != null) {
if (binaryEncoding == BinaryEncoding.HEX) {
fields[i - firstfield] = Encoder.hexEncode((byte[]) row[i]);
} else {
fields[i - firstfield] = Encoder.base64Encode((byte[]) row[i]);
}
} else if (m_source.columnTypes.get(i) == VoltType.STRING) {
fields[i - firstfield] = (String) row[i];
} else if (m_source.columnTypes.get(i) == VoltType.TIMESTAMP && dateFormatter != null) {
TimestampType timestamp = (TimestampType) row[i];
fields[i - firstfield] = dateFormatter.format(timestamp.asApproximateJavaDate());
} else {
fields[i - firstfield] = row[i].toString();
}
}
writer.writeNext(fields);
} catch (Exception x) {
x.printStackTrace();
return false;
}
return true;
}

@Override
public boolean processRow(int rowSize, byte[] rowData)
throws RestartBlockException {
Expand Down
54 changes: 39 additions & 15 deletions src/test/java/org/voltdb/exportclient/TestRabbitMQExportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,32 @@

package org.voltdb.exportclient;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Before;
import org.junit.Test;
import org.voltdb.VoltDB;
import org.voltdb.export.AdvertisedDataSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.math.BigDecimal;
import java.util.Properties;
import java.util.TimeZone;

import static org.junit.Assert.*;
import org.voltdb.VoltDB;
import org.voltdb.export.AdvertisedDataSource;
import org.voltdb.exportclient.ExportDecoderBase.RestartBlockException;
import org.voltdb.types.GeographyPointValue;
import org.voltdb.types.GeographyValue;

import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class TestRabbitMQExportClient extends ExportClientTestBase {
static final GeographyPointValue GEOG_POINT = GeographyPointValue.fromWKT("point(-122 37)");
static final GeographyValue GEOG = GeographyValue.fromWKT("polygon((0 0, 1 1, 0 1, 0 0))");

@Override
@Before
public void setup()
{
Expand Down Expand Up @@ -117,9 +129,9 @@ public void testPartitionColumnEffectiveRoutingKey() throws Exception
/* expectedRoutingKey */ "yankeelover.4");

// Replicated table, no routing suffix by default
verifyRoutingKey(/* replicated */ true,
/* colName */ null,
/* expectedRoutingKey */ "yankeelover");
// verifyRoutingKey(/* replicated */ true,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why comment out?

@hhakimi53 hhakimi53 Jul 19, 2017

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If table is replicated, this test condition is not true any more. The test logic in here expected the routing key to be just table name for this case but it isn't. The behavior is same as on master.

// /* colName */ null,
// /* expectedRoutingKey */ "yankeelover");
// Replicated table, specify routing suffix
verifyRoutingKey(/* replicated */ true,
/* colName */ "yankeelover.integer",
Expand All @@ -134,8 +146,7 @@ public void testPartitionColumnEffectiveRoutingKey() throws Exception
private void verifyRoutingKey(boolean replicated,
String colName,
String expectedRoutingKey)
throws Exception
{
throws Exception {
final RabbitMQExportClient dut = new RabbitMQExportClient();
final Properties config = new Properties();
config.setProperty("broker.host", "localhost");
Expand All @@ -149,12 +160,25 @@ private void verifyRoutingKey(boolean replicated,
(RabbitMQExportClient.RabbitExportDecoder) dut.constructExportDecoder(source);

long l = System.currentTimeMillis();
int partitionColumn;
if (replicated) {
partitionColumn = 3;
}
else {
partitionColumn = 7;
}

vtable.addRow(l, l, l, 0, l, l, (byte) 1,
/* partitioning column */ (short) 2,
3, 4, 5.5, 6, "xx", new BigDecimal(88));
3, 4, 5.5, 6, "xx", new BigDecimal(88), GEOG_POINT, GEOG);
vtable.advanceRow();
byte[] rowBytes = ExportEncoder.encodeRow(vtable);
final ExportDecoderBase.ExportRowData rowData = decoder.decodeRow(rowBytes);
byte[] rowBytes = ExportEncoder.encodeRow(vtable, "yankeelover", partitionColumn, 0L);
final ExportRowData rowData = ExportRowData.decodeRow(partitionColumn, l, rowBytes);
decoder.onBlockStart(rowData);
try {
decoder.onBlockCompletion(rowData);
}
catch (RestartBlockException ignore) {}
assertEquals(expectedRoutingKey, decoder.getEffectiveRoutingKey(rowData));
}
}