# Tuning Spark Back Pressure by Simulation

Spark back pressure, enabled by setting spark.streaming.backpressure.enabled=true, dynamically resizes batches so as to avoid queue build-up. It is implemented using a Proportional Integral Derivative (PID) algorithm. This algorithm has some interesting properties, including no guarantee of a stable fixed point. This can manifest itself not just in transient overshoot, but in a batch size oscillating around a (potentially optimal) constant throughput. The overshoot incurs latency; the undershoot costs throughput. Catastrophic overshoot leading to OOM is possible in degenerate circumstances (you need to choose the parameters quite deviously to make this happen). Having witnessed undershoot and slow recovery in production streaming jobs, I decided to investigate further by testing the algorithm with a simulator. This is simple to do with JUnit: create an instance of PIDRateEstimator and call its methods within a simulation loop.

#### PID Controllers

The PID controller is a closed feedback loop on a single process variable, which it aims to stabilise (though it may not be able to) by minimising three error terms: the present error (with respect to the last measurement), the accumulated or integral error, and the rate at which the error is changing, the derivative error. The control signal is a weighted sum of these three terms.

$u(t) = K_{p}e(t) + K_{i}\int_{0}^{t} e(\tau) d\tau + K_{d}\frac{d}{dt}e(t)$
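In discrete time, with one update per sampling interval, the control law above can be sketched as a simple accumulator. This is a generic textbook discretisation for illustration, not Spark's PIDRateEstimator, which defines its errors in terms of rates and delays and clamps its output:

```java
// Generic discrete PID step with a fixed sampling interval dt (seconds).
// Illustrative sketch only; not Spark's PIDRateEstimator.
public final class PidController {
    private final double kp, ki, kd, dt;
    private double integral = 0.0;
    private double lastError = 0.0;

    public PidController(double kp, double ki, double kd, double dt) {
        this.kp = kp;
        this.ki = ki;
        this.kd = kd;
        this.dt = dt;
    }

    // error = setpoint - measurement; returns the control signal u(t)
    public double update(double error) {
        integral += error * dt;                       // accumulated (integral) error
        double derivative = (error - lastError) / dt; // rate of change of the error
        lastError = error;
        return kp * error + ki * integral + kd * derivative;
    }
}
```

With $K_{i} = K_{d} = 0$ the controller reacts only to the present error; the integral term is what allows a persistent bias to be eliminated, at the cost of possible oscillation.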

The $K_{p}$ term aims to react to any immediate error, the $K_{i}$ term dampens change in the process variable, and the $K_{d}$ term amplifies any trend so as to react to underlying changes quickly. Spark allows these parameters to be tuned by setting the following configuration properties:

• $K_{p}$ spark.streaming.backpressure.pid.proportional (defaults to 1, non-negative) – weight for the contribution of the difference between the current rate and the rate of the last batch to the overall control signal.
• $K_{i}$ spark.streaming.backpressure.pid.integral (defaults to 0.2, non-negative) – weight for the contribution of the accumulation of the proportional error to the overall control signal.
• $K_{d}$ spark.streaming.backpressure.pid.derived (defaults to zero, non-negative) – weight for the contribution of the change of the proportional error to the overall control signal.

The default values are typically quite good. You can set an additional parameter not present in classical PID controllers: spark.streaming.backpressure.pid.minRate, which defaults to 100 (and must be positive). This is definitely a variable to watch if you are using back pressure and know up front that you expect to process messages at a rate much higher than 100: raising it limits undershoot and improves stability.
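Collected together, a back pressure configuration might look like this in spark-defaults.conf. The minRate and maxRate values below are illustrative, for a job expected to sustain well over 100 messages per second:

```properties
spark.streaming.backpressure.enabled            true
spark.streaming.backpressure.pid.proportional   1.0
spark.streaming.backpressure.pid.integral       0.2
spark.streaming.backpressure.pid.derived        0.0
spark.streaming.backpressure.pid.minRate        5000
spark.streaming.receiver.maxRate                10000
```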

#### Constant Throughput Simulator

Consider the case where all else is held equal and a job executes at a constant rate, with batches scheduled at a constant frequency. The size of the batch is allowed to vary according to back pressure, but the processing rate is constant. These assumptions ignore the amortisation of fixed overhead in larger batches (economies of scale) and any extrinsic fluctuation in rate. The goal of the algorithm is to find, in a reasonable number of iterations, a stable fixed point for the batch size, close to the constant rate implied by the frequency. Depending on your batch interval, this may be a long time in real terms. The test is useful in two ways:

1. Evaluating the entire parameter space of the algorithm: finding toxic combinations of parameters and initial throughputs which lead to divergence, oscillation or sluggish convergence.
2. Optimising parameters for a use case: for a given expected throughput rate, rapidly simulate the job under backpressure to optimise the choice of parameters.

Running a simulator is faster than running Spark jobs with production data repeatedly so can be useful for tuning the parameters. The suggested settings need to be validated afterwards by testing on an actual cluster.

1. Fix a batch interval and a target throughput
2. Choose a range of possible values for $K_{p}$, $K_{i}$ and $K_{d}$ between zero and two
3. Choose a range of initial batch sizes (as in spark.streaming.receiver.maxRate), above and below the size implied by the target throughput and frequency
4. Choose some minimum batch sizes to investigate the effect on damping.
5. Run a simulation for each element of the cartesian product of these parameters.
6. Verify the simulation by running a Spark streaming job using the recommended parameters.

From 4000 test cases, I found over 500 conditions in which the algorithm oscillated violently out of control, despite the constant processing rate, typically when the integral and derivative components were large.

In a real Spark Streaming job this would result in an OOM. When I get time I will try to find a mathematical justification, to verify that this is not a bug in the simulator.

There are also interesting cases where the algorithm is stable and convergent. The first is catching up from a conservative estimate of throughput, which across a range of parameters does not seem to overshoot.

When the algorithm needs to slow down, it always undershoots before converging; this cost, like the latency incurred, is amortised over time. However, if throughput is preferred it is better to overestimate the initial batch; if latency is preferred it is better to underestimate.

#### Simulator Code


@RunWith(Parameterized.class)
public class TestPIDController {

@Parameterized.Parameters(name = "Aim for {5}/s starting from initial batch={6} with interval {0}s, p={1}, i={2}, d={3}")
public static Object[][] generatedParams() {

double requiredThroughput = 5000;
long maxlatency = 1L;

double[] proportionals = new double[10];
double[] integrals = new double[10];
double[] derivatives = new double[10];
double v = 0.0;
for(int j = 0; j < proportionals.length; ++j) {
proportionals[j] = v;
integrals[j] = v;
derivatives[j] = v;
v += 2D/proportionals.length;
}

double[] initialBatchSizes = new double[] { 2500, 4500, 5500, 7500 };
double[] minBatchSizes = new double[] { 100, 500, 1000, 2500, 4500};
int numTestCases = proportionals.length * integrals.length * derivatives.length * initialBatchSizes.length;
Object[][] cases = new Object[numTestCases][];
for(int caseNum = 0; caseNum < numTestCases; ++caseNum) {
cases[caseNum] = new Object[7];
cases[caseNum][0] = maxlatency;
cases[caseNum][5] = requiredThroughput;
}
for(int caseNum = 0; caseNum < numTestCases; ++caseNum) {
cases[caseNum][1] = proportionals[caseNum % proportionals.length];
}
Arrays.sort(cases, (a, b) -> (int)(20 * (double)a[1] - 20 * (double)b[1]));
for(int caseNum = 0; caseNum < numTestCases; ++caseNum) {
cases[caseNum][2] = integrals[caseNum % integrals.length];
}
Arrays.sort(cases, (a, b) -> (int)(20 * (double)a[2] - 20 * (double)b[2]));
for(int caseNum = 0; caseNum < numTestCases; ++caseNum) {
cases[caseNum][3] = derivatives[caseNum % derivatives.length];
}
Arrays.sort(cases, (a, b) -> (int)(20 * (double)a[3] - 20 * (double)b[3]));
for(int caseNum = 0; caseNum < numTestCases; ++caseNum) {
cases[caseNum][4] = minBatchSizes[caseNum % minBatchSizes.length];
}
Arrays.sort(cases, (a, b) -> (int)((double)a[4] - (double)b[4]));
for(int caseNum = 0; caseNum < numTestCases; ++caseNum) {
cases[caseNum][6] = initialBatchSizes[caseNum % initialBatchSizes.length];
}
Arrays.sort(cases, (a, b) -> (int)((double)a[6] - (double)b[6]));
return cases;
}

public TestPIDController(long batchSizeSeconds,
double proportional,
double summation,
double derivative,
double minRate,
double constantProcessingRate,
double initialBatchSize) {
this.expectedBatchDurationSeconds = batchSizeSeconds;
this.proportional = proportional;
this.summation = summation;
this.derivative = derivative;
this.minRate = minRate;
this.constantProcessingRatePerSecond = constantProcessingRate;
this.initialBatchSize = initialBatchSize;
}

private final long expectedBatchDurationSeconds;
private final double proportional;
private final double summation;
private final double derivative;
private final double minRate;
private final double constantProcessingRatePerSecond;
private final double initialBatchSize;

@Test
public void ensureRapidConvergence() {
System.out.println("Time,Scheduling Delay,Processing Delay,Throughput,Batch Size");
long schedulingDelayMillis = 0;
double batchSize = initialBatchSize;
long expectedBatchDurationMillis = 1000 * expectedBatchDurationSeconds;
double batchTimeSeconds;
long batchTimeMillis;
long timeMillis = 0;
PIDRateEstimator estimator = new PIDRateEstimator(expectedBatchDurationMillis, proportional, summation, derivative, minRate);
Option<Object> newSize;
double numProcessed = 0;
double throughput = Double.NaN;

for(int i = 0; i < 100; ++i) {
// sanity check: fail fast if simulated time runs away
if(timeMillis > 200 * expectedBatchDurationMillis)
Assert.fail();
numProcessed += batchSize;
batchTimeSeconds = getTimeToCompleteSeconds(batchSize);
batchTimeMillis = (long)Math.ceil(batchTimeSeconds * 1000);
long pauseTimeMillis = schedulingDelayMillis == 0 && batchTimeSeconds <= expectedBatchDurationSeconds ? expectedBatchDurationMillis - batchTimeMillis : 0;
timeMillis += batchTimeMillis + pauseTimeMillis;
newSize = estimator.compute(timeMillis, (long)batchSize, batchTimeMillis, schedulingDelayMillis);
if(newSize.isDefined()) {
batchSize = (double)newSize.get();
}
long processingDelay = batchTimeMillis - expectedBatchDurationMillis;
schedulingDelayMillis += processingDelay;
if(schedulingDelayMillis < 0) schedulingDelayMillis = 0;
throughput = numProcessed/timeMillis*1000;
System.out.println(String.format("%d,%d,%d,%f,%d", timeMillis,schedulingDelayMillis, processingDelay, throughput,(long)batchSize));
}

double percentageError = 100 * Math.abs((constantProcessingRatePerSecond - throughput) /
constantProcessingRatePerSecond);
Assert.assertTrue(String.format("Effective rate %f more than %f away from target throughput %f",
throughput, percentageError, constantProcessingRatePerSecond), percentageError < 10);
}

private double getTimeToCompleteSeconds(double batchSize) {
return batchSize / (constantProcessingRatePerSecond);
}
}


#### Response to Instantaneous and Sustained Shocks

There is an important feature of a control algorithm I haven’t simulated yet: how the algorithm responds to random extrinsic shocks, both instantaneous and sustained. An instantaneous shock should not move the process variable away from its fixed point for long. Under a sustained shock, the algorithm should move the process variable to a new stable fixed point.

# Concise Binary Object Representation

Concise Binary Object Representation (CBOR), defined by RFC 7049, is a binary, typed, self-describing serialisation format. In contrast with JSON, it is binary and distinguishes properly between different sizes of primitive type. In contrast with Avro and Protobuf, it is self-describing and can be used without a schema. As with all binary formats, where data is overwhelmingly numeric, both parsing time and storage size are far superior to JSON. For textual data, payloads are also typically smaller with CBOR.

#### The Type Byte

The first byte of every value denotes a type. The most significant three bits denote the major type (for instance byte string, unsigned integer) and the least significant five bits denote a minor type (float32, int64 and so on). This is useful for type inference and validation. For instance, if you wanted to save a BLOB into HBase and map that BLOB to a Spark SQL Row, you could map the first byte of each field value to a Spark DataType. If you adopt a schema-on-read approach, you can validate the supplied schema against the type encoding in the CBOR encoded blobs. The major types and some interesting minor types are enumerated below, but see the definitions for more information.

• 0: unsigned integers
• 1: negative integers
• 2: byte strings (indefinite-length variants are terminated by the break byte, 7_31)
• 3: UTF-8 text (as above)
• 4: arrays (as above)
• 5: maps (as above)
• 6: tags (0: timestamp strings, 1: unix epoch longs, 2: big integers…)
• 7: floating-point numbers and simple ubiquitous values (20: False, 21: True, 22: Null, 23: Undefined, 26: float, 27: double, 31: stop byte for indefinite-length fields (maps, arrays etc.))
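The split can be seen with a couple of bit operations. A minimal sketch; 0xFB below is the header byte for an IEEE 754 double (major type 7, minor type 27):

```java
public class CborTypeByte {

    // Major type: most significant three bits of the initial byte
    static int majorType(int initialByte) {
        return (initialByte & 0xFF) >>> 5;
    }

    // Minor type / additional information: least significant five bits
    static int minorType(int initialByte) {
        return initialByte & 0x1F;
    }

    public static void main(String[] args) {
        System.out.println(majorType(0xFB)); // 7  (simple/floating-point)
        System.out.println(minorType(0xFB)); // 27 (double)
        System.out.println(majorType(0x0A)); // 0  (unsigned integer)
        System.out.println(minorType(0x0A)); // 10 (the value itself)
    }
}
```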

#### Usage

In Java, CBOR is supported by Jackson and can be used as if it is JSON. It is available in


<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>2.8.4</version>
</dependency>


Wherever you would use an ObjectMapper to work with JSON, just use an ObjectMapper constructed with a CBORFactory instead of the default JsonFactory.


ObjectMapper mapper = new ObjectMapper(new CBORFactory());


Jackson integrates CBOR into JAX-RS seamlessly via


<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-cbor-provider</artifactId>
<version>2.8.4</version>
</dependency>


If a JacksonCBORProvider is registered in a Jersey ResourceConfig (a one-liner), then any resource method annotated as @Produces("application/cbor"), or any HTTP request with the Accept header set to “application/cbor” will automatically serialise the response as CBOR.

Jackson deviates from the specification slightly by promoting floats to doubles (despite parsing floats properly, it post-processes them as doubles); as of 2.8.6 it recognises floats properly. It also distinguishes between longs and ints correctly, so long as CBORGenerator.Feature.WRITE_MINIMAL_INTS is disabled on the writer.

In JavaScript, cbor.js can be used to deserialise CBOR, though the loss of the browser’s native parsing is a concern. It would be interesting to see benchmarks for typical workloads to weigh the cost of JavaScript parsing against the benefits of reduced server-side generation cost and reduced message size. Again, for large quantities of numeric data this is more likely to be worthwhile than for text.

#### Comparison with JSON – Message Size

Textual data is slightly smaller when represented as CBOR as opposed to JSON. Given the interoperability that comes with JSON, it is unlikely to be worth using CBOR over JSON for reduced message size.

Large arrays of doubles are a lot smaller in CBOR. Interestingly, large arrays of small integers may actually be smaller as text than as binary: it takes only two bytes to represent 10 as text, whereas a fixed-width binary int32 takes four bytes (this is what Jackson writes when WRITE_MINIMAL_INTS is disabled; CBOR itself can pack integers below 24 into a single byte). Outside the range -99 to 999 this is no longer true, but it might be a worthwhile economy for large quantities of survey results.
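The break-even points follow from how CBOR sizes unsigned integers when the minimal encoding is used (per RFC 7049 §2.1, values below 24 pack into the type byte itself). A sketch of the size calculation:

```java
public class CborIntSize {

    // Encoded size in bytes of a CBOR unsigned integer (RFC 7049 §2.1),
    // assuming the minimal (canonical) encoding is used.
    static int encodedSize(long value) {
        if (value < 24) return 1;          // packed into the initial byte
        if (value < 256) return 2;         // initial byte + uint8
        if (value < 65536) return 3;       // initial byte + uint16
        if (value < 4294967296L) return 5; // initial byte + uint32
        return 9;                          // initial byte + uint64
    }

    public static void main(String[] args) {
        System.out.println(encodedSize(10));   // 1 byte, vs two characters of text
        System.out.println(encodedSize(1000)); // 3 bytes, vs four characters of text
    }
}
```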

JSON and CBOR message sizes for messages containing mostly textual, mostly integral and mostly floating point data are benchmarked at github. The output is as follows:

CBOR, Integers: size=15122B
JSON, Integers: size=6132B
CBOR, Doubles: size=27122B
JSON, Doubles: size=54621B
CBOR, Text: size=88229B
JSON, Text: size=116565B


#### Comparison with JSON – Read/Write Performance

Using Jackson to benchmark message sizes is uncontroversial, since Jackson implements each specification: the output, and therefore the size, should be the same whichever library produced the messages. Measuring read/write performance of a specification is more difficult, because only an implementation can be measured. It may well be the case that either JSON or CBOR can be read and written faster by another implementation than Jackson (though I expect Jackson is probably the fastest for either format). In any case, measuring Jackson CBOR against Jackson JSON seems fair. I benchmarked JSON vs CBOR reads and writes using the Jackson implementations of each format and JMH. The code for the benchmark is at github.

The results are as below. CBOR has significantly higher throughput for both read and write.

Benchmark Mode Count Score Error Units
readDoubleDataCBOR thrpt 5 12.230 ±1.490 ops/ms
readDoubleDataJSON thrpt 5 0.913 ±0.046 ops/ms
readIntDataCBOR thrpt 5 16.033 ±3.185 ops/ms
readIntDataJSON thrpt 5 8.400 ±1.219 ops/ms
readTextDataCBOR thrpt 5 15.736 ±3.729 ops/ms
readTextDataJSON thrpt 5 1.065 ±0.026 ops/ms
writeDoubleDataCBOR thrpt 5 26.222 ±0.779 ops/ms
writeDoubleDataJSON thrpt 5 0.930 ±0.022 ops/ms
writeIntDataCBOR thrpt 5 31.095 ±2.116 ops/ms
writeIntDataJSON thrpt 5 33.512 ±9.088 ops/ms
writeTextDataCBOR thrpt 5 31.338 ±4.519 ops/ms
writeTextDataJSON thrpt 5 1.509 ±0.245 ops/ms
readDoubleDataCBOR avgt 5 0.078 ±0.003 ms/op
readDoubleDataJSON avgt 5 1.123 ±0.108 ms/op
readIntDataCBOR avgt 5 0.062 ±0.008 ms/op
readIntDataJSON avgt 5 0.113 ±0.012 ms/op
readTextDataCBOR avgt 5 0.058 ±0.007 ms/op
readTextDataJSON avgt 5 0.913 ±0.240 ms/op
writeDoubleDataCBOR avgt 5 0.038 ±0.004 ms/op
writeDoubleDataJSON avgt 5 1.100 ±0.059 ms/op
writeIntDataCBOR avgt 5 0.031 ±0.002 ms/op
writeIntDataJSON avgt 5 0.029 ±0.004 ms/op
writeTextDataCBOR avgt 5 0.032 ±0.003 ms/op
writeTextDataJSON avgt 5 0.676 ±0.044 ms/op

The varying performance characteristics of media types/serialisation formats based on the predominant data type in a message make proper HTTP content negotiation important. It cannot be known in advance when writing a server application what the best content type is, and it should be left open to the client to decide.

Kerberos is the only real option for securing a Hadoop cluster. When deploying custom services into a cluster with Kerberos enabled, authentication quickly becomes a cross-cutting concern.

#### Kerberos Basics

First, a brief introduction to basic Kerberos mechanisms. In each realm there is a Key Distribution Centre (KDC) which issues different types of tickets. A KDC has two services: the Authentication Service (AS) and the Ticket Granting Service (TGS). There are two ticket types issued: Ticket Granting Tickets (TGT) and Service Tickets. Every KDC has a special user called krbtgt and a service key derived from the password for the krbtgt account; a TGT is actually just a service ticket for the krbtgt account, encrypted with the krbtgt service key. The KDC has all the symmetric keys for all services and users in its realm.

The end goal of a user requesting a Kerberised service is to present a service ticket, obtained from the Ticket Granting Service, to the Kerberised service in order to authenticate itself; but first it needs to get a TGT and a TGS session key from the Authentication Service. The sequence of requests and responses is as follows:

1. REQ_AS: The Client requests an initial TGT from the KDC’s Authentication Service by passing its user key (the user key comes from a keytab file or a username and password). The presented key is checked against the client’s symmetric key (the KDC has this encrypted with its own service key).
2. REP_AS: The Authentication Service issues a TGT which contains a TGS session key. The TGT has a lifetime and a renewable lifetime. Within the lifetime, the TGT can be cached and REQ_AS does not need to be made again: TGT lookup does not need to happen on each service request. The client creates an authenticator. The details of authenticator construction are too complicated to outline here. If a TGT is renewable, then only the TGS session key (not the TGT, which is large) need be refreshed periodically, and for each renewal the lifetime is reset.
3. REQ_TGS: Now the client has a TGS session key, it can request a service ticket from the TGS. The client must know the service name, and have a TGS session key and an authenticator. If no TGS session key is found, REQ_AS must be reissued. REQ_TGS must be performed for each service (if you need to access Kafka as well as HBase, you would need to do REQ_TGS twice, once for Kafka and once for HBase, though your TGT and TGS session key are good for both).
4. REP_TGS: The TGS has a local copy of the TGT associated with the TGS session key, which it checks against the authenticator and issues a service ticket. The service ticket is encrypted with the requested service’s symmetric key. Finally the user has a service ticket.
5. REQ_APP: The client sends the service ticket to the service. The service decrypts the service ticket (it is encrypted by the TGS with the service’s symmetric key.)
6. REP_APP (optional): The client can request mutual authentication, in which case the service will respond with another ticket.

#### UserGroupInformation API

Kerberos is quite simple in Java if you have access to JAAS. Some of the newer Hadoop ecosystem projects use it (e.g. Kafka, Solr), but if you are using HBase or HDFS you need to use UserGroupInformation. The only part of the Kerberos mechanism pertinent for most use cases is TGT acquisition; UserGroupInformation will handle the rest.
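For comparison, the projects that use JAAS directly are configured with a Krb5LoginModule entry rather than through UserGroupInformation. The principal and keytab path below are illustrative:

```
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/service.keytab"
  principal="service/host@EXAMPLE.COM";
};
```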

To get a TGT, you need a principal name and a keytab so UserGroupInformation can issue REQ_AS.


Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(conf);
// the principal and keytab path are illustrative
UserGroupInformation.loginUserFromKeytab("app/host@REALM", "/path/to/app.keytab");

If your keytab is good, this will give you a TGT and a TGS session key. HBase and HDFS components will get the created UserGroupInformation from the static method UserGroupInformation.getLoginUser(). In HADOOP-6656 a background task was added to perform TGS session key renewal. This will keep you logged in until the renewable lifetime is exhausted, so long as renewable tickets are enabled in your KDC. When the renewable lifetime is exhausted, your application will not be able to authenticate.

To get around that, you can use UserGroupInformation to perform REQ_AS on a scheduled basis, which keeps the login valid in perpetuity.


UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();

This can be done by a ScheduledExecutorService and wrapped up into a simple facade allowing you to login, logout, and execute actions as the logged in user, for as long as your service is up.


public class KerberosFacade implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(KerberosFacade.class);

private final ScheduledExecutorService refresher;
private final String keytab;
private final String user;
private final int requestTGTFrequencyHours;
private volatile ScheduledFuture<?> renewal;
private final AuthenticationFailureListener failureListener;

public KerberosFacade(ScheduledExecutorService refresher,
String keytab,
String user,
int requestTGTFrequencyHours,
AuthenticationFailureListener failureListener) {
this.refresher = refresher;
this.keytab = keytab;
this.user = user;
this.requestTGTFrequencyHours = requestTGTFrequencyHours;
this.failureListener = wrap(failureListener);
}

public void login() throws IOException {
// initial REQ_AS from the keytab
UserGroupInformation.loginUserFromKeytab(user, keytab);
// re-issue REQ_AS periodically so the login never expires
this.renewal = refresher.scheduleWithFixedDelay(() -> {
try {
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
} catch (Exception e) {
onFailure(e);
}
}, requestTGTFrequencyHours, requestTGTFrequencyHours, TimeUnit.HOURS);
}

public void logout() {
stopRefreshing();
}

public <T> T doAs(PrivilegedAction<T> action) {
try {
return UserGroupInformation.getCurrentUser()
.doAs(action);
} catch (IOException e) {
onFailure(e);
return null;
}
}

public <T> T doAs(PrivilegedExceptionAction<T> action) throws PrivilegedActionException {
try {
return UserGroupInformation.getCurrentUser()
.doAs(action);
} catch (InterruptedException | IOException e) {
onFailure(e);
return null;
}
}

@Override
public void close() throws IOException {
logout();
refresher.shutdownNow();
}

private void stopRefreshing() {
if (null != this.renewal) {
this.renewal.cancel(true);
}
}

protected void onFailure(Exception e) {
failureListener.handle(this, e);
}

private static AuthenticationFailureListener wrap(AuthenticationFailureListener listener) {
return (f, e) -> {
LOGGER.error("Authentication Failure for " + f.user, e);
if(null != listener) {
listener.handle(f, e);
}
};
}
}


# Co-locating Spark Partitions with HBase Regions

HBase scans can be accelerated if they start and stop on a single region server. IO costs can be reduced further if the scan is executed on the same machine as the region server. This article is about extending the Spark RDD abstraction to load an RDD from an HBase table so each partition is co-located with a region server. This pattern could be adopted to read data into Spark from other sharded data stores, whenever there is a metadata protocol available to dictate partitioning.

The strategy involves creating a custom implementation of the Spark class RDD, which understands how to create partitions from metadata about HBase regions. To read data from HBase, we want to execute a scan on a single region server, and we want to execute on the same machine as the region to minimise IO. Therefore we need the start key, stop key, and hostname for each region associated with each Spark partition.


public class HBasePartition implements Partition {

private final String regionHostname;
private final int partitionIndex;
private final byte[] start;
private final byte[] stop;

public HBasePartition(String regionHostname, int partitionIndex, byte[] start, byte[] stop) {
this.regionHostname = regionHostname;
this.partitionIndex = partitionIndex;
this.start = start;
this.stop = stop;
}

public String getRegionHostname() {
return regionHostname;
}

public byte[] getStart() {
return start;
}

public byte[] getStop() {
return stop;
}

@Override
public int index() {
return partitionIndex;
}
}


The HBase interface RegionLocator, which can be obtained from a Connection instance, can be used to build an array of HBasePartitions. It aids efficiency to check whether each region can be skipped entirely, which is possible whenever the supplied start and stop keys do not overlap with its extent.


public class HBasePartitioner implements Serializable {

public Partition[] getPartitions(byte[] table, byte[] start, byte[] stop) {
try(RegionLocator regionLocator = ConnectionFactory.createConnection().getRegionLocator(TableName.valueOf(table))) {
List<HRegionLocation> regionLocations = regionLocator.getAllRegionLocations();
int regionCount = regionLocations.size();
List<Partition> partitions = Lists.newArrayListWithExpectedSize(regionCount);
int partition = 0;
for(HRegionLocation regionLocation : regionLocations) {
HRegionInfo regionInfo = regionLocation.getRegionInfo();
byte[] regionStart = regionInfo.getStartKey();
byte[] regionStop = regionInfo.getEndKey();
if(!skipRegion(start, stop, regionStart, regionStop)) {
partitions.add(new HBasePartition(regionLocation.getHostname(),
partition++,
max(start, regionStart),
min(stop, regionStop)));
}
}
return partitions.toArray(new Partition[partition]);
}
catch (IOException e) {
throw new RuntimeException("Could not create HBase region partitions", e);
}
}

private static boolean skipRegion(byte[] scanStart, byte[] scanStop, byte[] regionStart, byte[] regionStop) {
// check scan starts before region stops, and that the scan stops before the region starts
return min(scanStart, regionStop) == regionStop || max(scanStop, regionStart) == regionStart;
}

private static byte[] min(byte[] left, byte[] right) {
if(left.length == 0) {
return left;
}
if(right.length == 0) {
return right;
}
return Bytes.compareTo(left, right) < 0 ? left : right;
}

private static byte[] max(byte[] left, byte[] right) {
if(left.length == 0) {
return right;
}
if(right.length == 0) {
return left;
}
return Bytes.compareTo(left, right) >= 0 ? left : right;
}
}


Finally, we can implement an RDD specialised for executing HBasePartitions. We want to exploit the ability to choose or influence where each partition is executed, so we need access to the Scala RDD method getPreferredLocations. This method is not available on JavaRDD, so we are forced to do some Scala conversions. The Scala/Java conversion work is quite tedious, but necessary when accessing low-level features from a Java-only project.


public class HBaseRDD<T> extends RDD<T> {

private static <T> ClassTag<T> createClassTag(Class<T> klass) {
return scala.reflect.ClassTag$.MODULE$.apply(klass);
}

private final HBasePartitioner partitioner;
private final String tableName;
private final byte[] startKey;
private final byte[] stopKey;
private final Function<Result, T> mapper;

public HBaseRDD(SparkContext sparkContext,
Class<T> klass,
HBasePartitioner partitioner,
String tableName,
byte[] startKey,
byte[] stopKey,
Function<Result, T> mapper) {
super(new EmptyRDD<>(sparkContext, createClassTag(klass)), createClassTag(klass));
this.partitioner = partitioner;
this.tableName = tableName;
this.startKey = startKey;
this.stopKey = stopKey;
this.mapper = mapper;
}

@Override
public Iterator<T> compute(Partition split, TaskContext context) {
HBasePartition partition = (HBasePartition)split;
try(Connection connection = ConnectionFactory.createConnection()) {
Scan scan = new Scan()
.setStartRow(partition.getStart())
.setStopRow(partition.getStop())
.setCacheBlocks(false);
Table table = connection.getTable(TableName.valueOf(tableName));
ResultScanner scanner = table.getScanner(scan);
return JavaConversions.asScalaIterator(
StreamSupport.stream(scanner.spliterator(), false).map(mapper).iterator()
);
}
catch (IOException e) {
throw new RuntimeException("Region scan failed", e);
}
}

@Override
public Seq<String> getPreferredLocations(Partition split) {
Set<String> locations = ImmutableSet.of(((HBasePartition)split).getRegionHostname());
return JavaConversions.asScalaSet(locations).toSeq();
}

@Override
public Partition[] getPartitions() {
return partitioner.getPartitions(Bytes.toBytes(tableName), startKey, stopKey);
}
}


As far as the interface of this class is concerned, it’s just normal Java, so it can be used from a more Java-centric Spark project, despite using some Scala APIs under the hood. We could achieve similar results with mapPartitions, but would have less control over partitioning and co-location.

# HBase Connection Management

I have built several web applications recently using Apache HBase as a backend data store. This article addresses some of the design concerns and approaches taken to manage HBase connections efficiently.

One of the first things I noticed about the HBase client API was how long it takes to create the connection. HBase connection creation is effectively Zookeeper based service discovery. Once the connection is created, the end client will know where all the region servers are, and which region server is serving which key space. All of this takes time, so it’s advisable not to connect too often.

At first I only created the connection once, when I started the web application. This is very simple and is fine for most use cases.


public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(configuration);
}


This approach is great unless there is the requirement to proxy your end user when querying HBase. If Apache Ranger is enabled on your HBase cluster, proxying your users allows it to apply user specific authorisation to the query, rather than to your web application service user. This poses a few constraints: the most relevant being that you need to create a connection per user so you can’t just connect when you start your application any more.

#### Proxy Users

I needed to proxy users and minimise connection creation, so I built a connection pool class which, given a user principal, creates a connection as the user. I used Guava’s loading cache to handle cache eviction and concurrency. Guava’s cache also has a very useful eviction listener, which allows the connection to be closed when evicted from the cache.

In order to get the user proxying working, the UserGroupInformation for the web application service principal itself is required (see here), and you need to have successfully authenticated your user (I used SPNego to do this). The Hadoop class UserProvider is then used to create a proxy user. Your web application service principal also needs to be configured as a proxying user in core-site.xml, which you can manage via tools like Ambari.


public class ConnectionPool implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPool.class);
private final Configuration configuration;
private final UserProvider userProvider;
private volatile boolean closed = false;
private final LoadingCache<String, Connection> cache;

public ConnectionPool(Configuration configuration, UserGroupInformation loginUser) {
this.configuration = configuration;
this.userProvider = UserProvider.instantiate(configuration);
this.cache = createCache();
}

public Connection getConnection(Principal principal) throws IOException {
return cache.getUnchecked(principal.getName());
}

@Override
public void close() throws IOException {
if(!closed) {
closed = true;
cache.invalidateAll();
cache.cleanUp();
}
}

private Connection createConnection(String userName) throws IOException {
// proxy the authenticated end user, with the service login user as the real user
UserGroupInformation proxy = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
return ConnectionFactory.createConnection(configuration, userProvider.create(proxy));
}

private LoadingCache<String, Connection> createCache() {
return CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.<String, Connection>removalListener(eviction -> {
Connection connection = eviction.getValue();
if(null != connection) {
try {
connection.close();
} catch (IOException e) {
LOGGER.error("Connection could not be closed for user=" + eviction.getKey(), e);
}
}
})
.maximumSize(100)