Friday, 13 July 2012

Getting started with DataCube on HBase

This post provides a quick introduction to using DataCube, a Java-based OLAP cube library with a pluggable storage engine, open sourced by Urban Airship. In this tutorial we make use of the inbuilt HBase storage engine.

In a small database, much of this kind of counting would be trivial using aggregate functions (SUM(), COUNT() etc.). As the volume grows, one often precalculates these metrics, which brings its own set of consistency challenges. As one outgrows a database, as GBIF has, new mechanisms are needed to manage these metrics. The features of DataCube that make it attractive to us are:
  • A manageable process to modify the cube structure
  • A higher level API to develop against
  • Ability to rebuild the cube with a single pass over the source data
For this tutorial we will consider the source data as classical DarwinCore occurrence records, where each record represents the metadata associated with a species observation event, e.g.:
ID, Kingdom, ScientificName, Country, IsoCountryCode, BasisOfRecord, CellId, Year
1, Animalia, Puma concolor, Peru, PE, Observation, 13245, 1967
2, Plantae, Abies alba, Spain, ES, Observation, 3637, 2010
3, Plantae, Abies alba, Spain, ES, Observation, 3638, 2010
Suppose the following metrics are required, each of which is termed a rollup in OLAP:
  1. Number of records per country
  2. Number of records per kingdom
  3. Number of records georeferenced / not georeferenced
  4. Number of records per kingdom per country
  5. Number of records georeferenced / not georeferenced per country
  6. Number of records georeferenced / not georeferenced per kingdom
  7. Number of records georeferenced / not georeferenced per kingdom per country
Given the requirements above, this can be translated into a cube definition with 3 dimensions, and 7 rollups as follows:
/**
 * The cube definition (package access only).
 * Dimensions are Country, Kingdom and Georeferenced with counts available for:
 *   1. Country (e.g. number of records in DK)
 *   2. Kingdom (e.g. number of animal records)
 *   3. Georeferenced (e.g. number of records with coordinates)
 *   4. Country and kingdom (e.g. number of plant records in the US)
 *   5. Country and georeferenced (e.g. number of records with coordinates in the UK)
 *   6. Kingdom and georeferenced (e.g. number of animal records with coordinates)
 *   7. Country and kingdom and georeferenced (e.g. number of bacteria records with coordinates in Spain)
 * TODO: write public utility exposing a simple API enabling validated read/write access to the cube.
 */
class Cube {

  // no id substitution
  static final Dimension<String> COUNTRY =
    new Dimension<String>("dwc:country", new StringToBytesBucketer(), false, 2);
  // id substitution applies
  static final Dimension<String> KINGDOM =
    new Dimension<String>("dwc:kingdom", new StringToBytesBucketer(), true, 7);
  // no id substitution
  static final Dimension<Boolean> GEOREFERENCED =
    new Dimension<Boolean>("gbif:georeferenced", new BooleanBucketer(), false, 1);

  // Singleton instance
  static final DataCube<LongOp> INSTANCE = newInstance();

  // Not for instantiation
  private Cube() {
  }

  /**
   * Creates the cube.
   */
  private static DataCube<LongOp> newInstance() {
    // The dimensions of the cube
    List<Dimension<?>> dimensions = ImmutableList.<Dimension<?>>of(COUNTRY, KINGDOM, GEOREFERENCED);

    // The way the dimensions are "rolled up" for summary counting
    List<Rollup> rollups = ImmutableList.of(
      new Rollup(COUNTRY),
      new Rollup(KINGDOM),
      new Rollup(GEOREFERENCED),
      new Rollup(COUNTRY, KINGDOM),
      new Rollup(COUNTRY, GEOREFERENCED),
      new Rollup(KINGDOM, GEOREFERENCED),
      // more than 2 dimensions requires the DimensionAndBucketType syntax
      new Rollup(ImmutableSet.of(
        new DimensionAndBucketType(COUNTRY),
        new DimensionAndBucketType(KINGDOM),
        new DimensionAndBucketType(GEOREFERENCED))));

    return new DataCube<LongOp>(dimensions, rollups);
  }
}
In this code, we are making use of ID substitution for the kingdom. ID substitution is an inbuilt feature of DataCube whereby an auto-generated ID is used to substitute verbose coordinates (a value for a dimension). This is an important feature to help improve performance, as coordinates are used to construct the cube lookup keys, which translate into the key used for the HBase table. The substitution is achieved by using a simple table holding a running counter and a mapping table holding the field-to-id mapping. When inserting data into the cube, the counter is incremented (with custom locking to support concurrency within the cluster), the mapping is stored, and the counter value is used as the coordinate. When reading, the mapping table is used to construct the lookup key.

With the cube defined, we are ready to populate it. One could simply iterate over the source data and populate the cube with the likes of the following:
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // setup omitted for brevity
dataCubeIo.writeSync(new LongOp(1), 
  new WriteBuilder(Cube.INSTANCE)
    .at(Cube.COUNTRY, "Spain") // for example
    .at(Cube.KINGDOM, "Animalia")
    .at(Cube.GEOREFERENCED, true)
);
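The setup(...) call is omitted here for brevity. As a rough sketch only, the wiring might look something like the following, mirroring the mapper setup shown later in this post; the table names ("dc_cube", "dc_lookup", "dc_counter") and column family ("c") are the ones used in the backfill section below and are illustrative rather than required:
private static DataCubeIo<LongOp> setup(DataCube<LongOp> cube) throws IOException {
  Configuration conf = HBaseConfiguration.create();
  HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);

  // The id service performs the id substitution described above (lookup + counter tables)
  IdService idService = new HBaseIdService(conf, Bytes.toBytes("dc_lookup"),
    Bytes.toBytes("dc_counter"), Bytes.toBytes("c"), new byte[0]);

  DbHarness<LongOp> hbaseDbHarness = new HBaseDbHarness<LongOp>(pool, new byte[0],
    Bytes.toBytes("dc_cube"), Bytes.toBytes("c"), LongOp.DESERIALIZER, idService,
    CommitType.INCREMENT);

  // Writes are batched, so remember to call dataCubeIo.flush() when done writing
  return new DataCubeIo<LongOp>(cube, hbaseDbHarness, 1000, Long.MAX_VALUE, SyncLevel.BATCH_SYNC);
}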
However, one should consider what to do in the following inevitable scenarios:
  1. A new dimension or rollup is to be added to the running cube
  2. Changes to the source data have occurred without the cube being notified (e.g. through a batch load, or missing notifications due to messaging failures)
  3. Some disaster recovery requiring a cube rebuild
To handle this when using HBase as the cube storage engine, we make use of the inbuilt backfill functionality. Backfilling is a multistage process:
  1. A snapshot of the live cube is taken and stored in a snapshot table
  2. An offline cube is calculated from the source data and stored in a backfill table
  3. The snapshot and live cube are compared to determine changes that were accepted in the live cube during the rebuilding process (step 2). These changes are then applied to the backfill table
  4. The backfill is hot swapped to become the live cube
This is all handled within DataCube with the exception of stage 2, where we are required to provide a BackfillCallback, the logic responsible for populating the new cube from the source data. The following example illustrates a BackfillCallback using a simple MapReduce job to scan an HBase table for the source data.
/**
 * The callback used from the backfill process to spawn the job to write the new data in the cube.
 */
public class BackfillCallback implements HBaseBackfillCallback {

  // Property keys passed in on the job conf to the Mapper
  static final String TARGET_TABLE_KEY = "gbif:cubewriter:targetTable";
  static final String TARGET_CF_KEY = "gbif:cubewriter:targetCF";
  // Controls the scanner caching size for the source data scan (100-5000 is reasonable)
  private static final int SCAN_CACHE = 200;
  // The source data table
  private static final String SOURCE_TABLE = "dc_occurrence";

  @Override
  public void backfillInto(Configuration conf, byte[] table, byte[] cf, long snapshotFinishMs) throws IOException {
    conf = HBaseConfiguration.create();
    conf.set(TARGET_TABLE_KEY, Bytes.toString(table));
    conf.set(TARGET_CF_KEY, Bytes.toString(cf));
    Job job = new Job(conf, "CubeWriterMapper");

    job.setJarByClass(CubeWriterMapper.class);
    Scan scan = new Scan();
    scan.setCaching(SCAN_CACHE);
    scan.setCacheBlocks(false);

    // we do not want to get bad counts in the cube!
    job.getConfiguration().set("mapred.map.tasks.speculative.execution", "false");
    job.getConfiguration().set("mapred.reduce.tasks.speculative.execution", "false");
    job.setNumReduceTasks(0);
    TableMapReduceUtil.initTableMapperJob(SOURCE_TABLE, scan, CubeWriterMapper.class, null, null, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    try {
      boolean b = job.waitForCompletion(true);
      if (!b) {
        throw new IOException("Unknown error with job.  Check the logs.");
      }
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}
/**
 * The Mapper used to read the source data and write into the target cube.
 * Counters are written to simplify the spotting of issues, so look to the Job counters on completion.
 */
public class CubeWriterMapper extends TableMapper<NullWritable, NullWritable> {

  // TODO: These should come from a common schema utility in the future
  // The source HBase table fields
  private static final byte[] CF = Bytes.toBytes("o");
  private static final byte[] COUNTRY = Bytes.toBytes("icc");
  private static final byte[] KINGDOM = Bytes.toBytes("ik");
  private static final byte[] CELL = Bytes.toBytes("icell");

  // Names for counters used in the Hadoop Job
  private static final String STATS = "Stats";
  private static final String STAT_COUNTRY = "Country present";
  private static final String STAT_KINGDOM = "Kingdom present";
  private static final String STAT_GEOREFERENCED = "Georeferenced";
  private static final String STAT_SKIPPED = "Skipped record";
  private static final String KINGDOMS = "Kingdoms";

  // The batch size to use when writing the cube
  private static final int CUBE_WRITE_BATCH_SIZE = 1000;

  static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

  private DataCubeIo<LongOp> dataCubeIo;

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    super.cleanup(context);
    // ensure we're all flushed since batch mode
    dataCubeIo.flush();
    dataCubeIo = null;
  }

  /**
   * Utility to read a named field from the row.
   */
  private Integer getValueAsInt(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toInt(v);
    }
    return null;
  }

  /**
   * Utility to read a named field from the row.
   */
  private String getValueAsString(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toString(v);
    }
    return null;
  }


  @Override
  protected void map(ImmutableBytesWritable key, Result row, Context context) throws IOException, InterruptedException {
    String country = getValueAsString(row, CF, COUNTRY);
    String kingdom = getValueAsString(row, CF, KINGDOM);
    Integer cell = getValueAsInt(row, CF, CELL);

    WriteBuilder b = new WriteBuilder(Cube.INSTANCE);
    if (country != null) {
      b.at(Cube.COUNTRY, country);
      context.getCounter(STATS, STAT_COUNTRY).increment(1);
    }
    if (kingdom != null) {
      b.at(Cube.KINGDOM, kingdom);
      context.getCounter(STATS, STAT_KINGDOM).increment(1);
      context.getCounter(KINGDOMS, kingdom).increment(1);
    }
    if (cell != null) {
      b.at(Cube.GEOREFERENCED, true);
      context.getCounter(STATS, STAT_GEOREFERENCED).increment(1);
    }
    if (b.getBuckets() != null && !b.getBuckets().isEmpty()) {
      dataCubeIo.writeSync(new LongOp(1), b);
    } else {
      context.getCounter(STATS, STAT_SKIPPED).increment(1);
    }
  }

  // Sets up the DataCubeIO with IdService etc.
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);

    IdService idService = new HBaseIdService(conf, Backfill.LOOKUP_TABLE, Backfill.COUNTER_TABLE, Backfill.CF, EMPTY_BYTE_ARRAY);

    byte[] table = Bytes.toBytes(conf.get(BackfillCallback.TARGET_TABLE_KEY));
    byte[] cf = Bytes.toBytes(conf.get(BackfillCallback.TARGET_CF_KEY));


    DbHarness<LongOp> hbaseDbHarness =
      new HBaseDbHarness<LongOp>(pool, EMPTY_BYTE_ARRAY, table, cf, LongOp.DESERIALIZER, idService, CommitType.INCREMENT);

    dataCubeIo = new DataCubeIo<LongOp>(Cube.INSTANCE, hbaseDbHarness, CUBE_WRITE_BATCH_SIZE, Long.MAX_VALUE, SyncLevel.BATCH_SYNC);

  }
}
With the callback written, all that is left to populate the cube is to run the backfill. Note that this process can also be used to bootstrap the live cube for the first time:
// The live cube table 
final byte[] CUBE_TABLE = "dc_cube".getBytes();
// Snapshot of the live table used during backfill
final byte[] SNAPSHOT_TABLE = "dc_snapshot".getBytes();
// Backfill table built from the source
final byte[] BACKFILL_TABLE = "dc_backfill".getBytes();
// Utility table to provide a running count for the identifier service
final byte[] COUNTER_TABLE = "dc_counter".getBytes();
// Utility table to provide a mapping from source values to assigned identifiers
final byte[] LOOKUP_TABLE = "dc_lookup".getBytes();
// All DataCube tables use a single column family
final byte[] CF = "c".getBytes();

HBaseBackfill backfill =
  new HBaseBackfill(
    conf, 
    new BackfillCallback(), // our implementation
    CUBE_TABLE, 
    SNAPSHOT_TABLE, 
    BACKFILL_TABLE, 
    CF, 
    LongOp.LongOpDeserializer.class);
backfill.runWithCheckedExceptions();
While HBase provides the storage for the cube, a backfill could be implemented against any source data, such as a database read over JDBC or text files stored on a Hadoop filesystem; a sketch of a file-based variant is shown below.
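Purely as an illustration (this is not part of the GBIF code), a file-based BackfillCallback might stream a tab-separated dump of the occurrence records shown at the start of this post straight into the backfill table. The file path, batch size and column positions are assumptions:
public class TsvBackfillCallback implements HBaseBackfillCallback {

  @Override
  public void backfillInto(Configuration conf, byte[] table, byte[] cf, long snapshotFinishMs) throws IOException {
    // Write into the offline backfill table/cf handed to us by the backfill process
    HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
    IdService idService =
      new HBaseIdService(conf, Backfill.LOOKUP_TABLE, Backfill.COUNTER_TABLE, Backfill.CF, new byte[0]);
    DbHarness<LongOp> harness = new HBaseDbHarness<LongOp>(pool, new byte[0], table, cf,
      LongOp.DESERIALIZER, idService, CommitType.INCREMENT);
    DataCubeIo<LongOp> cubeIo =
      new DataCubeIo<LongOp>(Cube.INSTANCE, harness, 1000, Long.MAX_VALUE, SyncLevel.BATCH_SYNC);

    // "/dumps/occurrence.tsv" is an illustrative path; columns as in the sample data:
    // ID, Kingdom, ScientificName, Country, IsoCountryCode, BasisOfRecord, CellId, Year
    FileSystem fs = FileSystem.get(conf);
    BufferedReader reader =
      new BufferedReader(new InputStreamReader(fs.open(new Path("/dumps/occurrence.tsv"))));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        String[] f = line.split("\t", -1);
        cubeIo.writeSync(new LongOp(1),
          new WriteBuilder(Cube.INSTANCE)
            .at(Cube.COUNTRY, f[4])                    // IsoCountryCode
            .at(Cube.KINGDOM, f[1])                    // Kingdom
            .at(Cube.GEOREFERENCED, !f[6].isEmpty())); // CellId present?
      }
      cubeIo.flush();
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      reader.close();
    }
  }
}
Finally, we want to be able to read our cube: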
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // setup omitted for brevity
Optional<LongOp> result = 
  dataCubeIo.get(
    new ReadBuilder(Cube.INSTANCE)
      .at(Cube.COUNTRY, "DK")
      .at(Cube.KINGDOM, "Animalia"));
// need to check if this coordinate combination hit anything in the cube
if (result.isPresent()) {
  LOG.info("Animal records in Denmark: " + result.get().getLong());
}
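The other rollups are read in exactly the same way; for example, a query against the three-dimension rollup (georeferenced animal records in Denmark) just adds the third coordinate. A short sketch:
Optional<LongOp> georeferenced = 
  dataCubeIo.get(
    new ReadBuilder(Cube.INSTANCE)
      .at(Cube.COUNTRY, "DK")
      .at(Cube.KINGDOM, "Animalia")
      .at(Cube.GEOREFERENCED, true));
long count = georeferenced.isPresent() ? georeferenced.get().getLong() : 0;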
All the source code for the above is available in the GBIF labs svn.

Many thanks to Dave Revell at UrbanAirship for his guidance.

Wednesday, 11 July 2012

Optimizing Writes in HBase

I've written a few times about our work to improve the scanning performance of our cluster (parts 1, 2, and 3) since our highest priority for HBase is being able to serve requests for downloads of occurrence records (which require a full table scan). But now that the scanning is working nicely we need to start writing new records into our occurrence table as well as cleaning raw data and interpreting it into something more useful for the users of our data portal. That processing is built as Hive queries that read from and write back to the same HBase table. And while it was working fine on small test datasets, it all blew up once I moved the process to the full dataset. Here's what happened and how we fixed it. Note that we're using CDH3u3, with the addition of Hive 0.9.0, which we patched to support HBase 0.90.4.

The problem

Our processing consists of Hive queries that run as Hadoop MapReduce jobs. When the mappers were running, they would eventually fail (repeatedly, ultimately killing the job) with an error that looks like this:

java.io.IOException: org.apache.hadoop.hbase.client.ScannerTimeoutException: 63882ms passed since the last invocation, timeout is currently set to 60000

We did some digging and found that this exception happens when the scanner's next() method hasn't been called within the timeout limit. We simplified our test case by reproducing the same error with a simple CopyTable operation (the one that ships with HBase), and again by using Hive to do an insert overwrite table A select * from table B. In both cases mappers are assigned a split to scan based on TableInputFormat (just like our Hive jobs), and as they scan they simply put each record out to the new table. Something is holding up the loop as it tries to put, preventing it from calling next(), and thereby triggering the timeout exception.
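To make that failure mode concrete, here is a rough sketch (not the actual CopyTable source) of the kind of table-to-table copy mapper described above. The scanner is only advanced between calls to map(), so a put that blocks on the target table also stops next() from being called in time:
public class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {

  @Override
  protected void map(ImmutableBytesWritable key, Result row, Context context)
    throws IOException, InterruptedException {
    Put put = new Put(key.get());
    // copy every cell of the source row unchanged
    for (KeyValue kv : row.raw()) {
      put.add(kv);
    }
    // If the target region is blocking updates, this write (or the client-side buffer
    // flush it triggers) stalls here, next() is not called within the timeout, and the
    // ScannerTimeoutException above is thrown.
    context.write(key, put);
  }
}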

The logs

First stop - the logs. The regionservers are littered with lines like the following:

WARN org.apache.hadoop.hbase.regionserver.MemStoreFlusher: Region occurrence,\x17\xF1o\x9C,1340981109494.ecb85155563c6614e5448c7d700b909e. has too many store files; delaying flush up to 90000ms
INFO org.apache.hadoop.hbase.regionserver.HRegion: Blocking updates for 'IPC Server handler 7 on 60020' on region occurrence,\x17\xF1o\x9C,1340981109494.ecb85155563c6614e5448c7d700b909e.: memstore size 128.2m is >= than blocking 128.0m size

Well, blocking updates sure sounds like the kind of thing that would prevent a loop from writing more puts and dutifully calling next(). After a little more digging (and testing with a variety of hbase.client.scanner.caching values, including 1) we concluded that yes, this was the problem, but why was it happening?

Why it blocks

It blocks because the memstore has hit what I'll call the "memstore blocking limit", which is the product of hbase.hregion.memstore.flush.size and hbase.hregion.memstore.block.multiplier (by default 64MB and 2 respectively). Normally the memstore should flush when it reaches the flush.size, but in this case it reaches 128MB because it's not allowed to flush due to too many store files (the first log line). The definition of "too many storefiles" is in turn a setting, namely hbase.hstore.blockingStoreFiles (default 7). A new store file is created every time the memstore flushes, and their number is reduced by compacting them into fewer, bigger storefiles during minor and major compactions. By default, compactions will only start if there are at least hbase.hstore.compactionThreshold (default 3) store files, and won't compact more than hbase.hstore.compaction.max (default 7) in a single compaction.

And regardless of what you set flush.size to, the memstore will always flush if all memstores in the regionserver combined are using too much heap. The acceptable levels of heap usage are defined by hbase.regionserver.global.memstore.lowerLimit (default 0.35) and hbase.regionserver.global.memstore.upperLimit (default 0.4). There is a thread dedicated to flushing that wakes up regularly and checks these limits:

  • if the flush thread wakes up and the memstores are above the lower limit, it will start flushing (beginning with the current biggest memstore) until it gets below the limit.
  • if the flush thread wakes up and the memstores are above the upper limit, it will block updates and start flushing until it gets under the lower limit, at which point it unblocks updates.
Fortunately we didn't see blocking because of the upper heap limit - only the "memstore blocking limit" described earlier. But at this point we only knew the different dials we could turn.
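To keep the arithmetic straight, the following sketch simply reads the dials described above (with the defaults of the HBase version we were running) and computes the point at which a single region starts blocking updates:
// Sketch only: read the relevant settings with their defaults
Configuration conf = HBaseConfiguration.create();
long flushSize = conf.getLong("hbase.hregion.memstore.flush.size", 67108864L);   // 64MB
long multiplier = conf.getLong("hbase.hregion.memstore.block.multiplier", 2L);
int blockingStoreFiles = conf.getInt("hbase.hstore.blockingStoreFiles", 7);
float lowerLimit = conf.getFloat("hbase.regionserver.global.memstore.lowerLimit", 0.35f);
float upperLimit = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4f);

// A memstore that cannot flush (because its store already has blockingStoreFiles files)
// keeps growing until it reaches flushSize * multiplier, at which point the region blocks
// updates. With the defaults that is 64MB * 2 = 128MB - exactly the "blocking 128.0m size"
// in the log line above. Updates also block across the whole regionserver when the
// combined memstores exceed upperLimit of the heap.
long memstoreBlockingLimit = flushSize * multiplier;
System.out.println("Single-region blocking limit: " + memstoreBlockingLimit + " bytes");
System.out.println("Global memstore heap limits: " + lowerLimit + " (flush) / " + upperLimit + " (block)");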

Stopping the blocking

Our goal is to stop the blocking so that our mapper doesn't time out, while at the same time not running out of memory on the regionserver. The most obvious problem is that we have too many storefiles, which appears to be a combination of producing too many of them and not compacting them fast enough. Note that we have a 6GB heap dedicated to HBase, but can't afford to take any more away from the co-resident Hadoop mappers and reducers.

We started by upping the memstore flush size - this will produce fewer but bigger storefiles on each flush:
  • first we tried 128MB with block multiplier still at 2. This still produced too many storefiles and caused the same blocking (maybe a little better than at 64MB)
  • then we tried 256MB with a multiplier of 4. The logs and ganglia showed that the flushes were happening well before 256MB (still around 130MB) "due to global heap pressure" - a sign that total memstores were consuming too much heap. This meant we were still generating too many files and got the same blocking problem, but with the "memstore blocking limit" now at 1GB the memstore blocking happened much less often, and later in the process (it still killed the mappers though)
We were now producing fewer storefiles, but they were still accumulating too quickly. From ganglia we also saw that the compaction queue and storefile counts were growing unbounded, which meant we'd hit the blocking limit again eventually. Next we tried compacting more files per compaction by raising compaction.max to 20, but this made little difference.

So how to reduce the number of storefiles? If we had fewer stores, we'd be creating fewer files and using up less heap for memstore, so next we increased the region size.  This meant increasing the setting hbase.hregion.max.filesize from its default of 256MB to 1.5G, and then rebuilding our table with fewer pre-split regions.  That resulted in about 75% fewer regions.
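Rebuilding the table with fewer pre-split regions can be done through the admin API; a rough sketch (the table name comes from the logs above, while the column family and split keys are purely illustrative - ours were derived from the existing row key distribution):
// Sketch only: recreate the table with fewer, larger pre-split regions
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);

HTableDescriptor descriptor = new HTableDescriptor("occurrence");
descriptor.addFamily(new HColumnDescriptor("o")); // column family name is an assumption

// Far fewer split boundaries than before: one region per boundary plus one
byte[][] splits = new byte[][] {
  Bytes.toBytes("25000000"),
  Bytes.toBytes("50000000"),
  Bytes.toBytes("75000000")
};
admin.createTable(descriptor, splits);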

It was starting to look good - the number of "Blocking updates" log messages dropped to a handful per run, but it was still enough to affect one or two jobs to the point of them getting killed.  We tried upping the memstore.lowerLimit and upperLimit to 0.45 and 0.5 respectively, but again no joy.

Now what?

Things looked kind of grim. After endless poring over ganglia charts, we kept coming back to one unexplained blip that seemed to coincide with the start of the storefile explosion that eventually killed the jobs.
Figure 1: Average memstore flush size over time
At about the halfway point of the jobs the size of memstore flushes would spike and then gradually increase until the job died. Keep in mind that the chart shows averages, and it only took a few of those flushes to wait for storefiles long enough to fill to 1GB and then start the blocking that was our undoing. Back to the logs.

From the start of Figure 1 we can see that things appear to be going smoothly - the memstores are flushing at or just above 256MB, which means they have enough heap and are doing their jobs. From the logs we see the flushes happening fine, but there are regular lines like the following:
INFO org.apache.hadoop.hbase.regionserver.MemStoreFlusher: Under global heap pressure: Region uat_occurrence,\x06\x0E\xAC\x0F,1341574060728.ab7fed6ea92842941f97cb9384ec3d4b. has too many store files, but is 625.1m vs best flushable region's 278.2m. Choosing the bigger.
This isn't quite as bad as the "delaying flush" line, but it shows that we're on the limit of what our heap can handle. Then, starting from around 12:20, we see more and more lines like the following:
WARN org.apache.hadoop.hbase.regionserver.MemStoreFlusher: Region uat_occurrence,"\x98=\x1C,1341567129447.a3a6557c609ad7fc38815fdcedca6c26. has too many store files; delaying flush up to 90000ms
and then to top it off:
INFO org.apache.hadoop.hbase.regionserver.wal.HLog: Too many hlogs: logs=35, maxlogs=32; forcing flush of 1 regions(s): ab7fed6ea92842941f97cb9384ec3d4b
INFO org.apache.hadoop.hbase.regionserver.MemStoreFlusher: Flush thread woke up with memory above low water.
So what we have is memstores initially being forced to flush because of minor heap pressure (adds storefiles faster than we can compact). Then we have memstores delaying flushes because of too many storefiles (memstores start getting bigger - our graph spike). Then the write ahead log (WAL) complains about too many of its logs, which forces a memstore flush (so that the WAL HLog can be safely discarded - this again adds storefiles). And for good measure the flushing thread now wakes up, finds it's out of heap, and starts attempting flushes, which just aggravates the problem (adding more storefiles to the pile). The failure doesn't happen immediately, but we're past the point of no return - by about 13:00 the memstores are getting to the "memstore blocking limit" and our mappers die.

What's left?


Knowing what's going on behind the memstore flush size spike is comforting, but it just reinforces what we already knew - the problem is too many storefiles. So what's left? Raising the max number of storefiles, that's what! Every storefile in a store consumes resources (file handles, xceivers, heap for holding metadata), which is why it's limited to a very modest maximum of 7 files by default. But for us this level of write load is rare - in normal operations we won't hit anything like this, and having the capacity to write a whole bunch of storefiles over short-ish, infrequent bursts is relatively safe, since we know our nightly major compaction will clean them up again (and thereby free up all those extra resources). Crank that maximum up to 200 and, finally, the process works!

Conclusion

Our problem was that our compactions couldn't keep up with all the storefiles we were creating. We tried turning all the different HBase dials to get the storefile/compaction process to work "like it's supposed to", but in the end the key for us was the hbase.hstore.blockingStoreFiles parameter, which we set to 200 - probably double what we actually need, but it gives us a buffer for our infrequent, larger write jobs. We additionally settled on larger (and therefore fewer) regions, and a somewhat bigger than default memstore. Here are the relevant pieces of our hbase-site.xml after all our testing:

  <!-- default is 256MB 268435456, this is 1.5GB -->
  <property>
    <name>hbase.hregion.max.filesize</name>
    <value>1610612736</value>
  </property>
  
  <!-- default is 2 -->
  <property>
    <name>hbase.hregion.memstore.block.multiplier</name>
    <value>4</value>
  </property>
  
  <!-- default is 64MB 67108864 -->
  <property>
    <name>hbase.hregion.memstore.flush.size</name>
    <value>134217728</value>
  </property>
  
  <!-- default is 7, should be at least 2x compactionThreshold -->
  <property>
    <name>hbase.hstore.blockingStoreFiles</name>
    <value>200</value>
  </property>

And finally, if our compactions were faster and/or more frequent, we might be able to keep up with our storefile creation. That doesn't look possible without multi-threaded compactions, but naturally those exist in newer versions of HBase (starting with 0.92 - HBASE-1476) so if you're having these problems, an upgrade might be in order. Indeed this is prompting us to consider an upgrade to CDH4.

Many thanks to Lars George, who helped us get through the "Now What?" phase by digging deep into the logs and the source to help us work out what was going on.