Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import java.io.IOException;
import java.util.Base64;
import java.util.Objects;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,18 +55,31 @@ public class ScannerInstanceResource extends ResourceBase {

ResultGenerator generator = null;
String id = null;
String owner;
int batch = 1;

public ScannerInstanceResource() throws IOException {
}

public ScannerInstanceResource(String table, String id, ResultGenerator generator, int batch)
public ScannerInstanceResource(String id, String owner, ResultGenerator generator, int batch)
throws IOException {
this.id = id;
this.owner = owner;
this.generator = generator;
this.batch = batch;
}

private Response checkOwner() {
ConnectionCache connCache = RESTServlet.getInstance().getConnectionCache();
if (!Objects.equals(connCache.getEffectiveUser(), owner)) {
LOG.warn("User {} is trying to access scanner {} which belongs to user {}",
connCache.getEffectiveUser(), id, owner);
return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT)
.entity("Not allowed" + CRLF).build();
}
return null;
}

@GET
@Produces({ MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, MIMETYPE_PROTOBUF_IETF })
public Response get(final @Context UriInfo uriInfo, @QueryParam("n") int maxRows,
Expand All @@ -77,10 +92,13 @@ public Response get(final @Context UriInfo uriInfo, @QueryParam("n") int maxRows
servlet.getMetrics().incrementFailedGetRequests(1);
return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
.entity("Not found" + CRLF).build();
} else {
// Updated the connection access time for each client next() call
RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime();
}
Response checkResp = checkOwner();
if (checkResp != null) {
return checkResp;
}
// Updated the connection access time for each client next() call
RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime();
CellSetModel model = new CellSetModel();
RowModel rowModel = null;
byte[] rowKeyArray = null;
Expand Down Expand Up @@ -159,13 +177,18 @@ public Response getBinary(final @Context UriInfo uriInfo) {
if (LOG.isTraceEnabled()) {
LOG.trace("GET " + uriInfo.getAbsolutePath() + " as " + MIMETYPE_BINARY);
}

servlet.getMetrics().incrementRequests(1);
if (generator == null) {
servlet.getMetrics().incrementFailedGetRequests(1);
return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
.entity("Not found" + CRLF).build();
}
Response checkResp = checkOwner();
if (checkResp != null) {
return checkResp;
}
try {
if (generator == null) {
servlet.getMetrics().incrementFailedGetRequests(1);
return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
.entity("Not found" + CRLF).build();
}
Cell value = generator.next();
if (value == null) {
if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -199,6 +222,7 @@ public Response delete(final @Context UriInfo uriInfo) {
if (LOG.isTraceEnabled()) {
LOG.trace("DELETE " + uriInfo.getAbsolutePath());
}

servlet.getMetrics().incrementRequests(1);
if (servlet.isReadOnly()) {
return Response.status(Response.Status.FORBIDDEN).type(MIMETYPE_TEXT)
Expand All @@ -209,6 +233,10 @@ public Response delete(final @Context UriInfo uriInfo) {
return Response.status(Response.Status.NOT_FOUND).type(MIMETYPE_TEXT)
.entity("Not found" + CRLF).build();
}
Response checkResp = checkOwner();
if (checkResp != null) {
return checkResp;
}
if (ScannerResource.delete(id)) {
servlet.getMetrics().incrementSucessfulDeleteRequests(1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ Response update(final ScannerModel model, final boolean replace, final UriInfo u
model.getCaching(), model.getCacheBlocks(), model.getLimit(), model.isIncludeStartRow(),
model.isIncludeStopRow());
String id = gen.getID();
ScannerInstanceResource instance =
new ScannerInstanceResource(tableName, id, gen, model.getBatch());
ScannerInstanceResource instance = new ScannerInstanceResource(id,
RESTServlet.getInstance().getConnectionCache().getEffectiveUser(), gen, model.getBatch());
scanners.put(id, instance);
if (LOG.isTraceEnabled()) {
LOG.trace("new scanner: " + id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.rest.model.ScannerModel;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.AccessControlConstants;
Expand All @@ -70,7 +71,9 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
Expand Down Expand Up @@ -110,6 +113,7 @@ public class TestSecureRESTServer {

private static final String HOSTNAME = "localhost";
private static final String CLIENT_PRINCIPAL = "client";
private static final String CLIENT_PRINCIPAL2 = "client2";
private static final String WHEEL_PRINCIPAL = "wheel";
// The principal for accepting SPNEGO authn'ed requests (*must* be HTTP/fqdn)
private static final String SPNEGO_SERVICE_PRINCIPAL = "HTTP/" + HOSTNAME;
Expand Down Expand Up @@ -156,7 +160,7 @@ public static void setupServer() throws Exception {
* Start KDC
*/
KDC = TEST_UTIL.setupMiniKdc(serviceKeytab);
KDC.createPrincipal(clientKeytab, CLIENT_PRINCIPAL);
KDC.createPrincipal(clientKeytab, CLIENT_PRINCIPAL, CLIENT_PRINCIPAL2);
KDC.createPrincipal(wheelKeytab, WHEEL_PRINCIPAL);
KDC.createPrincipal(serviceKeytab, SERVICE_PRINCIPAL);
// REST server's keytab contains keys for both principals REST uses
Expand Down Expand Up @@ -189,7 +193,7 @@ public static void setupServer() throws Exception {
updateKerberosConfiguration(conf, REST_SERVER_PRINCIPAL, SPNEGO_SERVICE_PRINCIPAL,
restServerKeytab);

// Start HDFS
// Start HBase
TEST_UTIL.startMiniCluster(StartTestingClusterOption.builder().numMasters(1).numRegionServers(1)
.numZkServers(1).build());

Expand Down Expand Up @@ -330,10 +334,10 @@ public Void run() throws Exception {
});
}

public void testProxy(String extraArgs, String PRINCIPAL, File keytab, int responseCode)
private void testProxy(String extraArgs, String PRINCIPAL, File keytab, int responseCode)
throws Exception {
UserGroupInformation superuser = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(SERVICE_PRINCIPAL, serviceKeytab.getAbsolutePath());
UserGroupInformation.loginUserFromKeytabAndReturnUGI(SERVICE_PRINCIPAL,
serviceKeytab.getAbsolutePath());
final TableName table = TableName.valueOf("publicTable");

// Read that row as the client
Expand Down Expand Up @@ -417,6 +421,80 @@ public Void run() throws Exception {
});
}

@Test
public void testScanWithDifferentClients() throws Exception {
Pair<CloseableHttpClient, HttpClientContext> pair = getClient();
CloseableHttpClient client = pair.getFirst();
HttpClientContext context = pair.getSecond();

UserGroupInformation ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, clientKeytab.getAbsolutePath());

ObjectMapper mapper = new JacksonJaxbJsonProvider().locateMapper(ScannerModel.class,
MediaType.APPLICATION_JSON_TYPE);
TableName table = TableName.valueOf("publicTable");
ScannerModel model = new ScannerModel();
StringEntity entity =
new StringEntity(mapper.writeValueAsString(model), ContentType.APPLICATION_JSON);
HttpPost post =
new HttpPost("http://localhost:" + REST_TEST.getServletPort() + "/" + table + "/scanner");
post.setEntity(entity);
String scannerURI = ugi.doAs(new PrivilegedExceptionAction<String>() {

@Override
public String run() throws Exception {
try (CloseableHttpResponse response = client.execute(post, context)) {
final int statusCode = response.getStatusLine().getStatusCode();
assertEquals(HttpURLConnection.HTTP_CREATED, statusCode);
return response.getFirstHeader("Location").getValue();
}
}
});

Pair<CloseableHttpClient, HttpClientContext> pair2 = getClient();
CloseableHttpClient client2 = pair2.getFirst();
HttpClientContext context2 = pair2.getSecond();

UserGroupInformation ugi2 = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL2, clientKeytab.getAbsolutePath());
ugi2.doAs(new PrivilegedExceptionAction<Void>() {

@Override
public Void run() throws Exception {
HttpGet get = new HttpGet(scannerURI + "?n=1");
try (CloseableHttpResponse response = client2.execute(get, context2)) {
final int statusCode = response.getStatusLine().getStatusCode();
assertEquals(HttpURLConnection.HTTP_FORBIDDEN, statusCode);
}
HttpDelete delete = new HttpDelete(scannerURI);
try (CloseableHttpResponse response = client2.execute(delete, context2)) {
final int statusCode = response.getStatusLine().getStatusCode();
assertEquals(HttpURLConnection.HTTP_FORBIDDEN, statusCode);
}
return null;
}
});

ugi.doAs(new PrivilegedExceptionAction<Void>() {

@Override
public Void run() throws Exception {
HttpGet get = new HttpGet(scannerURI + "?n=1");
try (CloseableHttpResponse response = client.execute(get, context)) {
final int statusCode = response.getStatusLine().getStatusCode();
assertEquals(HttpURLConnection.HTTP_OK, statusCode);
}
HttpDelete delete = new HttpDelete(scannerURI);
try (CloseableHttpResponse response = client.execute(delete, context)) {
final int statusCode = response.getStatusLine().getStatusCode();
assertEquals(HttpURLConnection.HTTP_OK, statusCode);
}
return null;
}
});

}

private Pair<CloseableHttpClient, HttpClientContext> getClient() {
HttpClientConnectionManager pool = new PoolingHttpClientConnectionManager();
HttpHost host = new HttpHost("localhost", REST_TEST.getServletPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,80 @@
*/
package org.apache.hadoop.hbase.thrift;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.RemovalCause;

/**
* abstract class for HBase handler providing a Connection cache and get table/admin method
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public abstract class HBaseServiceHandler {

private static final Logger LOG = LoggerFactory.getLogger(HBaseServiceHandler.class);

public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";

protected Configuration conf;

protected final ConnectionCache connectionCache;

protected static final class ResultScannerWrapper {
public final ResultScanner scanner;
public final boolean sortColumns;
public final String owner;

public ResultScannerWrapper(ResultScanner scanner, boolean sortColumns, String owner) {
this.scanner = scanner;
this.sortColumns = sortColumns;
this.owner = owner;
}
}

private final AtomicInteger nextScannerId = new AtomicInteger(0);
private final Cache<Integer, ResultScannerWrapper> scannerMap;
private final KeyLocker<Integer> removeScannerLock = new KeyLocker<>();

public HBaseServiceHandler(final Configuration c, final UserProvider userProvider)
throws IOException {
this.conf = c;
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(conf, userProvider, cleanInterval, maxIdleTime);
long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
scannerMap = CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
.removalListener(notification -> {
// do not close the scanner if it is removed manually, we will either add it back or close
// it manually.
if (notification.getCause() != RemovalCause.EXPLICIT) {
((ResultScannerWrapper) notification.getValue()).scanner.close();
}
}).build();
}

protected ThriftMetrics metrics = null;
Expand All @@ -58,6 +103,51 @@ public void setEffectiveUser(String effectiveUser) {
connectionCache.setEffectiveUser(effectiveUser);
}

/**
* Assigns a unique ID to the scanner and adds the mapping to an internal HashMap.
* @param scanner to add
* @return Id for this Scanner
*/
protected int addScanner(ResultScanner scanner, boolean sortColumns) {
int id = nextScannerId.getAndIncrement();
ResultScannerWrapper wrapper =
new ResultScannerWrapper(scanner, sortColumns, connectionCache.getEffectiveUser());
scannerMap.put(id, wrapper);
return id;
}

/**
* Add the given scanner back to scanner map.
* <p>
* When scanning, we need to remove the scanner from scanner map to prevent expiration during
* scanning.
*/
protected void addScannerBack(int id, ResultScannerWrapper wrapper) {
scannerMap.put(id, wrapper);
}

/**
* Removes the scanner associated with the specified ID from the internal HashMap.
* @param id of the Scanner to remove
* @throws AccessDeniedException if the scanner is not belong to the current user
*/
protected ResultScannerWrapper removeScanner(int id) throws IOException {
Lock lock = removeScannerLock.acquireLock(id);
try {
ResultScannerWrapper wrapper = scannerMap.getIfPresent(id);
if (wrapper != null && !Objects.equals(connectionCache.getEffectiveUser(), wrapper.owner)) {
LOG.warn("User {} is trying to access scanner id = {} where owner = {}",
connectionCache.getEffectiveUser(), id, wrapper.owner);
throw new AccessDeniedException(
"User " + connectionCache.getEffectiveUser() + " is not allowed to access scanner " + id);
}
scannerMap.invalidate(id);
return wrapper;
} finally {
lock.unlock();
}
}

/**
* Obtain HBaseAdmin. Creates the instance if it is not already created.
*/
Expand Down
Loading