Exploring Dynamic Loading of Custom Filters in HBase

Any program that pulls data from a large HBase table, with terabytes of data spread over many nodes, needs to put some thought into how it retrieves that data. Failing to do so can mean waiting for, and then processing, a lot of unnecessary data, to the point of rendering the program (whether a single-threaded client or a MapReduce job) useless. HBase’s Scan API helps in this aspect: it configures the parameters of the data retrieval, including which columns to include, the start and stop rows, and batch sizing.
 
The Scan can also include a filter, which can be the most impactful way to improve the performance of scans of an HBase table. The filter is applied server-side and screens unwanted rows out of the result set. A well-designed filter is performant and minimizes the data scanned and returned to the client. Many useful Filters come standard with HBase, but sometimes the best solution is a custom Filter tuned to your HTable's schema.

Before your custom filter can be used, it has to be compiled, packaged in a jar, and deployed to all the regionservers. Restarting the HBase cluster is then necessary for the regionservers to pick the code up on their classpaths. Therein lies the problem: an HBase restart takes a non-trivial amount of time (although rolling restarts mitigate that somewhat), and the downtime is significant with a cluster as big as Flurry's.

This is where dynamic filters come in. The word 'dynamic' refers to the on-demand loading of these custom filters, just like loading external modules at runtime in a typical application server or web server. In this post, we will explore an approach that makes this possible in the cluster.

How It Works Now
Before we dive into the workings of dynamically loading filters, let's see how regular custom filters work.

Assuming the custom filter has already been deployed in a jar on the regionservers’ classpath, the client can simply use the filter, e.g. in a full table scan.
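Something like this, using the 0.92-era client API, with CustomFilter standing in for your own filter class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "mytable");
Scan scan = new Scan();
// CustomFilter is our own class, already deployed to the regionservers
scan.setFilter(new CustomFilter(Bytes.toBytes("someValue")));
ResultScanner scanner = table.getScanner(scan);
try {
    for (Result result : scanner) {
        // process each matching row
    }
} finally {
    scanner.close();
}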

This filter will have to be pushed to the regionservers to be run server-side. The sequence of how the custom filter gets replicated on the regionservers is as follows:

  1. The client serializes the Filter class name and its state into a byte stream by calling the CustomFilter's write(DataOutput) method.
  2. The client sends this byte stream to the regionservers that will be part of the scan.
  3. Server-side, the RegionScanner re-creates the Scan, including the filter, using the byte stream. Part of the stream is the filter’s class name, and the default classloader uses this fully qualified name to load the class using Class.forName().
  4. The filter is then instantiated using its empty constructor and configured with the rest of the filter byte stream via the filter's readFields(DataInput) method (see org.apache.hadoop.hbase.client.Scan for details). A sketch of this plumbing follows the list.
  5. The filter is then applied as a predicate on each row.
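
To make steps 1 and 4 concrete, here is a minimal sketch of that Writable plumbing, assuming the pre-0.96 API in which Filter extends Writable (the predicate logic itself is elided):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;

public class CustomFilter extends FilterBase {
    private byte[] value;

    public CustomFilter() {} // empty constructor, required for step 4

    public CustomFilter(byte[] value) {
        this.value = value;
    }

    // Step 1: the client serializes the filter's state
    @Override
    public void write(DataOutput out) throws IOException {
        Bytes.writeByteArray(out, value);
    }

    // Step 4: the regionserver restores that state from the byte stream
    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = Bytes.readByteArray(in);
    }

    // Step 5: overrides of filterRowKey/filterKeyValue would hold the
    // actual predicate logic
}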

[Diagram: (1) myfilter.jar containing our custom filter resides locally in the regionservers' classpath. (2) The custom filter is instantiated using the default ClassLoader.]

Once deployed, this definition of our custom filter is static. We can make ad hoc queries using any combination of the deployed filters, but if we need to add, extend, or replace a custom filter, the new code has to be added to the regionservers' classpath, and we have to wait for the next restart before those filters can be used.

There is a faster way.

Dynamic Loading
A key takeaway from the previous section is that Filters are Writables: they are instantiated by class name and then configured from a stream of bytes that only the Filter itself needs to understand. This makes the filter configuration highly customizable, and we can use that flexibility to our advantage.

Rather than create a regular Filter, we introduce a ProxyFilter, which acts as the extension point through which we can load our custom Filters on demand. At runtime, it loads the custom filter class itself.

Let’s look at some example code. To start with, there is just a small change to make on the client: the ProxyFilter now wraps the Filter or FilterList we want to use in our scan.
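
Continuing the earlier sketch (ProxyFilter's constructor signature here is our assumption, modeled on the example code [5]):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;

Scan scan = new Scan();
Filter customFilter = new CustomFilter(Bytes.toBytes("someValue"));
// only proxyfilter.jar has to be on the regionservers' classpath now
scan.setFilter(new ProxyFilter(customFilter));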

The ProxyFilter passes its own class name to be instantiated on the server side, and then serializes the wrapped custom filter after it.
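
Its write method might look like this sketch, again assuming the pre-0.96 Writable-based Filter API and using org.apache.hadoop.io.Text for string serialization:

// inside ProxyFilter, which holds the wrapped filter in a Filter field
@Override
public void write(DataOutput out) throws IOException {
    // record the wrapped filter's class name, then let the filter
    // append its own state to the stream
    Text.writeString(out, filter.getClass().getName());
    filter.write(out);
}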

On the regionserver, the ProxyFilter is initialized in the same way as described in the previous section. The byte stream that follows should minimally contain the filter's class name and its configuration byte array. In the ProxyFilter's readFields method, the relevant code recreates the wrapped filter.
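
Something like this, where filterModule and getFilterClass are our names for what the example code [5] provides:

// inside ProxyFilter
@Override
public void readFields(DataInput in) throws IOException {
    String className = Text.readString(in);
    try {
        Class<? extends Filter> filterClass = filterModule.getFilterClass(className);
        this.filter = filterClass.newInstance(); // empty constructor, as before
        this.filter.readFields(in);              // rest of the byte stream
    } catch (Exception e) {
        throw new IOException("Could not load filter class " + className, e);
    }
}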

This is very much like how the default Scan re-creates the Filter on the regionserver with one critical difference – it uses a filterModule object to obtain the Class definition of the custom filter. This module retrieves the custom filter Class and returns it to ProxyFilter for instantiation and configuration.

There can be different strategies for retrieving the custom filter class. Our example code copies the jar file from the Hadoop filesystem to a local directory and delegates the loading of the Filter classes from this jar to a custom classloader [3].
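
A rough sketch of that fetch, assuming a hypothetical HdfsJarModule class (the name appears again below) and the HDFS path property configured next:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsJarModule {
    private final Configuration conf;
    // the local scratch directory is an arbitrary choice for this sketch
    private final File localDir =
        new File(System.getProperty("java.io.tmpdir"), "hbase-filters");

    public HdfsJarModule(Configuration conf) {
        this.conf = conf;
    }

    // copy the filter jar out of HDFS onto the local filesystem
    File fetchJar() throws IOException {
        Path hdfsJar = new Path(conf.get("flurry.hdfs.filter.jar"));
        FileSystem fs = hdfsJar.getFileSystem(conf);
        File localJar = new File(localDir, hdfsJar.getName());
        fs.copyToLocalFile(hdfsJar, new Path(localJar.getAbsolutePath()));
        return localJar;
    }
}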

To configure where the module looks for the filter jar in HDFS, add the following property to hbase-site.xml:

<property>
  <name>flurry.hdfs.filter.jar</name>
  <value>/flurry/filters.jar</value>
</property>
[Diagram: (1) The custom filter jar resides in a predefined path in HDFS. (2) The proxyfilter.jar containing the extension point resides locally in the regionservers' classpath. (3) The ProxyFilter is instantiated using the default ClassLoader. (4) If necessary, the rowfilter.jar is downloaded from the preset Path in HDFS; a custom classloader in the ProxyFilter then instantiates the correct filter, and Filter interface calls are delegated to the enclosed filter.]

With the ProxyFilter in place, it is now simply a matter of placing or replacing the jar in the Hadoop FS directory to pick up the latest filters.

Reloading Filters
When a new Scan is requested on the server side, this module first checks the filter jar. If the jar is unchanged, the previously loaded classes are returned. If the jar has been updated, the module repeats the process of downloading it from HDFS, creating a new instance of the classloader and reloading the classes from the modified jar. The previous classloader is dereferenced and left to be garbage collected. Restarting the HBase cluster is not required.
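
In sketch form, continuing the hypothetical HdfsJarModule from above (lastModTime and classLoader are instance fields; ChildFirstClassLoader is sketched a little further down):

// continuing HdfsJarModule (also imports java.net.URL and
// org.apache.hadoop.hbase.filter.Filter)
private long lastModTime = -1;
private ClassLoader classLoader;

@SuppressWarnings("unchecked")
public synchronized Class<? extends Filter> getFilterClass(String className)
        throws IOException, ClassNotFoundException {
    Path hdfsJar = new Path(conf.get("flurry.hdfs.filter.jar"));
    FileSystem fs = hdfsJar.getFileSystem(conf);
    long modTime = fs.getFileStatus(hdfsJar).getModificationTime();
    if (classLoader == null || modTime != lastModTime) {
        // new or updated jar: fetch it and build a fresh classloader; the
        // previous loader is simply dereferenced and left to the GC
        File localJar = fetchJar();
        classLoader = new ChildFirstClassLoader(
                new URL[] { localJar.toURI().toURL() },
                getClass().getClassLoader());
        lastModTime = modTime;
    }
    return (Class<? extends Filter>) classLoader.loadClass(className);
}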

[Diagram: The HdfsJarModule keeps track of the latest custom filter definitions, using a separate classloader for each jar version.]

Custom classloading and reloading can be a class-linking, ClassCastException minefield, but the risk here is mitigated by the highly specialized use case of filtering. The filter is instantiated and configured per scan, and its object lifecycle is limited to the time it takes to run the scan in the regionserver. The example uses the child-first classloader mentioned in a previous post on ClassLoaders, which searches a configured set of URLs before delegating to its parent classloader [2].
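
A minimal child-first loader in that spirit (the class name is ours):

import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstClassLoader extends URLClassLoader {
    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected synchronized Class<?> loadClass(String name, boolean resolve)
            throws ClassNotFoundException {
        Class<?> c = findLoadedClass(name);
        if (c == null) {
            try {
                c = findClass(name); // search our own URLs first
            } catch (ClassNotFoundException e) {
                c = super.loadClass(name, resolve); // then fall back to the parent
            }
        }
        if (resolve) {
            resolveClass(c);
        }
        return c;
    }
}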

Things to watch out for

  • The example code has a performance overhead as it makes additional calls to HDFS to check for the modification time of the filter jar when a filter is first instantiated. This may be a significant factor for smaller scans. If so, the logic can be changed to check the jar less frequently.
  • The code is also very naïve at this point. Updating the filter.jar in the Hadoop FS while a table scan is happening can have undesired results if the updated filters are not backward compatible. Different versions of the jar can be picked up by the RegionServers for the same scan as they check and instantiate the Filters at different times.
  • Mutable static variables are discouraged in the custom Filter because they will be reinitialized when the class is reloaded dynamically.

Extensions
The example code is just a starting point for more interesting functionality tailored to different use cases. Scans using filters can also be used in MapReduce jobs and coprocessors. A short list of possible ways to extend the code:

  • The most obvious weakness in the example implementation is that the ProxyFilter only supports one jar. Extending it to include all jars in a filter directory would be a good start [4].
  • Different clients may expect certain versions of Filters. Some versioning and bookkeeping logic will be necessary to ensure that the ProxyFilter can serve up the correct filter to each client.
  • Generalize the solution to include MapReduce scenarios that use HBase as the input source. The module can load the custom filters at the start of each map task from the MR job library instead, unloading the filters after the task ends.
  • Support other JVM languages for filtering. We have tried serializing Groovy 1.6 scripts as Filters but performance was several times slower.


Using the ProxyFilter as a generic extension point for custom filters allows us to improve our performance without the hit of restarting our entire HBase cluster.

Footnotes
[1] Class Reloading Basics http://tutorials.jenkov.com/java-reflection/dynamic-class-loading-reloading.html
[2] See our blog post on ClassLoaders for alternate classloader delegation http://tech.flurry.com/using-the-java-classloader-to-write-efficient
[3] The classloader in our example resembles the one described in this ticket https://issues.apache.org/jira/browse/HBASE-1936
[4] A new classloader has just been introduced in hbase-0.92.2 for coprocessors, and it seems to fit perfectly for our dynamic filters https://issues.apache.org/jira/browse/HBASE-6308
[5] Example code https://github.com/vallancelee/hbase-filter/tree/master/customFilters

1 response
First off, let me say that the overall idea is great, and this is a really lovely hack. I like it. Also a quick shout out to Ian for pointing this out to me.

The use of HDFS for pushing jars around seems a bit roundabout. Have you considered pushing the jars directly to the region servers as serialized blobs? It's possible to create a classloader that reads from any inputstream, so you could just push the filtering code to region servers as a blob of bytecode and then have them instantiate the filtering class directly out of memory (This is what Java RMI does, iirc).

It also looks like HBase has a few standardized composable components for constructing filters (CompareFilter, FilterList). Is there a reason that these aren't sufficient for your needs? (I'm legitimately curious, there might be some interesting research questions here)

-Oliver Kennedy