In my previous post, I covered a whole bunch of shortcomings and pain points in HBase, and concluded that nothing is magic: it takes careful coding and tweaking to achieve good results, in terms of both performance and stability. Unfortunately, it’s often very hard to avoid some pitfalls until you actually find yourself in one, so here’s my take on a few best practices for HBase. Hopefully, these reach you in time – before you start pulling out your hair in frustration!
— Dynamic Yield (@DynamicYield) May 18, 2015
1. Know your key-hashing strategy well
The critical issue of distributing your row keys well to avoid “hot” regions has been extensively covered already, both in the official documentation and around the internet (here’s one good example, and another one). I strongly advise you to get acquainted with this topic if you haven’t already.
As an example of a poorly distributed table, let’s assume you need to store per-user data. In this case, the row key would simply be the user’s ID, which will probably be a monotonically increasing integer (e.g. generated from a sequence in MySQL or a similar tool). It’s easy to see that having the user ID as the key would make all writes for new users go into the last region of your table, which handles the highest values. Additionally, if new users tend to be significantly more active than older ones, or vice-versa, then updates to existing rows won’t be well distributed across regions either. Substitute the term ‘User ID’ with ‘Ticket ID’, ‘Product ID’ or any other entity type where there’s a much higher than average write-rate for a small portion of IDs, and you might discover this issue in your own use-cases. The standard YCSB test for big-data tools actually covers not only uniform distribution of keys but also zipfian distribution, or a bias towards the latest data.
If we examine that monotonic ID’s structure more closely, we’ll see that it does contain an element that cycles nicely and evenly with each new ID allocated: its least significant byte. The most significant bytes, however, remain pretty constant for long periods of time once you have any significant number of users; only the least significant bytes rotate (I’m assuming big-endian order, which is pretty much the standard for binary serialization; see for example Hadoop’s Bytes class and Java’s DataOutputStream). This is unfortunate, because good distribution of keys relies on their most significant byte(s). To better illustrate this, think of the odometer in your car, whether digital or old school: how often would you see any of the left-most digits rotate?
A fix for making these row keys distribute nicely is fairly easy to implement. You simply need to prefix the key with a leading byte based on the user ID, whose value is well distributed. In other words, you need a consistent hash. For any given ID, you should always get back the same value.
One way of achieving this is to define a fixed number of buckets, with the leading byte in the key being the bucket number. That byte is usually calculated as userId % BUCKETS_NUMBER. This in effect relies on the well distributed nature of the lowest byte, so alternatively you could just grab the whole least significant byte of the ID as the prefix.
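To make the bucket prefix concrete, here is a minimal sketch in plain Java. The class name, the method names and the bucket count of 16 are assumptions chosen for illustration; the only parts taken from the text above are the userId % BUCKETS_NUMBER calculation and the big-endian layout.

```java
import java.nio.ByteBuffer;

final class SaltedKeys {
    // Hypothetical bucket count; it must fit in a single byte (1..256).
    static final int BUCKETS_NUMBER = 16;

    /** Bucket for a user ID: the same ID always maps to the same bucket. */
    static byte bucketFor(long userId) {
        // floorMod keeps the result non-negative even if an ID is negative.
        return (byte) Math.floorMod(userId, (long) BUCKETS_NUMBER);
    }

    /** Row key = 1 bucket byte followed by the 8-byte big-endian user ID. */
    static byte[] rowKey(long userId) {
        return ByteBuffer.allocate(1 + Long.BYTES)
                .put(bucketFor(userId))
                .putLong(userId) // ByteBuffer writes big-endian by default
                .array();
    }

    public static void main(String[] args) {
        // Consecutive IDs land in consecutive buckets.
        for (long id = 1000; id < 1004; id++) {
            System.out.println(id + " -> bucket " + bucketFor(id));
        }
    }
}
```

Because the prefix is derived from the ID itself, a reader can always recompute the full row key from the user ID alone, so point lookups stay a single get.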
If you have a table for collecting a user’s raw events for later per-user aggregation, then having a consistent hash also has another advantage. It guarantees that a user’s data will always reside under the same prefix, so you can write concurrent code that processes each prefix (i.e. each block of users) without the need for a later reduce phase between tasks. Of course, a MapReduce job could easily merge a user’s rows using the user ID as key. However, at Dynamic Yield we’ve tried to steer clear of M/R when dealing with jobs that need to run frequently and quickly, given the high overhead of launching MapReduce (we’re currently switching to Apache Spark for this kind of job). Whether you use M/R or custom parallel code, you probably want to ensure each task gets an equal share of work, which is another advantage of well distributed tables.
However, sometimes there’s no need for a consistent hash. Assume you have a table whose native key is simply the timestamp (for later scanning by time range). To avoid one hot region that handles all new writes, you could simply generate a random byte (with a value smaller than BUCKETS_NUMBER) as the prefix when writing a new row. To then perform a partial scan for any given time range, you would need a separate scan for each prefix. Note that this multi-scan approach differs significantly from using the built-in scan.setTimeRange() method to find all data with a given HBase-timestamp range (regardless of the row key). The latter requires the Region Server to perform intense analysis over much of the table’s data in order to filter out any data not in range. For large tables, this might mean a very slow scan.
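The random-prefix scheme can be sketched in plain Java as follows. The class and method names and the bucket count of 8 are hypothetical, and the sketch assumes non-negative millisecond timestamps, so that big-endian byte order matches numeric order. Each returned pair would serve as the start row and (exclusive) stop row of one scan.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class TimeBucketedKeys {
    // Hypothetical bucket count; it must fit in a single byte.
    static final int BUCKETS_NUMBER = 8;

    /** Key = 1 bucket byte followed by the 8-byte big-endian timestamp. */
    static byte[] key(int bucket, long timestampMillis) {
        return ByteBuffer.allocate(1 + Long.BYTES)
                .put((byte) bucket)
                .putLong(timestampMillis)
                .array();
    }

    /** Write path: pick a random bucket for every new row. */
    static byte[] newRowKey(long timestampMillis) {
        int bucket = ThreadLocalRandom.current().nextInt(BUCKETS_NUMBER);
        return key(bucket, timestampMillis);
    }

    /** Read path: one {startRow, stopRow} pair per bucket for [from, to). */
    static List<byte[][]> scanRanges(long fromMillis, long toMillis) {
        List<byte[][]> ranges = new ArrayList<>();
        for (int bucket = 0; bucket < BUCKETS_NUMBER; bucket++) {
            ranges.add(new byte[][] {
                key(bucket, fromMillis), key(bucket, toMillis)
            });
        }
        return ranges;
    }
}
```

The per-bucket scans are independent of each other, so they can run in parallel, with the results merged by timestamp on the client if ordering matters. A real key would likely also need a suffix to keep rows written in the same millisecond from colliding; that part is left out here.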
2. Consider making your IDs HBase-friendly by design
Let’s revisit the case of plain vanilla monotonic user IDs. In addition to requiring a prefix, there are other unwanted side effects with this method of ID generation:
- There is a single point of failure at the point where IDs are created, which is the database that provides the sequence.
- Given two IDs, we can tell which ID is newer, but we have no idea when an ID was created, which host created it, or any other relevant metadata, unless we rely on an auxiliary table to hold this data. In other words, the ID in itself doesn’t hold much information.
Instead, you could adopt a solution similar to Twitter Snowflake (see a discussion here). Rather than relying on a single point of failure, you could let each host independently generate unique 64-bit IDs. Each ID is a concatenation of (a) its creation time, (b) a unique ID of the machine which generated it and (c) a leading cyclic counter for uniform distribution. Here’s a diagram comparing “the old way” vs. the new approach:
Using this solution means that any table whose key is a user ID does not need any special code or a leading prefix byte in order to be well distributed. Instead, the ID has this attribute naturally. As extras, you also get the exact timestamp of creation and some additional space for any kind of metadata you might want (say, the user’s country code, or any app-specific flags, which might later help with routing your traffic). Lastly, this ID format is not bound by 32-bit int limits. It might sound unlikely now, but if you build any kind of successful web analytics service, you will hit int limits well before your company becomes a unicorn.
Note that the cyclic counter should rotate regardless of the timestamp, so that you get truly uniform distribution regardless of how many IDs are actually generated per second. Uniqueness in this example is only guaranteed if you create up to 256 IDs per machine, per second, but you can modify any component in my suggested scheme to better fit your needs. Of course, you also need a method for allocating unique IDs for your machines. At Dynamic Yield, we allocate these as part of our Chef-based automation.
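Here is one way such a generator might look in plain Java. The exact bit layout is my assumption for this sketch (8-bit leading counter, 32-bit creation time in seconds, 16-bit machine ID, 8 spare bits for metadata), and the class and method names are hypothetical. With an 8-bit counter and second resolution, it has the same limit mentioned above: 256 unique IDs per machine, per second.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class HostIdGenerator {
    private final int machineId; // 16 bits, assumed unique per host
    private final AtomicInteger counter = new AtomicInteger();

    HostIdGenerator(int machineId) {
        if (machineId < 0 || machineId > 0xFFFF) {
            throw new IllegalArgumentException("machineId must fit in 16 bits");
        }
        this.machineId = machineId;
    }

    /** Layout: counter(8) | epochSeconds(32) | machineId(16) | metadata(8). */
    long nextId(long epochSeconds, int metadata) {
        // The counter rotates 0..255 on every call, regardless of the clock.
        long count = counter.getAndIncrement() & 0xFF;
        return (count << 56)
                | ((epochSeconds & 0xFFFFFFFFL) << 24)
                | ((long) machineId << 8)
                | (metadata & 0xFFL);
    }

    long nextId() {
        return nextId(System.currentTimeMillis() / 1000L, 0);
    }

    // Decoders for the individual fields.
    static int counterOf(long id) { return (int) (id >>> 56); }
    static long epochSecondsOf(long id) { return (id >>> 24) & 0xFFFFFFFFL; }
    static int machineIdOf(long id) { return (int) ((id >>> 8) & 0xFFFF); }
}
```

Note that IDs whose counter is 128 or higher are negative when read as a signed Java long. That is harmless for row keys, since HBase compares keys as unsigned bytes, but it matters if the same ID is also stored in a signed numeric column elsewhere.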
3. Use snapshots for large-scale reads, and bulk loading for large-scale writes
In my previous post, I linked to a presentation covering snapshots. If you’re interested in doing big MapReduce jobs using HBase, either for data processing or simply for backing up your data, you should consider using snapshots in your pipeline. With snapshots in the mix, the process for doing large-scale work over HBase without burdening your online region servers is now complete:
- Create a new snapshot for the table you need to work on. This can be done either via the HBase Shell, or programmatically via
- As an input, set up your mapper to read directly from a snapshot using TableMapReduceUtil.initTableSnapshotMapperJob(), which configures TableSnapshotInputFormat as the input format for you.
- As an output format, instead of sending a large chunk of rows back to the Region Servers via the normal API, use the ready-made HFileOutputFormat2.configureIncrementalLoad() helper for writing very large HFiles into HDFS.
- Once the job is complete, these HFiles can then be very efficiently loaded back into HBase in bulk from your driver code.
A note of caution: when you’re done using a snapshot, be sure to delete it! When a snapshot is taken it requires negligible storage space. The only metadata stored is about the HFiles that currently make up the table. From that moment on however, HBase will run its normal course of compactions over the table, creating new HFiles and making older files obsolete in the process. Normally, these obsolete files are then deleted from HDFS. However, in order to ensure that you can always read from the snapshot (i.e. read the table data as it was when the snapshot was taken), HBase needs to keep around all obsolete files that are still referenced by any snapshot.
With time, an old snapshot might hold onto a huge set of HFiles that is now completely obsolete – taking up as much storage space as the online table! Hence, deleting any unneeded snapshots is essential if you don’t want your storage use to skyrocket.
If you ever want to find out how far a snapshot’s file set has diverged from that of the current online table, run:
hbase org.apache.hadoop.hbase.snapshot.SnapshotInfo -snapshot <snapshot-name> -stats (see the documentation).
4. Consider in-memory aggregation for analytics
While HBase can sustain a pretty impressive write volume (assuming you’ve done your homework on row-key distribution and cache tuning), if you wish to use HBase for real-time analytics there are some big issues to consider:
- If each incoming raw event increments multiple counters (counters by country, by referrer, by platform, by custom user segment etc.), your cluster will need to sustain a serious “write amplification” issue: each incoming piece of data will affect many different atomic counters. Even if your cluster can sustain that for the time being, is this really what you want to spend your computing/network dollars on?
- Sure, atomic counters are great, but what if rather than just count page-views or hits, you want to have an up-to-date count of unique visitors calculated from the endless stream of atomic user actions? Simple counters are not going to solve that for you. Instead, you will need a probabilistic data structure such as HyperLogLog to count uniques. Since HBase does not have built-in support for HLLs, you need to first write each incoming atomic event into pre-aggregated tables, and then run frequent aggregations over the recent data to approximate the unique user counts per time-frame. This means a lot of reads and writes only to calculate the concise data points which you actually wanted to persist.
Instead, you can use Apache Storm or Apache Spark Streaming (or Akka at a lower level) to create an in-memory aggregation framework. This layer breaks down incoming requests into all necessary counter buckets (whether they’re simple counters or HyperLogLogs), and these counters can then be regularly persisted to HBase in high resolution.
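To give a feel for what such a layer does at its core, here is a minimal single-process sketch in plain Java. It is not tied to Storm, Spark Streaming or Akka; the class name, the three dimensions and the bucket naming are assumptions for illustration, and only simple counters are covered (HyperLogLogs are left out).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class InMemoryAggregator {
    private final ConcurrentHashMap<String, AtomicLong> counters =
            new ConcurrentHashMap<>();

    /** Fan one raw event out to every counter bucket it affects. */
    void record(String country, String referrer, String platform) {
        increment("country:" + country);
        increment("referrer:" + referrer);
        increment("platform:" + platform);
    }

    private void increment(String bucket) {
        counters.computeIfAbsent(bucket, k -> new AtomicLong())
                .incrementAndGet();
    }

    /** Drain the counters and return the deltas accumulated since the last flush. */
    Map<String, Long> flush() {
        Map<String, Long> deltas = new HashMap<>();
        for (Map.Entry<String, AtomicLong> entry : counters.entrySet()) {
            // getAndSet(0) is atomic, so concurrent increments are not lost;
            // they simply show up in the next flush.
            long delta = entry.getValue().getAndSet(0);
            if (delta != 0) {
                deltas.put(entry.getKey(), delta);
            }
        }
        return deltas;
    }
}
```

A scheduled task would call flush() every few seconds and persist each delta to HBase, so the cluster sees one write per bucket per interval rather than several writes per raw event.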
…and One More Thing
At the time of publication, Google has just released their fully-managed Google Cloud BigTable offering in beta. Unfortunately it is only available when using Google’s own cloud platform, so it’s relevant mainly for customers currently planning a new system or already considering a switch anyway. Google promises very low latency in the 99th percentile, in contrast to HBase, which is plagued by Java’s GC woes. I should note that MapR have also recently made their HBase-compatible MapR-DB freely available, and they make similar claims about the performance of their native solution. However, Google’s offering is after all the BigTable, and it is fully managed.