diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java index 28ba60fa19b7..c299b68b8b79 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerInstanceResource.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Base64; +import java.util.Objects; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableNotFoundException; @@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.RowModel; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ConnectionCache; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,18 +55,31 @@ public class ScannerInstanceResource extends ResourceBase { ResultGenerator generator = null; String id = null; + String owner; int batch = 1; public ScannerInstanceResource() throws IOException { } - public ScannerInstanceResource(String table, String id, ResultGenerator generator, int batch) + public ScannerInstanceResource(String id, String owner, ResultGenerator generator, int batch) throws IOException { this.id = id; + this.owner = owner; this.generator = generator; this.batch = batch; } + private Response checkOwner() { + ConnectionCache connCache = RESTServlet.getInstance().getConnectionCache(); + if (!Objects.equals(connCache.getEffectiveUser(), owner)) { + LOG.warn("User {} is trying to access scanner {} which belongs to user {}", + connCache.getEffectiveUser(), id, owner); + return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT) + .entity("Not allowed" + CRLF).build(); + } + return null; + } + @GET @Produces({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF }) public Response get(final @Context UriInfo uriInfo, @QueryParam("n") int maxRows, @@ -77,10 +92,13 @@ public Response get(final @Context UriInfo uriInfo, @QueryParam("n") int maxRows servlet.getMetrics().incrementFailedGetRequests(1); return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT) .entity("Not found" + CRLF).build(); - } else { - // Updated the connection access time for each client next() call - RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime(); } + Response checkResp = checkOwner(); + if (checkResp != null) { + return checkResp; + } + // Updated the connection access time for each client next() call + RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime(); CellSetModel model = new CellSetModel(); RowModel rowModel = null; byte[] rowKeyArray = null; @@ -159,13 +177,18 @@ public Response getBinary(final @Context UriInfo uriInfo) { if (LOG.isTraceEnabled()) { LOG.trace("GET " + uriInfo.getAbsolutePath() + " as " + MIMETYPE_BINARY); } + servlet.getMetrics().incrementRequests(1); + if (generator == null) { + servlet.getMetrics().incrementFailedGetRequests(1); + return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT) + .entity("Not found" + CRLF).build(); + } + Response checkResp = checkOwner(); + if (checkResp != null) { + return checkResp; + } try { - if (generator == null) { - servlet.getMetrics().incrementFailedGetRequests(1); - return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT) - .entity("Not found" + CRLF).build(); - } Cell value = generator.next(); if (value == null) { if (LOG.isTraceEnabled()) { @@ -199,6 +222,7 @@ public Response delete(final @Context UriInfo uriInfo) { if (LOG.isTraceEnabled()) { LOG.trace("DELETE " + uriInfo.getAbsolutePath()); } + servlet.getMetrics().incrementRequests(1); if (servlet.isReadOnly()) { return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT) @@ -209,6 +233,10 @@ public Response delete(final @Context UriInfo uriInfo) { return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT) .entity("Not found" + CRLF).build(); } + Response checkResp = checkOwner(); + if (checkResp != null) { + return checkResp; + } if (ScannerResource.delete(id)) { servlet.getMetrics().incrementSucessfulDeleteRequests(1); } else { diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java index 2a3da6c3af4b..cd06c7376c4f 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ScannerResource.java @@ -114,8 +114,8 @@ Response update(final ScannerModel model, final boolean replace, final UriInfo u model.getCaching(), model.getCacheBlocks(), model.getLimit(), model.isIncludeStartRow(), model.isIncludeStopRow()); String id = gen.getID(); - ScannerInstanceResource instance = - new ScannerInstanceResource(tableName, id, gen, model.getBatch()); + ScannerInstanceResource instance = new ScannerInstanceResource(id, + RESTServlet.getInstance().getConnectionCache().getEffectiveUser(), gen, model.getBatch()); scanners.put(id, instance); if (LOG.isTraceEnabled()) { LOG.trace("new scanner: " + id); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java index feb1c1e2d6de..c6156ab6d041 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/TestSecureRESTServer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.rest.model.ScannerModel; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.access.AccessControlClient; import org.apache.hadoop.hbase.security.access.AccessControlConstants; @@ -70,7 +71,9 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.AuthSchemes; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.config.Registry; @@ -110,6 +113,7 @@ public class TestSecureRESTServer { private static final String HOSTNAME = "localhost"; private static final String CLIENT_PRINCIPAL = "client"; + private static final String CLIENT_PRINCIPAL2 = "client2"; private static final String WHEEL_PRINCIPAL = "wheel"; // The principal for accepting SPNEGO authn'ed requests (*must* be HTTP/fqdn) private static final String SPNEGO_SERVICE_PRINCIPAL = "HTTP/" + HOSTNAME; @@ -156,7 +160,7 @@ public static void setupServer() throws Exception { * Start KDC */ KDC = TEST_UTIL.setupMiniKdc(serviceKeytab); - KDC.createPrincipal(clientKeytab, CLIENT_PRINCIPAL); + KDC.createPrincipal(clientKeytab, CLIENT_PRINCIPAL, CLIENT_PRINCIPAL2); KDC.createPrincipal(wheelKeytab, WHEEL_PRINCIPAL); KDC.createPrincipal(serviceKeytab, SERVICE_PRINCIPAL); // REST server's keytab contains keys for both principals REST uses @@ -189,7 +193,7 @@ public static void setupServer() throws Exception { updateKerberosConfiguration(conf, REST_SERVER_PRINCIPAL, SPNEGO_SERVICE_PRINCIPAL, restServerKeytab); - // Start HDFS + // Start HBase TEST_UTIL.startMiniCluster(StartTestingClusterOption.builder().numMasters(1).numRegionServers(1) .numZkServers(1).build()); @@ -330,10 +334,10 @@ public Void run() throws Exception { }); } - public void testProxy(String extraArgs, String PRINCIPAL, File keytab, int responseCode) + private void testProxy(String extraArgs, String PRINCIPAL, File keytab, int responseCode) throws Exception { - UserGroupInformation superuser = UserGroupInformation - .loginUserFromKeytabAndReturnUGI(SERVICE_PRINCIPAL, serviceKeytab.getAbsolutePath()); + UserGroupInformation.loginUserFromKeytabAndReturnUGI(SERVICE_PRINCIPAL, + serviceKeytab.getAbsolutePath()); final TableName table = TableName.valueOf("publicTable"); // Read that row as the client @@ -417,6 +421,80 @@ public Void run() throws Exception { }); } + @Test + public void testScanWithDifferentClients() throws Exception { + Pair pair = getClient(); + CloseableHttpClient client = pair.getFirst(); + HttpClientContext context = pair.getSecond(); + + UserGroupInformation ugi = UserGroupInformation + .loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, clientKeytab.getAbsolutePath()); + + ObjectMapper mapper = new JacksonJaxbJsonProvider().locateMapper(ScannerModel.class, + MediaType.APPLICATION_JSON_TYPE); + TableName table = TableName.valueOf("publicTable"); + ScannerModel model = new ScannerModel(); + StringEntity entity = + new StringEntity(mapper.writeValueAsString(model), ContentType.APPLICATION_JSON); + HttpPost post = + new HttpPost("http://localhost:" + REST_TEST.getServletPort() + "/" + table + "/scanner"); + post.setEntity(entity); + String scannerURI = ugi.doAs(new PrivilegedExceptionAction() { + + @Override + public String run() throws Exception { + try (CloseableHttpResponse response = client.execute(post, context)) { + final int statusCode = response.getStatusLine().getStatusCode(); + assertEquals(HttpURLConnection.HTTP_CREATED, statusCode); + return response.getFirstHeader("Location").getValue(); + } + } + }); + + Pair pair2 = getClient(); + CloseableHttpClient client2 = pair2.getFirst(); + HttpClientContext context2 = pair2.getSecond(); + + UserGroupInformation ugi2 = UserGroupInformation + .loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL2, clientKeytab.getAbsolutePath()); + ugi2.doAs(new PrivilegedExceptionAction() { + + @Override + public Void run() throws Exception { + HttpGet get = new HttpGet(scannerURI + "?n=1"); + try (CloseableHttpResponse response = client2.execute(get, context2)) { + final int statusCode = response.getStatusLine().getStatusCode(); + assertEquals(HttpURLConnection.HTTP_FORBIDDEN, statusCode); + } + HttpDelete delete = new HttpDelete(scannerURI); + try (CloseableHttpResponse response = client2.execute(delete, context2)) { + final int statusCode = response.getStatusLine().getStatusCode(); + assertEquals(HttpURLConnection.HTTP_FORBIDDEN, statusCode); + } + return null; + } + }); + + ugi.doAs(new PrivilegedExceptionAction() { + + @Override + public Void run() throws Exception { + HttpGet get = new HttpGet(scannerURI + "?n=1"); + try (CloseableHttpResponse response = client.execute(get, context)) { + final int statusCode = response.getStatusLine().getStatusCode(); + assertEquals(HttpURLConnection.HTTP_OK, statusCode); + } + HttpDelete delete = new HttpDelete(scannerURI); + try (CloseableHttpResponse response = client.execute(delete, context)) { + final int statusCode = response.getStatusLine().getStatusCode(); + assertEquals(HttpURLConnection.HTTP_OK, statusCode); + } + return null; + } + }); + + } + private Pair getClient() { HttpClientConnectionManager pool = new PoolingHttpClientConnectionManager(); HttpHost host = new HttpHost("localhost", REST_TEST.getServletPort()); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java index 79f5d4ee830d..6b28c4c95545 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java @@ -17,22 +17,41 @@ */ package org.apache.hadoop.hbase.thrift; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; + import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConnectionCache; +import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.cache.Cache; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.RemovalCause; /** * abstract class for HBase handler providing a Connection cache and get table/admin method */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) public abstract class HBaseServiceHandler { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseServiceHandler.class); + public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; @@ -40,12 +59,38 @@ public abstract class HBaseServiceHandler { protected final ConnectionCache connectionCache; + protected static final class ResultScannerWrapper { + public final ResultScanner scanner; + public final boolean sortColumns; + public final String owner; + + public ResultScannerWrapper(ResultScanner scanner, boolean sortColumns, String owner) { + this.scanner = scanner; + this.sortColumns = sortColumns; + this.owner = owner; + } + } + + private final AtomicInteger nextScannerId = new AtomicInteger(0); + private final Cache scannerMap; + private final KeyLocker removeScannerLock = new KeyLocker<>(); + public HBaseServiceHandler(final Configuration c, final UserProvider userProvider) throws IOException { this.conf = c; int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); connectionCache = new ConnectionCache(conf, userProvider, cleanInterval, maxIdleTime); + long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS) + .removalListener(notification -> { + // do not close the scanner if it is removed manually, we will either add it back or close + // it manually. + if (notification.getCause() != RemovalCause.EXPLICIT) { + ((ResultScannerWrapper) notification.getValue()).scanner.close(); + } + }).build(); } protected ThriftMetrics metrics = null; @@ -58,6 +103,51 @@ public void setEffectiveUser(String effectiveUser) { connectionCache.setEffectiveUser(effectiveUser); } + /** + * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap. + * @param scanner to add + * @return Id for this Scanner + */ + protected int addScanner(ResultScanner scanner, boolean sortColumns) { + int id = nextScannerId.getAndIncrement(); + ResultScannerWrapper wrapper = + new ResultScannerWrapper(scanner, sortColumns, connectionCache.getEffectiveUser()); + scannerMap.put(id, wrapper); + return id; + } + + /** + * Add the given scanner back to scanner map. + *

+ * When scanning, we need to remove the scanner from scanner map to prevent expiration during + * scanning. + */ + protected void addScannerBack(int id, ResultScannerWrapper wrapper) { + scannerMap.put(id, wrapper); + } + + /** + * Removes the scanner associated with the specified ID from the internal HashMap. + * @param id of the Scanner to remove + * @throws AccessDeniedException if the scanner is not belong to the current user + */ + protected ResultScannerWrapper removeScanner(int id) throws IOException { + Lock lock = removeScannerLock.acquireLock(id); + try { + ResultScannerWrapper wrapper = scannerMap.getIfPresent(id); + if (wrapper != null && !Objects.equals(connectionCache.getEffectiveUser(), wrapper.owner)) { + LOG.warn("User {} is trying to access scanner id = {} where owner = {}", + connectionCache.getEffectiveUser(), id, wrapper.owner); + throw new AccessDeniedException( + "User " + connectionCache.getEffectiveUser() + " is not allowed to access scanner " + id); + } + scannerMap.invalidate(id); + return wrapper; + } finally { + lock.unlock(); + } + } + /** * Obtain HBaseAdmin. Creates the instance if it is not already created. */ diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java index 1900c6c0f8da..2ffda279a21c 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.thrift; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY; import static org.apache.hadoop.hbase.util.Bytes.getBytes; @@ -30,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CatalogFamilyFormat; import org.apache.hadoop.hbase.Cell; @@ -91,8 +88,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Throwables; -import org.apache.hbase.thirdparty.com.google.common.cache.Cache; -import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; /** * The HBaseServiceHandler is a glue object that connects Thrift RPC calls to the HBase client API @@ -105,9 +100,6 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hb public static final int HREGION_VERSION = 1; - // nextScannerId and scannerMap are used to manage scanner state - private int nextScannerId = 0; - private Cache scannerMap; IncrementCoalescer coalescer; /** @@ -122,44 +114,9 @@ byte[][] getAllColumns(Table table) throws IOException { return columns; } - /** - * Assigns a unique ID to the scanner and adds the mapping to an internal hash-map. - * @param scanner the {@link ResultScanner} to add - * @return integer scanner id - */ - protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) { - int id = nextScannerId++; - ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns); - scannerMap.put(id, resultScannerWrapper); - return id; - } - - /** - * Returns the scanner associated with the specified ID. - * @param id the ID of the scanner to get - * @return a Scanner, or null if ID was invalid. - */ - private synchronized ResultScannerWrapper getScanner(int id) { - return scannerMap.getIfPresent(id); - } - - /** - * Removes the scanner associated with the specified ID from the internal id->scanner hash-map. - * @param id the ID of the scanner to remove - */ - private synchronized void removeScanner(int id) { - scannerMap.invalidate(id); - } - protected ThriftHBaseServiceHandler(final Configuration c, final UserProvider userProvider) throws IOException { super(c, userProvider); - long cacheTimeout = - c.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) - * 2; - - scannerMap = - CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS).build(); this.coalescer = new IncrementCoalescer(this); } @@ -793,40 +750,50 @@ protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte[] fami @Override public void scannerClose(int id) throws IOError, IllegalArgument { LOG.debug("scannerClose: id={}", id); - ResultScannerWrapper resultScannerWrapper = getScanner(id); + ResultScannerWrapper resultScannerWrapper; + try { + resultScannerWrapper = removeScanner(id); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } if (resultScannerWrapper == null) { LOG.warn("scanner ID is invalid"); throw new IllegalArgument("scanner ID is invalid"); } - resultScannerWrapper.getScanner().close(); - removeScanner(id); + resultScannerWrapper.scanner.close(); } @Override public List scannerGetList(int id, int nbRows) throws IllegalArgument, IOError { LOG.debug("scannerGetList: id={}", id); - ResultScannerWrapper resultScannerWrapper = getScanner(id); + ResultScannerWrapper resultScannerWrapper; + try { + resultScannerWrapper = removeScanner(id); + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); + } if (null == resultScannerWrapper) { String message = "scanner ID is invalid"; LOG.warn(message); throw new IllegalArgument("scanner ID is invalid"); } - - Result[] results; try { - results = resultScannerWrapper.getScanner().next(nbRows); - if (null == results) { - return new ArrayList<>(); + Result[] results; + try { + results = resultScannerWrapper.scanner.next(nbRows); + if (null == results) { + return new ArrayList<>(); + } + } catch (IOException e) { + LOG.warn(e.getMessage(), e); + throw getIOError(e); } - } catch (IOException e) { - LOG.warn(e.getMessage(), e); - throw getIOError(e); + return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.sortColumns); } finally { - // Add scanner back to scannerMap; protects against case - // where scanner expired during processing of request. - scannerMap.put(id, resultScannerWrapper); + addScannerBack(id, resultScannerWrapper); } - return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted()); } @Override @@ -837,7 +804,6 @@ public List scannerGet(int id) throws IllegalArgument, IOError { @Override public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, Map attributes) throws IOError { - Table table = null; try { table = getTable(tableName); @@ -890,7 +856,6 @@ public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, @Override public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List columns, Map attributes) throws IOError { - Table table = null; try { table = getTable(tableName); @@ -918,7 +883,6 @@ public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List columns, Map attributes) throws IOError, TException { - Table table = null; try { table = getTable(tableName); @@ -946,7 +910,6 @@ public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, ByteBu @Override public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer startAndPrefix, List columns, Map attributes) throws IOError, TException { - Table table = null; try { table = getTable(tableName); @@ -976,7 +939,6 @@ public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer startAndPrefix @Override public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List columns, long timestamp, Map attributes) throws IOError, TException { - Table table = null; try { table = getTable(tableName); @@ -1006,7 +968,6 @@ public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List columns, long timestamp, Map attributes) throws IOError, TException { - Table table = null; try { table = getTable(tableName); @@ -1291,25 +1252,6 @@ private static void addAttributes(OperationWithAttributes op, } } - protected static class ResultScannerWrapper { - - private final ResultScanner scanner; - private final boolean sortColumns; - - public ResultScannerWrapper(ResultScanner resultScanner, boolean sortResultColumns) { - scanner = resultScanner; - sortColumns = sortResultColumns; - } - - public ResultScanner getScanner() { - return scanner; - } - - public boolean isColumnSorted() { - return sortColumns; - } - } - public static class IOErrorWithCause extends IOError { private final Throwable cause; diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index 1397bc49b2aa..9338744788ae 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.thrift2; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED; import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift; @@ -52,8 +50,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -101,10 +97,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.cache.Cache; -import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; -import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener; - /** * This class is a glue object that connects Thrift RPC calls to the HBase client API primarily * defined in the Table interface. @@ -116,10 +108,6 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH // TODO: Size of pool configuraple private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class); - // nextScannerId and scannerMap are used to manage scanner state - private final AtomicInteger nextScannerId = new AtomicInteger(0); - private final Cache scannerMap; - private static final IOException ioe = new DoNotRetryIOException("Thrift Server is in Read-only mode."); private boolean isReadOnly; @@ -161,13 +149,7 @@ public int hashCode() { public ThriftHBaseServiceHandler(final Configuration conf, final UserProvider userProvider) throws IOException { super(conf, userProvider); - long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT); - scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS) - .removalListener((RemovalListener) removalNotification -> removalNotification.getValue().close()) - .build(); } @Override @@ -202,34 +184,6 @@ private TIOError getTIOError(IOException e) { return err; } - /** - * Assigns a unique ID to the scanner and adds the mapping to an internal HashMap. - * @param scanner to add - * @return Id for this Scanner - */ - private int addScanner(ResultScanner scanner) { - int id = nextScannerId.getAndIncrement(); - scannerMap.put(id, scanner); - return id; - } - - /** - * Returns the Scanner associated with the specified Id. - * @param id of the Scanner to get - * @return a Scanner, or null if the Id is invalid - */ - private ResultScanner getScanner(int id) { - return scannerMap.getIfPresent(id); - } - - /** - * Removes the scanner associated with the specified ID from the internal HashMap. - * @param id of the Scanner to remove - */ - protected void removeScanner(int id) { - scannerMap.invalidate(id); - } - @Override public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException { Table htable = getTable(table); @@ -432,23 +386,30 @@ public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException } finally { closeTable(htable); } - return addScanner(resultScanner); + return addScanner(resultScanner, false); } @Override public List getScannerRows(int scannerId, int numRows) throws TIOError, TIllegalArgument, TException { - ResultScanner scanner = getScanner(scannerId); - if (scanner == null) { + ResultScannerWrapper wrapper; + try { + wrapper = removeScanner(scannerId); + } catch (IOException e) { + throw getTIOError(e); + } + if (wrapper == null) { TIllegalArgument ex = new TIllegalArgument(); ex.setMessage("Invalid scanner Id"); throw ex; } try { connectionCache.updateConnectionAccessTime(); - return resultsFromHBase(scanner.next(numRows)); + return resultsFromHBase(wrapper.scanner.next(numRows)); } catch (IOException e) { throw getTIOError(e); + } finally { + addScannerBack(scannerId, wrapper); } } @@ -475,15 +436,19 @@ public List getScannerResults(ByteBuffer table, TScan scan, int numRows @Override public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException { LOG.debug("scannerClose: id=" + scannerId); - ResultScanner scanner = getScanner(scannerId); - if (scanner == null) { + ResultScannerWrapper wrapper; + try { + wrapper = removeScanner(scannerId); + } catch (IOException e) { + throw getTIOError(e); + } + if (wrapper == null) { LOG.warn("scanner ID: " + scannerId + "is invalid"); // While the scanner could be already expired, // we should not throw exception here. Just log and return. return; } - scanner.close(); - removeScanner(scannerId); + wrapper.scanner.close(); } @Override diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java index daae740d9005..3209c1912bcb 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java @@ -103,14 +103,14 @@ private static ByteBuffer asByteBuffer(long l) { } // Static names for tables, columns, rows, and values - private static ByteBuffer tableAname = asByteBuffer("tableA"); + public static ByteBuffer tableAname = asByteBuffer("tableA"); private static ByteBuffer tableBname = asByteBuffer("tableB"); private static ByteBuffer columnAname = asByteBuffer("columnA:"); - private static ByteBuffer columnAAname = asByteBuffer("columnA:A"); + public static ByteBuffer columnAAname = asByteBuffer("columnA:A"); private static ByteBuffer columnBname = asByteBuffer("columnB:"); - private static ByteBuffer rowAname = asByteBuffer("rowA"); + public static ByteBuffer rowAname = asByteBuffer("rowA"); private static ByteBuffer rowBname = asByteBuffer("rowB"); - private static ByteBuffer valueAname = asByteBuffer("valueA"); + public static ByteBuffer valueAname = asByteBuffer("valueA"); private static ByteBuffer valueBname = asByteBuffer("valueB"); private static ByteBuffer valueCname = asByteBuffer("valueC"); private static ByteBuffer valueDname = asByteBuffer("valueD"); diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java index 66ef04ac0bc2..9396cd54346d 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftSpnegoHttpServer.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.thrift; import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; @@ -28,6 +30,8 @@ import java.nio.file.Paths; import java.security.Principal; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.Supplier; @@ -40,6 +44,8 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.IOError; +import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.SimpleKdcServerUtil; import org.apache.hadoop.security.authentication.util.KerberosName; @@ -87,6 +93,7 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServerBase { private static File clientKeytab; private static String clientPrincipal; + private static String clientPrincipal2; private static String serverPrincipal; private static String spnegoServerPrincipal; @@ -116,8 +123,9 @@ public static void beforeAll() throws Exception { assertTrue(keytabDir.mkdirs()); clientPrincipal = "client@" + kdc.getKdcConfig().getKdcRealm(); + clientPrincipal2 = "client2@" + kdc.getKdcConfig().getKdcRealm(); clientKeytab = new File(keytabDir, clientPrincipal + ".keytab"); - kdc.createAndExportPrincipals(clientKeytab, clientPrincipal); + kdc.createAndExportPrincipals(clientKeytab, clientPrincipal, clientPrincipal2); String hostname = InetAddress.getLoopbackAddress().getHostName(); serverPrincipal = "hbase/" + hostname + "@" + kdc.getKdcConfig().getKdcRealm(); @@ -167,11 +175,32 @@ public void testRunThriftServerWithHeaderBufferLength() throws Exception { super.testRunThriftServerWithHeaderBufferLength(); } + private void testScanWithDifferentClients(Hbase.Client client, Hbase.Client client2) + throws Exception { + List mutations = new ArrayList<>(1); + mutations + .add(new Mutation(false, TestThriftServer.columnAAname, TestThriftServer.valueAname, true)); + client.mutateRow(TestThriftServer.tableAname, TestThriftServer.rowAname, mutations, + Collections.emptyMap()); + + int id = client.scannerOpen(TestThriftServer.tableAname, ByteBuffer.allocate(0), + Collections.emptyList(), Collections.emptyMap()); + + assertThrows(IOError.class, () -> client2.scannerGet(id)).printStackTrace(); + assertThrows(IOError.class, () -> client2.scannerClose(id)).printStackTrace(); + + assertEquals(1, client.scannerGet(id).size()); + assertEquals(0, client.scannerGet(id).size()); + client.scannerClose(id); + } + @Override protected void talkToThriftServer(String url, int customHeaderSize) throws Exception { // Close httpClient and THttpClient automatically on any failures - try (CloseableHttpClient httpClient = createHttpClient(); - THttpClient tHttpClient = new THttpClient(url, httpClient)) { + try (CloseableHttpClient httpClient = createHttpClient(clientPrincipal); + THttpClient tHttpClient = new THttpClient(url, httpClient); + CloseableHttpClient httpClient2 = createHttpClient(clientPrincipal2); + THttpClient tHttpClient2 = new THttpClient(url, httpClient2)) { tHttpClient.open(); if (customHeaderSize > 0) { StringBuilder sb = new StringBuilder(); @@ -194,11 +223,16 @@ protected void talkToThriftServer(String url, int customHeaderSize) throws Excep } TestThriftServer.createTestTables(client); TestThriftServer.checkTableList(client); + + TProtocol prop2 = new TBinaryProtocol(tHttpClient2); + Hbase.Client client2 = new Hbase.Client(prop2); + testScanWithDifferentClients(client, client2); + TestThriftServer.dropTestTables(client); } } - private CloseableHttpClient createHttpClient() throws Exception { + private CloseableHttpClient createHttpClient(String clientPrincipal) throws Exception { final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(clientPrincipal, clientKeytab); final Set clientPrincipals = clientSubject.getPrincipals(); // Make sure the subject has a principal diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java index 3e4f24d8b062..ee86b9cbb93e 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandlerWithLabels.java @@ -71,8 +71,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; @@ -80,8 +78,6 @@ @Tag(MediumTests.TAG) public class TestThriftHBaseServiceHandlerWithLabels { - private static final Logger LOG = - LoggerFactory.getLogger(TestThriftHBaseServiceHandlerWithLabels.class); private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); // Static names for tables, columns, rows, and values