diff --git a/pom.xml b/pom.xml index 20315d62c836c..a321d331717c7 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 pom presto-root @@ -28,9 +28,9 @@ - scm:git:git://github.com/facebook/presto.git - https://github.com/facebook/presto - 0.161 + scm:git:git://github.com/twitter-forks/presto.git + https://github.com/twitter-forks/presto + 0.157-tw-0.29 @@ -91,6 +91,7 @@ presto-hive-cdh5 presto-teradata-functions presto-example-http + twitter-eventlistener-plugin presto-local-file presto-tpch presto-raptor @@ -643,6 +644,12 @@ 3.2 + + org.apache.commons + commons-pool2 + 2.4.2 + + commons-codec commons-codec diff --git a/presto-accumulo/pom.xml b/presto-accumulo/pom.xml index 2544cc5f15253..25b2dfa6873b7 100644 --- a/presto-accumulo/pom.xml +++ b/presto-accumulo/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-accumulo diff --git a/presto-array/pom.xml b/presto-array/pom.xml index 2282780a91288..9464d398187b6 100644 --- a/presto-array/pom.xml +++ b/presto-array/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-array diff --git a/presto-atop/pom.xml b/presto-atop/pom.xml index fad7d54f0588c..351ea06884109 100644 --- a/presto-atop/pom.xml +++ b/presto-atop/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-atop diff --git a/presto-base-jdbc/pom.xml b/presto-base-jdbc/pom.xml index fc8e320103e40..d13a25aad0bd0 100644 --- a/presto-base-jdbc/pom.xml +++ b/presto-base-jdbc/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-base-jdbc diff --git a/presto-benchmark-driver/pom.xml b/presto-benchmark-driver/pom.xml index 14777b2334bc3..45160e8845c67 100644 --- a/presto-benchmark-driver/pom.xml +++ b/presto-benchmark-driver/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-benchmark-driver diff --git a/presto-benchmark/pom.xml b/presto-benchmark/pom.xml index e8fc188943887..4ca2d25d55525 100644 --- a/presto-benchmark/pom.xml +++ b/presto-benchmark/pom.xml @@ -5,7 +5,7 @@ presto-root com.facebook.presto - 0.161 + 0.157-tw-0.29 presto-benchmark diff --git a/presto-blackhole/pom.xml b/presto-blackhole/pom.xml index 5fd7c56c789dd..4660227dace90 100644 --- a/presto-blackhole/pom.xml +++ b/presto-blackhole/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-blackhole diff --git a/presto-bytecode/pom.xml b/presto-bytecode/pom.xml index d997e07f4444a..92241b60b7801 100644 --- a/presto-bytecode/pom.xml +++ b/presto-bytecode/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-bytecode diff --git a/presto-cassandra/pom.xml b/presto-cassandra/pom.xml index 8f38bb6e8cc2f..70ce2e31fb2c1 100644 --- a/presto-cassandra/pom.xml +++ b/presto-cassandra/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-cassandra diff --git a/presto-cli/pom.xml b/presto-cli/pom.xml index 82b5743406618..784141533e83a 100644 --- a/presto-cli/pom.xml +++ b/presto-cli/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-cli diff --git a/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java b/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java index d5153a1dcde7b..1382d61cb6d5f 100644 --- a/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java +++ b/presto-cli/src/main/java/com/facebook/presto/cli/ClientOptions.java @@ -17,6 +17,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; +import com.sun.security.auth.module.UnixSystem; import io.airlift.airline.Option; import io.airlift.http.client.spnego.KerberosConfig; import io.airlift.units.Duration; @@ -72,15 +73,16 @@ public class ClientOptions @Option(name = "--keystore-password", title = "keystore password", description = "Keystore password") public String keystorePassword; + // Pick the user name for the logged in user. + // Do not let it be overridden by users. + public String user = new UnixSystem().getUsername(); + @Option(name = "--truststore-path", title = "truststore path", description = "Truststore path") public String truststorePath; @Option(name = "--truststore-password", title = "truststore password", description = "Truststore password") public String truststorePassword; - @Option(name = "--user", title = "user", description = "Username") - public String user = System.getProperty("user.name"); - @Option(name = "--source", title = "source", description = "Name of source making query") public String source = "presto-cli"; diff --git a/presto-client/pom.xml b/presto-client/pom.xml index 04e58e8133d5a..96d2efbf15f18 100644 --- a/presto-client/pom.xml +++ b/presto-client/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-client diff --git a/presto-docs/pom.xml b/presto-docs/pom.xml index d431d12650284..39d843f4c1364 100644 --- a/presto-docs/pom.xml +++ b/presto-docs/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-docs diff --git a/presto-example-http/pom.xml b/presto-example-http/pom.xml index 710a09b4c3f1f..9d91549fea3fa 100644 --- a/presto-example-http/pom.xml +++ b/presto-example-http/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-example-http diff --git a/presto-hive-cdh4/pom.xml b/presto-hive-cdh4/pom.xml index 001eb372d4a1e..2daad579d3305 100644 --- a/presto-hive-cdh4/pom.xml +++ b/presto-hive-cdh4/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-hive-cdh4 diff --git a/presto-hive-cdh5/pom.xml b/presto-hive-cdh5/pom.xml index f0fc34b1e0844..d911b3d729907 100644 --- a/presto-hive-cdh5/pom.xml +++ b/presto-hive-cdh5/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-hive-cdh5 diff --git a/presto-hive-hadoop1/pom.xml b/presto-hive-hadoop1/pom.xml index 2244deeae1114..17f770566608b 100644 --- a/presto-hive-hadoop1/pom.xml +++ b/presto-hive-hadoop1/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-hive-hadoop1 diff --git a/presto-hive-hadoop2/pom.xml b/presto-hive-hadoop2/pom.xml index ffbda20228fa3..c0fe52ce0229d 100644 --- a/presto-hive-hadoop2/pom.xml +++ b/presto-hive-hadoop2/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-hive-hadoop2 diff --git a/presto-hive/pom.xml b/presto-hive/pom.xml index 30ba06533db92..fb87edef31567 100644 --- a/presto-hive/pom.xml +++ b/presto-hive/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-hive @@ -53,11 +53,54 @@ hive-apache + + org.apache.curator + curator-recipes + 2.8.0 + + + + org.apache.curator + curator-framework + 2.8.0 + + + + org.apache.curator + curator-client + 2.8.0 + + + + org.apache.curator + curator-test + 2.8.0 + test + + + + org.apache.zookeeper + zookeeper + 3.4.6 + test + + + + com.101tec + zkclient + test + + org.apache.thrift libthrift + + org.apache.commons + commons-pool2 + + io.airlift aircompressor @@ -93,6 +136,12 @@ configuration + + com.googlecode.json-simple + json-simple + 1.1 + + com.google.guava guava diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java index 0e831d72290c2..b4aec1598d025 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java @@ -24,6 +24,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.StandardErrorCode; import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.twitter.hive.util.UgiUtils; import com.google.common.collect.ImmutableList; import com.google.common.io.CharStreams; import io.airlift.units.DataSize; @@ -40,11 +41,13 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.security.UserGroupInformation; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Deque; import java.util.List; @@ -151,8 +154,15 @@ public BackgroundHiveSplitLoader( public void start(HiveSplitSource splitSource) { this.hiveSplitSource = splitSource; + + UserGroupInformation ugi = null; + + if (HiveSessionProperties.getReadAsQueryUser(session)) { + ugi = UgiUtils.getUgi(session.getUser()); + } + for (int i = 0; i < maxPartitionBatchSize; i++) { - ResumableTasks.submit(executor, new HiveSplitLoaderTask()); + ResumableTasks.submit(executor, new HiveSplitLoaderTask(ugi)); } } @@ -165,8 +175,30 @@ public void stop() private class HiveSplitLoaderTask implements ResumableTask { + private UserGroupInformation ugi; + + public HiveSplitLoaderTask(UserGroupInformation ugi) + { + this.ugi = ugi; + } + @Override public TaskStatus process() + { + if (ugi != null) { + try { + return ugi.doAs((PrivilegedExceptionAction) this::doProcess); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException("Could not runAs " + session.getUser(), e); + } + } + else { + return doProcess(); + } + } + + private TaskStatus doProcess() { while (true) { if (stopped) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java index 6475cd636f1b0..5700d19a98f23 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java @@ -64,6 +64,8 @@ public class HiveClientConfig private boolean allowCorruptWritesForTesting; + private boolean readAsQueryUser = false; + private Duration metastoreCacheTtl = new Duration(1, TimeUnit.HOURS); private Duration metastoreRefreshInterval = new Duration(1, TimeUnit.SECONDS); private long metastoreCacheMaximumSize = 10000; @@ -305,6 +307,19 @@ public HiveClientConfig setAllowCorruptWritesForTesting(boolean allowCorruptWrit return this; } + public boolean getReadAsQueryUser() + { + return readAsQueryUser; + } + + @Config("hive.read-as-query-user") + @ConfigDescription("When querying hive read data as the user submitting the query instead of as the presto daemon user") + public HiveClientConfig setReadAsQueryUser(boolean readAsQueryUser) + { + this.readAsQueryUser = readAsQueryUser; + return this; + } + @NotNull public Duration getMetastoreCacheTtl() { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java index f55f679c33aa9..d5bfabc40d90e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java @@ -31,6 +31,7 @@ import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.twitter.hive.PooledHiveMetastoreClientFactory; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; @@ -101,8 +102,7 @@ public void configure(Binder binder) newExporter(binder).export(NamenodeStats.class).as(generatedNameOf(NamenodeStats.class)); binder.bind(HiveMetastoreClientFactory.class).in(Scopes.SINGLETON); - binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON); - configBinder(binder).bindConfig(StaticMetastoreConfig.class); + binder.bind(PooledHiveMetastoreClientFactory.class).in(Scopes.SINGLETON); binder.bind(NodeManager.class).toInstance(nodeManager); binder.bind(TypeManager.class).toInstance(typeManager); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java index e1609c517434e..b1e05863d5ab7 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveConnectorFactory.java @@ -30,6 +30,9 @@ import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider; import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager; import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider; +import com.facebook.presto.twitter.hive.MetastoreStaticClusterModule; +import com.facebook.presto.twitter.hive.MetastoreZkDiscoveryBasedModule; +import com.facebook.presto.twitter.hive.ZookeeperServersetMetastoreConfig; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; @@ -45,6 +48,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import static io.airlift.configuration.ConditionalModule.installModuleIf; import static java.util.Objects.requireNonNull; public class HiveConnectorFactory @@ -83,6 +87,14 @@ public Connector create(String connectorId, Map config, Connecto Bootstrap app = new Bootstrap( new MBeanModule(), new JsonModule(), + installModuleIf( + ZookeeperServersetMetastoreConfig.class, + zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() == null, + new MetastoreStaticClusterModule()), + installModuleIf( + ZookeeperServersetMetastoreConfig.class, + zkMetastoreConfig -> zkMetastoreConfig.getZookeeperServerHostAndPort() != null, + new MetastoreZkDiscoveryBasedModule()), new HiveClientModule( connectorId, metastore, diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java index 71caf2cdae55b..dc6d0f46f3ede 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java @@ -18,38 +18,19 @@ import javax.inject.Inject; import java.net.URI; -import java.util.Map; import static java.util.Objects.requireNonNull; public class HiveHdfsConfiguration implements HdfsConfiguration { - private static final Configuration INITIAL_CONFIGURATION; - - static { - Configuration.addDefaultResource("hdfs-default.xml"); - Configuration.addDefaultResource("hdfs-site.xml"); - - // must not be transitively reloaded during the future loading of various Hadoop modules - // all the required default resources must be declared above - INITIAL_CONFIGURATION = new Configuration(false); - Configuration defaultConfiguration = new Configuration(); - for (Map.Entry entry : defaultConfiguration) { - INITIAL_CONFIGURATION.set(entry.getKey(), entry.getValue()); - } - } - @SuppressWarnings("ThreadLocalNotStaticFinal") private final ThreadLocal hadoopConfiguration = new ThreadLocal() { @Override protected Configuration initialValue() { - Configuration config = new Configuration(false); - for (Map.Entry entry : INITIAL_CONFIGURATION) { - config.set(entry.getKey(), entry.getValue()); - } + Configuration config = new Configuration(); updater.updateConfiguration(config); return config; } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java index 93894d304aae9..9b262cab62d67 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HivePageSourceProvider.java @@ -24,14 +24,18 @@ import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.twitter.hive.util.UgiUtils; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.joda.time.DateTimeZone; import javax.inject.Inject; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,6 +81,24 @@ public HivePageSourceProvider( @Override public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List columns) + { + if (HiveSessionProperties.getReadAsQueryUser(session)) { + UserGroupInformation ugi = UgiUtils.getUgi(session.getUser()); + try { + return ugi.doAs((PrivilegedExceptionAction) () -> + doCreatePageSource(session, split, columns) + ); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException("Could not runAs " + session.getUser(), e); + } + } + else { + return doCreatePageSource(session, split, columns); + } + } + + private ConnectorPageSource doCreatePageSource(ConnectorSession session, ConnectorSplit split, List columns) { List hiveColumns = columns.stream() .map(HiveColumnHandle::toHiveColumnHandle) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java index 71eb660384a3a..e81f9d7d5eb50 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSessionProperties.java @@ -35,6 +35,7 @@ public final class HiveSessionProperties private static final String ORC_STREAM_BUFFER_SIZE = "orc_stream_buffer_size"; private static final String PARQUET_PREDICATE_PUSHDOWN_ENABLED = "parquet_predicate_pushdown_enabled"; private static final String PARQUET_OPTIMIZED_READER_ENABLED = "parquet_optimized_reader_enabled"; + private static final String READ_AS_QUERY_USER = "read_as_query_user"; private static final String MAX_SPLIT_SIZE = "max_split_size"; private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size"; private static final String RCFILE_OPTIMIZED_READER_ENABLED = "rcfile_optimized_reader_enabled"; @@ -55,6 +56,11 @@ public HiveSessionProperties(HiveClientConfig config) "Only schedule splits on workers colocated with data node", config.isForceLocalScheduling(), false), + booleanSessionProperty( + READ_AS_QUERY_USER, + "Query reads happen as the user submitting the query", + config.getReadAsQueryUser(), + true), booleanSessionProperty( ORC_BLOOM_FILTERS_ENABLED, "ORC: Enable bloom filters for predicate pushdown", @@ -147,6 +153,11 @@ public static boolean isParquetPredicatePushdownEnabled(ConnectorSession session return session.getProperty(PARQUET_PREDICATE_PUSHDOWN_ENABLED, Boolean.class); } + public static boolean getReadAsQueryUser(ConnectorSession session) + { + return session.getProperty(READ_AS_QUERY_USER, Boolean.class); + } + public static DataSize getMaxSplitSize(ConnectorSession session) { return session.getProperty(MAX_SPLIT_SIZE, DataSize.class); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java b/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java index 8675bd60c2fa5..fd4e5af169720 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/RetryDriver.java @@ -137,16 +137,18 @@ public V run(String callableName, Callable callable) return callable.call(); } catch (Exception e) { + log.debug("Failed on executing %s with attempt %d, Exception: %s", callableName, attempt, e.getMessage()); e = exceptionMapper.apply(e); for (Class clazz : exceptionWhiteList) { if (clazz.isInstance(e)) { + log.debug("Exception is in whitelist."); throw e; } } if (attempt >= maxAttempts || Duration.nanosSince(startTime).compareTo(maxRetryTime) >= 0) { + log.debug("Maximum attempts or maximum retry time reached. attempt: %d, maxAttempts: %d, duration: [%s] maxRetryTime: [%s]", attempt, maxAttempts, Duration.nanosSince(startTime).toString(), maxRetryTime.toString()); throw e; } - log.debug("Failed on executing %s with attempt %d, will retry. Exception: %s", callableName, attempt, e.getMessage()); int delayInMs = (int) Math.min(minSleepTime.toMillis() * Math.pow(scaleFactor, attempt - 1), maxSleepTime.toMillis()); try { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java index 193cb20884864..9daf58dd0957d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetTypeUtils.java @@ -85,7 +85,7 @@ public static int getFieldIndex(MessageType fileSchema, String name) public static parquet.schema.Type getParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames) { if (useParquetColumnNames) { - return getParquetTypeByName(column.getName(), messageType); + return findParquetTypeByName(column, messageType); } if (column.getHiveColumnIndex() < messageType.getFieldCount()) { @@ -94,6 +94,28 @@ public static parquet.schema.Type getParquetType(HiveColumnHandle column, Messag return null; } + /** + * Find the column type by name using returning the first match with the following logic: + *
    + *
  • direct match
  • + *
  • case-insensitive match
  • + *
  • if the name ends with _, remove it and direct match
  • + *
  • if the name ends with _, remove it and case-insensitive match
  • + *
+ */ + private static parquet.schema.Type findParquetTypeByName(HiveColumnHandle column, MessageType messageType) + { + String name = column.getName(); + Type type = getParquetTypeByName(name, messageType); + + // when a parquet field is a hive keyword we append an _ to it in hive. When doing + // a name-based lookup, we need to strip it off again if we didn't get a direct match. + if (type == null && name.endsWith("_")) { + type = getParquetTypeByName(name.substring(0, name.length() - 1), messageType); + } + return type; + } + public static ParquetEncoding getParquetEncoding(Encoding encoding) { switch (encoding) { diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java index 9921bf7e54fd7..f65425c6924bc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/util/HiveFileIterator.java @@ -35,7 +35,6 @@ import java.util.Properties; import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; -import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILE_NOT_FOUND; import static java.util.Objects.requireNonNull; public class HiveFileIterator @@ -99,7 +98,8 @@ protected LocatedFileStatus computeNext() return endOfData(); } catch (FileNotFoundException e) { - throw new PrestoException(HIVE_FILE_NOT_FOUND, "Partition location does not exist: " + path); + // We are okay if the path does not exist. + return endOfData(); } catch (IOException e) { throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed to list directory: " + path, e); diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java new file mode 100644 index 0000000000000..b8c1b7dc9a60d --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreStaticClusterModule.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveCluster; +import com.facebook.presto.hive.StaticHiveCluster; +import com.facebook.presto.hive.StaticMetastoreConfig; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class MetastoreStaticClusterModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(HiveCluster.class).to(StaticHiveCluster.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(StaticMetastoreConfig.class); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java new file mode 100644 index 0000000000000..775a5afaf4c81 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/MetastoreZkDiscoveryBasedModule.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveCluster; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class MetastoreZkDiscoveryBasedModule + implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(HiveCluster.class).to(ZookeeperServersetHiveCluster.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(ZookeeperServersetMetastoreConfig.class); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java new file mode 100644 index 0000000000000..483a736759b0f --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/PooledHiveMetastoreClientFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.ThriftHiveMetastoreClient; +import com.facebook.presto.hive.authentication.HiveMetastoreAuthentication; +import com.facebook.presto.hive.metastore.HiveMetastoreClient; +import com.facebook.presto.twitter.hive.util.PooledTTransportFactory; +import com.facebook.presto.twitter.hive.util.TTransportPool; +import com.google.common.net.HostAndPort; +import com.google.common.primitives.Ints; +import io.airlift.units.Duration; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import javax.annotation.Nullable; +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class PooledHiveMetastoreClientFactory +{ + private final HostAndPort socksProxy; + private final int timeoutMillis; + private final HiveMetastoreAuthentication metastoreAuthentication; + private final TTransportPool transportPool; + + public PooledHiveMetastoreClientFactory(@Nullable HostAndPort socksProxy, Duration timeout, + HiveMetastoreAuthentication metastoreAuthentication, + int maxTransport, long idleTimeout, long transportEvictInterval, int evictNumTests) + { + this.socksProxy = socksProxy; + this.timeoutMillis = Ints.checkedCast(timeout.toMillis()); + this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null"); + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxIdle(maxTransport); + poolConfig.setMaxTotal(maxTransport); + poolConfig.setMinEvictableIdleTimeMillis(idleTimeout); + poolConfig.setTimeBetweenEvictionRunsMillis(transportEvictInterval); + poolConfig.setNumTestsPerEvictionRun(evictNumTests); + this.transportPool = new TTransportPool(poolConfig); + } + + @Inject + public PooledHiveMetastoreClientFactory(HiveClientConfig config, + ZookeeperServersetMetastoreConfig zkConfig, + HiveMetastoreAuthentication metastoreAuthentication) + { + this(config.getMetastoreSocksProxy(), + config.getMetastoreTimeout(), + metastoreAuthentication, + zkConfig.getMaxTransport(), + zkConfig.getTransportIdleTimeout(), + zkConfig.getTransportEvictInterval(), + zkConfig.getTransportEvictNumTests()); + } + + public HiveMetastoreClient create(String host, int port) + throws TTransportException + { + try { + TTransport transport = transportPool.borrowObject(host, port); + if (transport == null) { + transport = transportPool.borrowObject(host, port, + new PooledTTransportFactory(transportPool, + host, port, socksProxy, + timeoutMillis, metastoreAuthentication)); + } + return new ThriftHiveMetastoreClient(transport); + } + catch (Exception e) { + throw new TTransportException(String.format("%s: %s", host, e.getMessage()), e.getCause()); + } + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java new file mode 100644 index 0000000000000..e1d0f2011468d --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperMetastoreMonitor.java @@ -0,0 +1,119 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.google.common.net.HostAndPort; + +import io.airlift.log.Logger; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +public class ZookeeperMetastoreMonitor implements PathChildrenCacheListener +{ + public static final Logger log = Logger.get(ZookeeperMetastoreMonitor.class); + private CuratorFramework client; + private PathChildrenCache cache; + private ConcurrentMap servers; // (Node_Name->HostAndPort) + + public ZookeeperMetastoreMonitor(String zkServer, String watchPath, int maxRetries, int retrySleepTime) + throws Exception + { + client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(retrySleepTime, maxRetries)); + client.start(); + + cache = new PathChildrenCache(client, watchPath, true); // true indicating cache node contents in addition to the stat + try { + cache.start(); + } + catch (Exception ex) { + throw new RuntimeException("Curator PathCache Creation failed: " + ex.getMessage()); + } + + cache.getListenable().addListener(this); + servers = new ConcurrentHashMap<>(); + } + + public void close() + { + client.close(); + + try { + cache.close(); + } + catch (IOException ex) { + // do nothing + } + } + + public List getServers() + { + return servers.values().stream().collect(Collectors.toList()); + } + + private HostAndPort deserialize(byte[] bytes) + { + String serviceEndpoint = "serviceEndpoint"; + JSONObject data = (JSONObject) JSONValue.parse(new String(bytes)); + if (data != null && data.containsKey(serviceEndpoint)) { + Map hostPortMap = (Map) data.get(serviceEndpoint); + String host = hostPortMap.get("host").toString(); + int port = Integer.parseInt(hostPortMap.get("port").toString()); + return HostAndPort.fromParts(host, port); + } + else { + log.warn("failed to deserialize child node data"); + throw new IllegalArgumentException("No host:port found"); + } + } + + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: { + HostAndPort hostPort = deserialize(event.getData().getData()); + String node = ZKPaths.getNodeFromPath(event.getData().getPath()); + log.info("child updated: " + node + ": " + hostPort); + servers.put(node, hostPort); + break; + } + + case CHILD_REMOVED: { + String node = ZKPaths.getNodeFromPath(event.getData().getPath()); + log.info("child removed: " + node); + servers.remove(node); + break; + } + + default: + log.info("connection state changed: " + event.getType()); + break; + } + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java new file mode 100644 index 0000000000000..c4f803311b80b --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetHiveCluster.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.hive.HiveCluster; +import com.facebook.presto.hive.metastore.HiveMetastoreClient; +import com.google.common.net.HostAndPort; +import io.airlift.log.Logger; +import org.apache.thrift.transport.TTransportException; + +import javax.inject.Inject; + +import java.util.Collections; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class ZookeeperServersetHiveCluster + implements HiveCluster +{ + private static final Logger log = Logger.get(ZookeeperServersetHiveCluster.class); + private final PooledHiveMetastoreClientFactory clientFactory; + private ZookeeperMetastoreMonitor zkMetastoreMonitor; + + @Inject + public ZookeeperServersetHiveCluster(ZookeeperServersetMetastoreConfig config, PooledHiveMetastoreClientFactory clientFactory) + throws Exception + { + String zkServerHostAndPort = requireNonNull(config.getZookeeperServerHostAndPort(), "zkServerHostAndPort is null"); + String zkMetastorePath = requireNonNull(config.getZookeeperMetastorePath(), "zkMetastorePath is null"); + int zkRetries = requireNonNull(config.getZookeeperMaxRetries(), "zkMaxRetried is null"); + int zkRetrySleepTime = requireNonNull(config.getZookeeperRetrySleepTime(), "zkRetrySleepTime is null"); + this.clientFactory = requireNonNull(clientFactory, "clientFactory is null"); + this.zkMetastoreMonitor = new ZookeeperMetastoreMonitor(zkServerHostAndPort, zkMetastorePath, zkRetries, zkRetrySleepTime); + } + + @Override + public HiveMetastoreClient createMetastoreClient() + { + List metastores = zkMetastoreMonitor.getServers(); + Collections.shuffle(metastores); + TTransportException lastException = null; + for (HostAndPort metastore : metastores) { + try { + log.info("Connecting to metastore at: %s", metastore.toString()); + return clientFactory.create(metastore.getHostText(), metastore.getPort()); + } + catch (TTransportException e) { + log.debug("Failed connecting to Hive metastore at: %s", metastore.toString()); + lastException = e; + } + } + + throw new RuntimeException("Failed connecting to Hive metastore.", lastException); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java new file mode 100644 index 0000000000000..65b424b6c437e --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/ZookeeperServersetMetastoreConfig.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +public class ZookeeperServersetMetastoreConfig +{ + private String zookeeperServerHostAndPort; + private String zookeeperMetastorePath; + private int zookeeperRetrySleepTime = 500; // ms + private int zookeeperMaxRetries = 3; + private int maxTransport = 128; + private long transportIdleTimeout = 300_000L; + private long transportEvictInterval = 10_000L; + private int transportEvictNumTests = 3; + + public String getZookeeperServerHostAndPort() + { + return zookeeperServerHostAndPort; + } + + @Config("hive.metastore.zookeeper.uri") + @ConfigDescription("Zookeeper Host and Port") + public ZookeeperServersetMetastoreConfig setZookeeperServerHostAndPort(String zookeeperServerHostAndPort) + { + this.zookeeperServerHostAndPort = zookeeperServerHostAndPort; + return this; + } + + public String getZookeeperMetastorePath() + { + return zookeeperMetastorePath; + } + + @Config("hive.metastore.zookeeper.path") + @ConfigDescription("Hive metastore Zookeeper path") + public ZookeeperServersetMetastoreConfig setZookeeperMetastorePath(String zkPath) + { + this.zookeeperMetastorePath = zkPath; + return this; + } + + @NotNull + public int getZookeeperRetrySleepTime() + { + return zookeeperRetrySleepTime; + } + + @Config("hive.metastore.zookeeper.retry.sleeptime") + @ConfigDescription("Zookeeper sleep time between reties") + public ZookeeperServersetMetastoreConfig setZookeeperRetrySleepTime(int zookeeperRetrySleepTime) + { + this.zookeeperRetrySleepTime = zookeeperRetrySleepTime; + return this; + } + + @Min(1) + public int getZookeeperMaxRetries() + { + return zookeeperMaxRetries; + } + + @Config("hive.metastore.zookeeper.max.retries") + @ConfigDescription("Zookeeper max reties") + public ZookeeperServersetMetastoreConfig setZookeeperMaxRetries(int zookeeperMaxRetries) + { + this.zookeeperMaxRetries = zookeeperMaxRetries; + return this; + } + + @Min(1) + public int getMaxTransport() + { + return maxTransport; + } + + @Config("hive.metastore.max-transport-num") + public ZookeeperServersetMetastoreConfig setMaxTransport(int maxTransport) + { + this.maxTransport = maxTransport; + return this; + } + + public long getTransportIdleTimeout() + { + return transportIdleTimeout; + } + + @Config("hive.metastore.transport-idle-timeout") + public ZookeeperServersetMetastoreConfig setTransportIdleTimeout(long transportIdleTimeout) + { + this.transportIdleTimeout = transportIdleTimeout; + return this; + } + + public long getTransportEvictInterval() + { + return transportEvictInterval; + } + + @Config("hive.metastore.transport-eviction-interval") + public ZookeeperServersetMetastoreConfig setTransportEvictInterval(long transportEvictInterval) + { + this.transportEvictInterval = transportEvictInterval; + return this; + } + + @Min(0) + public int getTransportEvictNumTests() + { + return transportEvictNumTests; + } + + @Config("hive.metastore.transport-eviction-num-tests") + public ZookeeperServersetMetastoreConfig setTransportEvictNumTests(int transportEvictNumTests) + { + this.transportEvictNumTests = transportEvictNumTests; + return this; + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/PooledTTransportFactory.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/PooledTTransportFactory.java new file mode 100644 index 0000000000000..68efa43131cb6 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/PooledTTransportFactory.java @@ -0,0 +1,258 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive.util; + +import com.facebook.presto.hive.authentication.HiveMetastoreAuthentication; +import com.google.common.net.HostAndPort; +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; + +import static java.util.Objects.requireNonNull; + +public class PooledTTransportFactory + extends BasePooledObjectFactory +{ + private final TTransportPool pool; + private final String host; + private final int port; + private final HostAndPort socksProxy; + private final int timeoutMillis; + private final HiveMetastoreAuthentication metastoreAuthentication; + + public PooledTTransportFactory(TTransportPool pool, String host, int port, + @Nullable HostAndPort socksProxy, int timeoutMillis, + HiveMetastoreAuthentication metastoreAuthentication) + { + this.pool = requireNonNull(pool, "pool is null"); + this.host = requireNonNull(host, "host is null"); + this.port = port; + this.socksProxy = socksProxy; + this.timeoutMillis = timeoutMillis; + this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null"); + } + + @Override + public void activateObject(PooledObject pooledObject) + throws Exception + { + pooledObject.getObject().flush(); + } + + @Override + public boolean validateObject(PooledObject pooledObject) + { + try { + return (pooledObject.getObject().isOpen() && + ((PooledTTransport) pooledObject.getObject()).isReachable(timeoutMillis)); + } + catch (Exception e) { + return false; + } + } + + @Override + public TTransport create() + throws Exception + { + TTransport transport; + if (socksProxy == null) { + transport = new TSocket(host, port, timeoutMillis); + } + else { + SocketAddress address = InetSocketAddress.createUnresolved(socksProxy.getHostText(), + socksProxy.getPort()); + Socket socket = new Socket(new Proxy(Proxy.Type.SOCKS, address)); + try { + socket.connect(InetSocketAddress.createUnresolved(host, port), timeoutMillis); + socket.setSoTimeout(timeoutMillis); + transport = new TSocket(socket); + } + catch (SocketException e) { + if (socket.isConnected()) { + try { + socket.close(); + } + catch (IOException ioException) { + // ignored + } + } + throw e; + } + } + TTransport authenticatedTransport = metastoreAuthentication.authenticate(transport, host); + if (!authenticatedTransport.isOpen()) { + authenticatedTransport.open(); + } + + return new PooledTTransport(authenticatedTransport, pool, + HostAndPort.fromParts(host, port).toString()); + } + + @Override + public void destroyObject(PooledObject pooledObject) + { + try { + ((PooledTTransport) pooledObject.getObject()).getTTransport().close(); + } + catch (ClassCastException e) { + // ignore + } + pooledObject.invalidate(); + } + + @Override + public PooledObject wrap(TTransport transport) + { + return new DefaultPooledObject(transport); + } + + @Override + public void passivateObject(PooledObject pooledObject) + { + try { + pooledObject.getObject().flush(); + } + catch (TTransportException e) { + destroyObject(pooledObject); + } + } + + private static class PooledTTransport + extends TTransport + { + private final String remote; + private final TTransportPool pool; + private final TTransport transport; + + public PooledTTransport(TTransport transport, TTransportPool pool, String remote) + { + this.transport = transport; + this.pool = pool; + this.remote = remote; + } + + public TTransport getTTransport() + { + return transport; + } + + public boolean isReachable(int timeoutMillis) + throws ClassCastException, IOException + { + return ((TSocket) transport).getSocket().getInetAddress().isReachable(timeoutMillis); + } + + @Override + public void close() + { + try { + pool.returnObject(remote, this, transport); + } + catch (Exception e) { + transport.close(); + } + } + + @Override + public boolean isOpen() + { + return transport.isOpen(); + } + + @Override + public boolean peek() + { + return transport.peek(); + } + + @Override + public byte[] getBuffer() + { + return transport.getBuffer(); + } + + @Override + public int getBufferPosition() + { + return transport.getBufferPosition(); + } + + @Override + public int getBytesRemainingInBuffer() + { + return transport.getBytesRemainingInBuffer(); + } + + @Override + public void consumeBuffer(int len) + { + transport.consumeBuffer(len); + } + + @Override + public void open() + throws TTransportException + { + transport.open(); + } + + @Override + public int readAll(byte[] bytes, int off, int len) + throws TTransportException + { + return transport.readAll(bytes, off, len); + } + + @Override + public int read(byte[] bytes, int off, int len) + throws TTransportException + { + return transport.read(bytes, off, len); + } + + @Override + public void write(byte[] bytes) + throws TTransportException + { + transport.write(bytes); + } + + @Override + public void write(byte[] bytes, int off, int len) + throws TTransportException + { + transport.write(bytes, off, len); + } + + @Override + public void flush() + throws TTransportException + { + transport.flush(); + } + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/TTransportPool.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/TTransportPool.java new file mode 100644 index 0000000000000..0e34f6756f1c8 --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/TTransportPool.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive.util; + +import com.google.common.net.HostAndPort; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.PooledObjectFactory; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.thrift.transport.TTransport; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TTransportPool +{ + private final ConcurrentMap> pools = new ConcurrentHashMap(); + private GenericObjectPoolConfig poolConfig; + + public TTransportPool(GenericObjectPoolConfig poolConfig) + { + this.poolConfig = poolConfig; + } + + protected synchronized void add(String remote, PooledObjectFactory transportFactory) + { + pools.putIfAbsent(remote, new GenericObjectPool(transportFactory, poolConfig)); + } + + protected TTransport get(String remote, PooledObjectFactory transportFactory) + throws Exception + { + add(remote, transportFactory); + return get(remote); + } + + protected TTransport get(String remote) + throws Exception + { + ObjectPool pool = pools.get(remote); + if (pool == null) { + return null; + } + return pool.borrowObject(); + } + + public TTransport borrowObject(String host, int port, PooledObjectFactory transportFactory) + throws Exception + { + return get(HostAndPort.fromParts(host, port).toString(), transportFactory); + } + + public TTransport borrowObject(String host, int port) + throws Exception + { + return get(HostAndPort.fromParts(host, port).toString()); + } + + public void returnObject(String remote, TTransport pooledTransport, TTransport transport) + { + if (remote == null) { + transport.close(); + return; + } + ObjectPool pool = pools.get(remote); + if (pool == null) { + transport.close(); + return; + } + try { + pool.returnObject(pooledTransport); + } + catch (Exception e) { + transport.close(); + } + } + + public void returnObject(TTransport transport) + { + transport.close(); + } +} diff --git a/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/UgiUtils.java b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/UgiUtils.java new file mode 100644 index 0000000000000..6d540bbe3c5ac --- /dev/null +++ b/presto-hive/src/main/java/com/facebook/presto/twitter/hive/util/UgiUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive.util; + +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Utility class to handle creating and caching the UserGroupInformation object. + */ +public class UgiUtils +{ + private UgiUtils() {} + + // Every instance of a UserGroupInformation object for a given user has a unique hashcode, due + // to the hashCode() impl. If we don't cache the UGI per-user here, there will be a memory leak + // in the PrestoFileSystemCache. + private static final Map UGI_CACHE = new ConcurrentHashMap<>(); + + public static UserGroupInformation getUgi(String user) + { + UserGroupInformation ugi = UGI_CACHE.get(user); + + if (ugi == null) { + // Configure hadoop to allow presto daemon user to impersonate all presto users + // See https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/Superusers.html + try { + ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + } + catch (IOException e) { + throw new RuntimeException("Could not get login user from UserGroupInformation", e); + } + UGI_CACHE.put(user, ugi); + } + + return ugi; + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java index dbe1d0125e0f7..46047e64fac1f 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java @@ -67,6 +67,7 @@ public void testDefaults() .setHiveStorageFormat(HiveStorageFormat.RCBINARY) .setHiveCompressionCodec(HiveCompressionCodec.GZIP) .setRespectTableFormat(true) + .setReadAsQueryUser(false) .setImmutablePartitions(false) .setMaxPartitionsPerWriter(100) .setUseParquetColumnNames(false) @@ -153,6 +154,7 @@ public void testExplicitPropertyMappings() .put("hive.max-concurrent-file-renames", "100") .put("hive.assume-canonical-partition-keys", "true") .put("hive.parquet.use-column-names", "true") + .put("hive.read-as-query-user", "true") .put("hive.orc.use-column-names", "true") .put("hive.s3.aws-access-key", "abc123") .put("hive.s3.aws-secret-key", "secret") @@ -225,6 +227,7 @@ public void testExplicitPropertyMappings() .setVerifyChecksum(false) .setResourceConfigFiles(ImmutableList.of("/foo.xml", "/bar.xml")) .setHiveStorageFormat(HiveStorageFormat.SEQUENCEFILE) + .setReadAsQueryUser(true) .setHiveCompressionCodec(HiveCompressionCodec.NONE) .setRespectTableFormat(false) .setImmutablePartitions(true) diff --git a/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperMetastoreMonitor.java b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperMetastoreMonitor.java new file mode 100644 index 0000000000000..1f89464aedce3 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperMetastoreMonitor.java @@ -0,0 +1,157 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.facebook.presto.twitter.hive.util.TestUtils; +import com.google.common.collect.ImmutableList; +import com.google.common.net.HostAndPort; +import io.airlift.log.Logger; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.curator.test.TestingServer; +import org.json.simple.JSONObject; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertTrue; + +public class TestZookeeperMetastoreMonitor +{ + private static final Logger log = Logger.get(TestZookeeperMetastoreMonitor.class); + + private ZookeeperMetastoreMonitor zkMetastoreMonitor; + private TestingServer zkServer; + private ZkClient zkClient; + private final String zkPath = "/metastores"; + + public TestZookeeperMetastoreMonitor() + throws Exception + { + zkServer = new TestingServer(TestUtils.findUnusedPort()); + zkClient = new ZkClient(zkServer.getConnectString(), 30_000, 30_000); + + // Set the serializer + zkClient.setZkSerializer(new ZkSerializer() { + @Override + public byte[] serialize(Object o) throws ZkMarshallingError + { + try { + return o.toString().getBytes(StandardCharsets.UTF_8); + } + catch (Exception e) { + log.warn("Exception in serializing " + e); + } + return "".getBytes(); + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError + { + return null; + } + }); + } + + @AfterClass + public void destroy() + throws IOException + { + zkMetastoreMonitor.close(); + zkClient.close(); + zkServer.close(); + } + + @BeforeTest + public void setUp() + throws Exception + { + log.info("Cleaning up zookeeper"); + zkClient.getChildren("/").stream() + .filter(child -> !child.equals("zookeeper")) + .forEach(child -> zkClient.deleteRecursive("/" + child)); + + zkClient.unsubscribeAll(); + + zkClient.createPersistent(zkPath); + zkMetastoreMonitor = new ZookeeperMetastoreMonitor(zkServer.getConnectString(), zkPath, 3, 500); + } + + @Test + public void testGetServers() throws Exception + { + List servers; + List expected; + assertTrue(zkMetastoreMonitor.getServers().isEmpty()); + + addServerToZk("nameNode1", "host1", 10001); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host1", 10001)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers)); + + addServerToZk("nameNode2", "host2", 10002); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host1", 10001), HostAndPort.fromParts("host2", 10002)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers)); + + // Change value of an existing name node + addServerToZk("nameNode2", "host2", 10003); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host1", 10001), HostAndPort.fromParts("host2", 10003)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers)); + + // Delete an existing name node + zkClient.delete(getPathForNameNode("nameNode1")); + // Sleep for some time so that event can be propagated. + TimeUnit.MILLISECONDS.sleep(100); + servers = zkMetastoreMonitor.getServers(); + expected = ImmutableList.of(HostAndPort.fromParts("host2", 10003)); + assertTrue(servers.containsAll(expected) && expected.containsAll(servers), servers.toString()); + } + + private void addServerToZk(String nameNode, String host, int port) + { + JSONObject serviceEndpoint = new JSONObject(); + serviceEndpoint.put("host", host); + serviceEndpoint.put("port", port); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serviceEndpoint", serviceEndpoint); + + String path = getPathForNameNode(nameNode); + + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, jsonObject.toJSONString()); + } + else { + zkClient.writeData(path, jsonObject.toJSONString()); + } + } + + private String getPathForNameNode(String nameNode) + { + return zkPath + "/" + nameNode; + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperServersetMetastoreConfig.java b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperServersetMetastoreConfig.java new file mode 100644 index 0000000000000..b839a8d154042 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/TestZookeeperServersetMetastoreConfig.java @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestZookeeperServersetMetastoreConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(ZookeeperServersetMetastoreConfig.class) + .setZookeeperMaxRetries(3) + .setZookeeperRetrySleepTime(500) + .setZookeeperMetastorePath(null) + .setZookeeperServerHostAndPort(null) + .setMaxTransport(128) + .setTransportIdleTimeout(300_000L) + .setTransportEvictInterval(10_000L) + .setTransportEvictNumTests(3)); + } + + @Test + public void testExplicitPropertyMappingsSingleMetastore() + { + Map properties = new ImmutableMap.Builder() + .put("hive.metastore.zookeeper.uri", "localhost:2181") + .put("hive.metastore.zookeeper.path", "/zookeeper/path/") + .put("hive.metastore.zookeeper.retry.sleeptime", "200") + .put("hive.metastore.zookeeper.max.retries", "2") + .put("hive.metastore.max-transport-num", "64") + .put("hive.metastore.transport-idle-timeout", "100000") + .put("hive.metastore.transport-eviction-interval", "1000") + .put("hive.metastore.transport-eviction-num-tests", "10") + .build(); + + ZookeeperServersetMetastoreConfig expected = new ZookeeperServersetMetastoreConfig() + .setZookeeperServerHostAndPort("localhost:2181") + .setZookeeperMetastorePath("/zookeeper/path/") + .setZookeeperRetrySleepTime(200) + .setZookeeperMaxRetries(2) + .setMaxTransport(64) + .setTransportIdleTimeout(100_000L) + .setTransportEvictInterval(1_000L) + .setTransportEvictNumTests(10); + + assertFullMapping(properties, expected); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/twitter/hive/util/TestUtils.java b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/util/TestUtils.java new file mode 100644 index 0000000000000..379ad3877e325 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/twitter/hive/util/TestUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.twitter.hive.util; + +import java.io.IOException; +import java.net.ServerSocket; + +public final class TestUtils +{ + private TestUtils() {} + + public static int findUnusedPort() + throws IOException + { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } +} diff --git a/presto-jdbc/pom.xml b/presto-jdbc/pom.xml index e3ff898e63544..d1cb20840cecf 100644 --- a/presto-jdbc/pom.xml +++ b/presto-jdbc/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-jdbc diff --git a/presto-jmx/pom.xml b/presto-jmx/pom.xml index d3208026e0864..8514c25365b54 100644 --- a/presto-jmx/pom.xml +++ b/presto-jmx/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-jmx diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml index 69f7007f1706a..8781a46e9ce70 100644 --- a/presto-kafka/pom.xml +++ b/presto-kafka/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-kafka diff --git a/presto-local-file/pom.xml b/presto-local-file/pom.xml index 7cf3ed1802fa3..c0fbc160d1c24 100644 --- a/presto-local-file/pom.xml +++ b/presto-local-file/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-local-file diff --git a/presto-main/pom.xml b/presto-main/pom.xml index 4953177ca6862..97ddd14fac530 100644 --- a/presto-main/pom.xml +++ b/presto-main/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-main diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index 6deb6abf05151..b46f136afe77a 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -130,12 +130,18 @@ public void startPollingNodeStates() Set deadNodes = difference(nodeStates.keySet(), aliveNodeIds).immutableCopy(); nodeStates.keySet().removeAll(deadNodes); + if (deadNodes.size() > 0) { + log.warn("Dead nodes: %s", deadNodes); + } + // Add new nodes for (Node node : aliveNodes) { nodeStates.putIfAbsent(node.getNodeIdentifier(), new RemoteNodeState(httpClient, uriBuilderFrom(node.getHttpUri()).appendPath("/v1/info/state").build())); } + log.debug("Number of alive nodes: %d", nodeStates.size()); + // Schedule refresh nodeStates.values().forEach(RemoteNodeState::asyncRefresh); }, 1, 5, TimeUnit.SECONDS); diff --git a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorUIHttpServerModule.java b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorUIHttpServerModule.java new file mode 100644 index 0000000000000..688c0f434ef0f --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorUIHttpServerModule.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server; + +import com.facebook.presto.spi.block.BlockEncodingSerde; +import com.facebook.presto.spi.type.TypeManager; +import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.transaction.TransactionManager; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import com.google.inject.multibindings.Multibinder; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.airlift.discovery.client.AnnouncementHttpServerInfo; +import io.airlift.discovery.server.DynamicAnnouncementResource; +import io.airlift.discovery.server.ServiceResource; +import io.airlift.event.client.EventClient; +import io.airlift.http.server.HttpRequestEvent; +import io.airlift.http.server.HttpServer; +import io.airlift.http.server.HttpServerConfig; +import io.airlift.http.server.HttpServerInfo; +import io.airlift.http.server.HttpServerProvider; +import io.airlift.http.server.LocalAnnouncementHttpServerInfo; +import io.airlift.http.server.RequestStats; +import io.airlift.http.server.TheAdminServlet; +import io.airlift.http.server.TheServlet; +import io.airlift.json.JsonCodecFactory; +import io.airlift.node.NodeConfig; +import io.airlift.node.NodeInfo; + +import javax.servlet.Filter; + +import java.util.UUID; + +import static io.airlift.discovery.client.DiscoveryBinder.discoveryBinder; +import static io.airlift.event.client.EventBinder.eventBinder; +import static io.airlift.http.server.HttpServerBinder.HttpResourceBinding; +import static io.airlift.http.server.HttpServerBinder.httpServerBinder; +import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder; +import static java.util.Objects.requireNonNull; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class CoordinatorUIHttpServerModule + extends AbstractConfigurationAwareModule +{ + private final Injector injector; + + public CoordinatorUIHttpServerModule(Injector injector) + { + this.injector = requireNonNull(injector, "injector is null"); + } + + @Override + protected void setup(Binder binder) + { + binder.disableCircularProxies(); + + ServerConfig serverConfig = injector.getInstance(ServerConfig.class); + HttpServerConfig httpServerConfig = injector.getInstance(HttpServerConfig.class) + .setHttpPort(serverConfig.getUIHttpPort()); + binder.bind(HttpServerConfig.class).toInstance(httpServerConfig); + + binder.bind(HttpServerInfo.class).in(Scopes.SINGLETON); + binder.bind(EventClient.class).toInstance(injector.getInstance(EventClient.class)); + binder.bind(HttpServer.class).toProvider(HttpServerProvider.class).in(Scopes.SINGLETON); + + Multibinder.newSetBinder(binder, Filter.class, TheServlet.class); + Multibinder.newSetBinder(binder, Filter.class, TheAdminServlet.class); + Multibinder.newSetBinder(binder, HttpResourceBinding.class, TheServlet.class); + newExporter(binder).export(HttpServer.class).withGeneratedName(); + binder.bind(RequestStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(RequestStats.class).withGeneratedName(); + eventBinder(binder).bindEventClient(HttpRequestEvent.class); + + NodeConfig nodeConfig = injector.getInstance(NodeConfig.class); + nodeConfig.setNodeId(UUID.randomUUID().toString()); + binder.bind(NodeInfo.class).toInstance(new NodeInfo(nodeConfig)); + binder.bind(AnnouncementHttpServerInfo.class).to(LocalAnnouncementHttpServerInfo.class).in(Scopes.SINGLETON); + binder.bind(BlockEncodingSerde.class).toInstance(injector.getInstance(BlockEncodingSerde.class)); + binder.bind(TypeManager.class).toInstance(injector.getInstance(TypeManager.class)); + binder.bind(SqlParser.class).toInstance(injector.getInstance(SqlParser.class)); + binder.bind(TransactionManager.class).toInstance(injector.getInstance(TransactionManager.class)); + + binder.bind(ObjectMapper.class).toInstance(injector.getInstance(ObjectMapper.class)); + binder.bind(JsonCodecFactory.class).toInstance(injector.getInstance(JsonCodecFactory.class)); + + resourceBinding(binder); + } + + protected void resourceBinding(Binder binder) + { + // bind webapp + httpServerBinder(binder).bindResource("/", "webapp").withWelcomeFile("index.html"); + // presto coordinator ui announcement + discoveryBinder(binder).bindHttpAnnouncement("presto-coordinator-ui"); + // accept presto worker's announcement + jaxrsBinder(binder).bindInstance(injector.getInstance(DynamicAnnouncementResource.class)); + // service info + jaxrsBinder(binder).bindInstance(injector.getInstance(ServiceResource.class)); + // query execution visualizer + jaxrsBinder(binder).bindInstance(injector.getInstance(QueryExecutionResource.class)); + // query manager + jaxrsBinder(binder).bindInstance(injector.getInstance(QueryResource.class)); + // cluster statistics + jaxrsBinder(binder).bindInstance(injector.getInstance(ClusterStatsResource.class)); + // server info resource + jaxrsBinder(binder).bindInstance(injector.getInstance(ServerInfoResource.class)); + // server node resource + jaxrsBinder(binder).bindInstance(injector.getInstance(NodeResource.class)); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index cfa456363ce70..19f1590d29332 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -108,8 +108,6 @@ public void run() new ServerMainModule(sqlParserOptions), new GracefulShutdownModule()); - modules.addAll(getAdditionalModules()); - Bootstrap app = new Bootstrap(modules.build()); try { @@ -132,6 +130,18 @@ public void run() injector.getInstance(Announcer.class).start(); + ServerConfig serverConfig = injector.getInstance(ServerConfig.class); + + if (serverConfig.isCoordinator() && serverConfig.isEnabledUIonSecondaryPort()) { + Bootstrap uIApp = new Bootstrap(new DiscoveryModule(), + new JaxrsModule(true), + new CoordinatorUIHttpServerModule(injector)); + Injector uIInjector = uIApp.doNotInitializeLogging().initialize(); + log.info("UI runs on a seperate port: %d", serverConfig.getUIHttpPort()); + + uIInjector.getInstance(Announcer.class).start(); + } + log.info("======== SERVER STARTED ========"); } catch (Throwable e) { @@ -140,11 +150,6 @@ public void run() } } - protected Iterable getAdditionalModules() - { - return ImmutableList.of(); - } - private static void updateConnectorIds(Announcer announcer, CatalogManager metadata, ServerConfig serverConfig, NodeSchedulerConfig schedulerConfig) { // get existing announcement diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoSystemRequirements.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoSystemRequirements.java index 582066ca167eb..fdf7bc5f10319 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoSystemRequirements.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoSystemRequirements.java @@ -45,7 +45,7 @@ public static void verifyJvmRequirements() failRequirement("Presto requires an Oracle or OpenJDK JVM (found %s)", vendor); } - verifyJavaVersion(); + //verifyJavaVersion(); String dataModel = System.getProperty("sun.arch.data.model"); if (!"64".equals(dataModel)) { diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java b/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java index 8c2d600893fee..f623b43f4ba9e 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java @@ -25,6 +25,8 @@ public class ServerConfig private String dataSources; private boolean includeExceptionInResponse = true; private Duration gracePeriod = new Duration(2, MINUTES); + private boolean enabledUIonSecondaryPort = false; + private int uIHttpPort = 0; public boolean isCoordinator() { @@ -38,6 +40,30 @@ public ServerConfig setCoordinator(boolean coordinator) return this; } + public boolean isEnabledUIonSecondaryPort() + { + return enabledUIonSecondaryPort; + } + + @Config("http-server.ui.secondary.port.enabled") + public ServerConfig setEnabledUIonSecondaryPort(boolean enabledUIonSecondaryPort) + { + this.enabledUIonSecondaryPort = enabledUIonSecondaryPort; + return this; + } + + public int getUIHttpPort() + { + return uIHttpPort; + } + + @Config("http-server.ui.http.port") + public ServerConfig setUIHttpPort(int uIHttpPort) + { + this.uIHttpPort = uIHttpPort; + return this; + } + public String getPrestoVersion() { return prestoVersion; diff --git a/presto-main/src/main/resources/webapp/tableau/presto-client.js b/presto-main/src/main/resources/webapp/tableau/presto-client.js index 3bc357dd1b659..14de90193895b 100644 --- a/presto-main/src/main/resources/webapp/tableau/presto-client.js +++ b/presto-main/src/main/resources/webapp/tableau/presto-client.js @@ -17,6 +17,8 @@ function StatementClient(connectionData, headerCallback, dataCallback, errorCall this.currentResults = null; this.valid = true; + this.isHttps = window.location.protocol === "https:" + if (!(connectionData.sessionParameters === undefined)) { var parameterMap = JSON.parse(connectionData.sessionParameters); for (var name in parameterMap) { @@ -72,7 +74,7 @@ StatementClient.prototype.advance = function(lastRecordNumber) { var statementClient = this; $.ajax({ type: "GET", - url: this.currentResults.nextUri, + url: this.isHttps ? this.currentResults.nextUri.replace(/^http:/, 'https:') : this.currentResults.nextUri, headers: this.headers, dataType: 'json', // FIXME having problems when async: true diff --git a/presto-ml/pom.xml b/presto-ml/pom.xml index 7defae7dcd90a..367ab2fc95d85 100644 --- a/presto-ml/pom.xml +++ b/presto-ml/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-ml diff --git a/presto-mongodb/pom.xml b/presto-mongodb/pom.xml index cd7f0a2f6558c..ff06402a50139 100644 --- a/presto-mongodb/pom.xml +++ b/presto-mongodb/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-mongodb diff --git a/presto-mysql/pom.xml b/presto-mysql/pom.xml index fedfd06bb31e9..8519ca166be65 100644 --- a/presto-mysql/pom.xml +++ b/presto-mysql/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-mysql diff --git a/presto-orc/pom.xml b/presto-orc/pom.xml index eed944e1ea559..a552f39fabbed 100644 --- a/presto-orc/pom.xml +++ b/presto-orc/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-orc diff --git a/presto-parser/pom.xml b/presto-parser/pom.xml index 870d38cc3bc6a..578271f897bc0 100644 --- a/presto-parser/pom.xml +++ b/presto-parser/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-parser diff --git a/presto-plugin-toolkit/pom.xml b/presto-plugin-toolkit/pom.xml index e4c62d424c2eb..b54d6eb8a7b69 100644 --- a/presto-plugin-toolkit/pom.xml +++ b/presto-plugin-toolkit/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-plugin-toolkit diff --git a/presto-postgresql/pom.xml b/presto-postgresql/pom.xml index 26bc7d4946535..646e8ebeb0a99 100644 --- a/presto-postgresql/pom.xml +++ b/presto-postgresql/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-postgresql diff --git a/presto-product-tests/pom.xml b/presto-product-tests/pom.xml index 3ad29be3abec0..0225fb57e04ef 100644 --- a/presto-product-tests/pom.xml +++ b/presto-product-tests/pom.xml @@ -5,7 +5,7 @@ presto-root com.facebook.presto - 0.161 + 0.157-tw-0.29 presto-product-tests diff --git a/presto-raptor/pom.xml b/presto-raptor/pom.xml index 556815dba8429..5bd2ec84bd73a 100644 --- a/presto-raptor/pom.xml +++ b/presto-raptor/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-raptor diff --git a/presto-rcfile/pom.xml b/presto-rcfile/pom.xml index 3bc78b50778cf..62aba5012cb2b 100644 --- a/presto-rcfile/pom.xml +++ b/presto-rcfile/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-rcfile diff --git a/presto-record-decoder/pom.xml b/presto-record-decoder/pom.xml index 95de42244f7b4..aa4683471ce71 100644 --- a/presto-record-decoder/pom.xml +++ b/presto-record-decoder/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-record-decoder diff --git a/presto-redis/pom.xml b/presto-redis/pom.xml index 45b8de7318c9c..cb0adca83adad 100644 --- a/presto-redis/pom.xml +++ b/presto-redis/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-redis diff --git a/presto-resource-group-managers/pom.xml b/presto-resource-group-managers/pom.xml index 8b2922cbfa0cb..4f975bd1e7e37 100644 --- a/presto-resource-group-managers/pom.xml +++ b/presto-resource-group-managers/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-resource-group-managers diff --git a/presto-server-rpm/pom.xml b/presto-server-rpm/pom.xml index 8e1642fff6bd9..48c5116da66e5 100644 --- a/presto-server-rpm/pom.xml +++ b/presto-server-rpm/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-server-rpm diff --git a/presto-server/pom.xml b/presto-server/pom.xml index c502c46cb3246..1b0743a1c4321 100644 --- a/presto-server/pom.xml +++ b/presto-server/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-server diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml index 06405779d8031..584a241f24d7e 100644 --- a/presto-server/src/main/provisio/presto.xml +++ b/presto-server/src/main/provisio/presto.xml @@ -151,4 +151,10 @@ + + + + + + diff --git a/presto-spi/pom.xml b/presto-spi/pom.xml index 92fe25ab6793b..59eb0ee34a0b1 100644 --- a/presto-spi/pom.xml +++ b/presto-spi/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-spi diff --git a/presto-teradata-functions/pom.xml b/presto-teradata-functions/pom.xml index 0f1a27d58293a..cca9f667d352a 100644 --- a/presto-teradata-functions/pom.xml +++ b/presto-teradata-functions/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-teradata-functions diff --git a/presto-testing-server-launcher/pom.xml b/presto-testing-server-launcher/pom.xml index 34263e6d85b3b..0d5b8b0e0a69a 100644 --- a/presto-testing-server-launcher/pom.xml +++ b/presto-testing-server-launcher/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-testing-server-launcher diff --git a/presto-tests/pom.xml b/presto-tests/pom.xml index 170e4291d73bd..2d8d6e7146694 100644 --- a/presto-tests/pom.xml +++ b/presto-tests/pom.xml @@ -5,7 +5,7 @@ presto-root com.facebook.presto - 0.161 + 0.157-tw-0.29 presto-tests diff --git a/presto-tpch/pom.xml b/presto-tpch/pom.xml index 685fc6942f528..38b5385327896 100644 --- a/presto-tpch/pom.xml +++ b/presto-tpch/pom.xml @@ -4,7 +4,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-tpch diff --git a/presto-verifier/pom.xml b/presto-verifier/pom.xml index 307c175768428..c6733e132dbed 100644 --- a/presto-verifier/pom.xml +++ b/presto-verifier/pom.xml @@ -5,7 +5,7 @@ com.facebook.presto presto-root - 0.161 + 0.157-tw-0.29 presto-verifier diff --git a/twitter-eventlistener-plugin/pom.xml b/twitter-eventlistener-plugin/pom.xml new file mode 100644 index 0000000000000..b06141de197ff --- /dev/null +++ b/twitter-eventlistener-plugin/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + com.facebook.presto + presto-root + 0.157-tw-0.29 + + + twitter-eventlistener-plugin + Twitter Event Listener - scribes QueryCompletedEvent + presto-plugin + + + ${project.parent.basedir} + + + + + com.facebook.presto + presto-spi + 0.157-tw-0.29 + provided + + + io.airlift + log + + + com.google.guava + guava + + + + + com.twitter + presto-thrift-java + 0.0.1 + + + com.twitter + util-core_2.11 + + + com.twitter + util-core-java + + + com.twitter + util-function_2.10 + + + com.twitter + util-function-java + + + commons-logging + commons-logging + + + org.scala-lang.modules + scala-parser-combinators_2.11 + + + com.twitter + scrooge-core + + + org.scala-lang + scala-library + + + org.scala-lang + scala-reflect + + + + + com.twitter + util-logging_2.10 + 6.34.0 + + + commons-logging + commons-logging + + + + + org.apache.thrift + libthrift + + + org.scala-lang + scala-library + 2.10.6 + + + commons-logging + commons-logging + + + + + diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/QueryCompletedEventScriber.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/QueryCompletedEventScriber.java new file mode 100644 index 0000000000000..e9c2faf3eda00 --- /dev/null +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/QueryCompletedEventScriber.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.presto.plugin.eventlistener; + +import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; +import com.facebook.presto.spi.eventlistener.QueryContext; +import com.facebook.presto.spi.eventlistener.QueryFailureInfo; +import com.facebook.presto.spi.eventlistener.QueryMetadata; +import com.facebook.presto.spi.eventlistener.QueryStatistics; + +import com.twitter.presto.thriftjava.QueryCompletionEvent; +import com.twitter.presto.thriftjava.QueryState; + +import io.airlift.log.Logger; +import org.apache.thrift.TException; + +/** + * Class that scribes query completion events + */ +public class QueryCompletedEventScriber +{ + private static final String DASH = "-"; + private static final Logger log = Logger.get(QueryCompletedEventScriber.class); + + private TwitterScriber scriber = new TwitterScriber("presto_query_completion"); + + public void handle(QueryCompletedEvent event) + { + try { + scriber.scribe(toThriftQueryCompletionEvent(event)); + } + catch (TException e) { + log.warn(e, + String.format("Could not serialize thrift object of Query(id=%s, user=%s, env=%s, schema=%s.%s)", + event.getMetadata().getQueryId(), + event.getContext().getUser(), + event.getContext().getEnvironment(), + event.getContext().getCatalog().orElse(DASH), + event.getContext().getSchema().orElse(DASH))); + } + } + + private static QueryCompletionEvent toThriftQueryCompletionEvent(QueryCompletedEvent event) + { + QueryMetadata eventMetadata = event.getMetadata(); + QueryContext eventContext = event.getContext(); + QueryStatistics eventStat = event.getStatistics(); + + QueryCompletionEvent thriftEvent = + new com.twitter.presto.thriftjava.QueryCompletionEvent(); + + thriftEvent.query_id = eventMetadata.getQueryId(); + thriftEvent.transaction_id = eventMetadata.getTransactionId().orElse(DASH); + thriftEvent.user = eventContext.getUser(); + thriftEvent.principal = eventContext.getPrincipal().orElse(DASH); + thriftEvent.source = eventContext.getSource().orElse(DASH); + thriftEvent.server_version = eventContext.getServerVersion(); + thriftEvent.environment = eventContext.getEnvironment(); + thriftEvent.catalog = eventContext.getCatalog().orElse(DASH); + thriftEvent.schema = eventContext.getSchema().orElse(DASH); + thriftEvent.remote_client_address = eventContext.getRemoteClientAddress().orElse(DASH); + thriftEvent.user_agent = eventContext.getUserAgent().orElse(DASH); + thriftEvent.query_state = QueryState.valueOf(eventMetadata.getQueryState()); + thriftEvent.uri = eventMetadata.getUri().toString(); + thriftEvent.query = eventMetadata.getQuery(); + thriftEvent.create_time_ms = event.getCreateTime().toEpochMilli(); + thriftEvent.execution_start_time_ms = event.getExecutionStartTime().toEpochMilli(); + thriftEvent.end_time_ms = event.getEndTime().toEpochMilli(); + thriftEvent.queued_time_ms = eventStat.getQueuedTime().toMillis(); + thriftEvent.query_wall_time_ms = eventStat.getWallTime().toMillis(); + if (eventStat.getAnalysisTime().isPresent()) { + thriftEvent.analysis_time_ms = eventStat.getAnalysisTime().get().toMillis(); + } + if (eventStat.getDistributedPlanningTime().isPresent()) { + thriftEvent.distributed_planning_time_ms = eventStat.getDistributedPlanningTime().get().toMillis(); + } + thriftEvent.total_bytes = eventStat.getTotalBytes(); + thriftEvent.total_rows = eventStat.getTotalRows(); + thriftEvent.splits = eventStat.getCompletedSplits(); + if (event.getFailureInfo().isPresent()) { + QueryFailureInfo eventFailureInfo = event.getFailureInfo().get(); + thriftEvent.error_code_id = eventFailureInfo.getErrorCode().getCode(); + thriftEvent.error_code_name = eventFailureInfo.getErrorCode().getName(); + thriftEvent.failure_type = eventFailureInfo.getFailureType().orElse(DASH); + thriftEvent.failure_message = eventFailureInfo.getFailureMessage().orElse(DASH); + thriftEvent.failure_task = eventFailureInfo.getFailureTask().orElse(DASH); + thriftEvent.failure_host = eventFailureInfo.getFailureHost().orElse(DASH); + thriftEvent.failures_json = eventFailureInfo.getFailuresJson(); + } + + return thriftEvent; + } +} diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListener.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListener.java new file mode 100644 index 0000000000000..0c622b0ba34cb --- /dev/null +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListener.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.presto.plugin.eventlistener; + +import com.facebook.presto.spi.eventlistener.EventListener; +import com.facebook.presto.spi.eventlistener.QueryCompletedEvent; +import com.facebook.presto.spi.eventlistener.QueryCreatedEvent; +import com.facebook.presto.spi.eventlistener.SplitCompletedEvent; + +import io.airlift.log.Logger; + +public class TwitterEventListener implements EventListener +{ + private static final Logger log = Logger.get(TwitterEventListener.class); + private final QueryCompletedEventScriber scriber = new QueryCompletedEventScriber(); + + @Override + public void queryCreated(QueryCreatedEvent queryCreatedEvent) + { + } + + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + scriber.handle(queryCompletedEvent); + } + + @Override + public void splitCompleted(SplitCompletedEvent splitCompletedEvent) + { + } +} diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerFactory.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerFactory.java new file mode 100644 index 0000000000000..6fedc2d780f16 --- /dev/null +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.presto.plugin.eventlistener; + +import com.facebook.presto.spi.eventlistener.EventListener; +import com.facebook.presto.spi.eventlistener.EventListenerFactory; + +import java.util.Map; + +public class TwitterEventListenerFactory implements EventListenerFactory +{ + @Override + public String getName() + { + return "twitter-event-listener"; + } + + @Override + public EventListener create(Map config) + { + return new TwitterEventListener(); + } +} diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerPlugin.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerPlugin.java new file mode 100644 index 0000000000000..5572560584901 --- /dev/null +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterEventListenerPlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.presto.plugin.eventlistener; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.eventlistener.EventListenerFactory; + +import com.google.common.collect.ImmutableList; + +public class TwitterEventListenerPlugin implements Plugin +{ + @Override + public Iterable getEventListenerFactories() + { + return ImmutableList.of(new TwitterEventListenerFactory()); + } +} diff --git a/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterScriber.java b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterScriber.java new file mode 100644 index 0000000000000..9e8524602255a --- /dev/null +++ b/twitter-eventlistener-plugin/src/main/java/com/twitter/presto/plugin/eventlistener/TwitterScriber.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.presto.plugin.eventlistener; + +import com.twitter.logging.BareFormatter$; +import com.twitter.logging.Level; +import com.twitter.logging.QueueingHandler; +import com.twitter.logging.ScribeHandler; + +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.util.Base64; +import java.util.logging.LogRecord; + +public class TwitterScriber +{ + private static final String DASH = "-"; + private static final int MAX_QUEUE_SIZE = 1000; + + private QueueingHandler queueingHandler; + + // TSerializer is not thread safe + private final ThreadLocal serializer = new ThreadLocal() + { + @Override protected TSerializer initialValue() + { + return new TSerializer(); + } + }; + + public TwitterScriber(String scribeCategory) + { + ScribeHandler scribeHandler = new ScribeHandler( + ScribeHandler.DefaultHostname(), + ScribeHandler.DefaultPort(), + scribeCategory, + ScribeHandler.DefaultBufferTime(), + ScribeHandler.DefaultConnectBackoff(), + ScribeHandler.DefaultMaxMessagesPerTransaction(), + ScribeHandler.DefaultMaxMessagesToBuffer(), + BareFormatter$.MODULE$, + scala.Option.apply((Level) null)); + queueingHandler = new QueueingHandler(scribeHandler, MAX_QUEUE_SIZE); + } + + public void scribe(TBase thriftMessage) throws TException + { + scribe(serializeThriftToString(thriftMessage)); + } + + /** + * Serialize a thrift object to bytes, compress, then encode as a base64 string. + * Throws TException + */ + private String serializeThriftToString(TBase thriftMessage) throws TException + { + return Base64.getEncoder().encodeToString(serializer.get().serialize(thriftMessage)); + } + + private void scribe(String message) + { + LogRecord logRecord = new LogRecord(Level.ALL, message); + queueingHandler.publish(logRecord); + } +}