Friday 13 July 2012

Getting started with DataCube on HBase

This blog post provides a quick introduction to using DataCube, a Java-based OLAP cube library with a pluggable storage engine, open sourced by Urban Airship. In this tutorial we make use of the inbuilt HBase storage engine.

In a small database, much of this would be trivial using aggregating functions (SUM(), COUNT() etc.). As the volume grows, one often precalculates these metrics, which brings its own set of consistency challenges. As one outgrows a database, as GBIF has, new mechanisms are needed to manage these metrics. The features of DataCube that make it attractive to us are:
  • A manageable process to modify the cube structure
  • A higher level API to develop against
  • Ability to rebuild the cube with a single pass over the source data
For this tutorial we will consider the source data as classical DarwinCore occurrence records, where each record represents the metadata associated with a species observation event, e.g.:
ID, Kingdom, ScientificName, Country, IsoCountryCode, BasisOfRecord, CellId, Year
1, Animalia, Puma concolor, Peru, PE, Observation, 13245, 1967
2, Plantae, Abies alba, Spain, ES, Observation, 3637, 2010
3, Plantae, Abies alba, Spain, ES, Observation, 3638, 2010
Suppose the following metrics are required, each of which is termed a rollup in OLAP:
  1. Number of records per country
  2. Number of records per kingdom
  3. Number of records georeferenced / not georeferenced
  4. Number of records per kingdom per country
  5. Number of records georeferenced / not georeferenced per country
  6. Number of records georeferenced / not georeferenced per kingdom
  7. Number of records georeferenced / not georeferenced per kingdom per country
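Applied to the three sample records above (all of which have a CellId and so count as georeferenced), rollup 1 would give Peru = 1 and Spain = 2, rollup 2 would give Animalia = 1 and Plantae = 2, rollup 3 would give georeferenced = 3, and rollup 4 would give (Peru, Animalia) = 1 and (Spain, Plantae) = 2.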
Given the requirements above, this translates into a cube definition with 3 dimensions and 7 rollups, as follows:
/**
 * The cube definition (package access only).
 * Dimensions are Country, Kingdom and Georeferenced with counts available for:
 * 1. Country (e.g. number of records in DK)
 * 2. Kingdom (e.g. number of animal records)
 * 3. Georeferenced (e.g. number of records with coordinates)
 * 4. Country and kingdom (e.g. number of plant records in the US)
 * 5. Country and georeferenced (e.g. number of records with coordinates in the UK)
 * 6. Kingdom and georeferenced (e.g. number of animal records with coordinates)
 * 7. Country and kingdom and georeferenced (e.g. number of bacteria records with coordinates in Spain)
 *
 * TODO: write public utility exposing a simple API enabling validated read/write access to the cube.
 */
class Cube {

  // no id substitution
  static final Dimension<String> COUNTRY = new Dimension<String>("dwc:country", new StringToBytesBucketer(), false, 2);
  // id substitution applies
  static final Dimension<String> KINGDOM = new Dimension<String>("dwc:kingdom", new StringToBytesBucketer(), true, 7);
  // no id substitution
  static final Dimension<Boolean> GEOREFERENCED = new Dimension<Boolean>("gbif:georeferenced", new BooleanBucketer(), false, 1);

  // Singleton instance
  static final DataCube<LongOp> INSTANCE = newInstance();

  // Not for instantiation
  private Cube() {
  }

  /**
   * Creates the cube.
   */
  private static DataCube<LongOp> newInstance() {
    // The dimensions of the cube
    List<Dimension<?>> dimensions = ImmutableList.<Dimension<?>>of(COUNTRY, KINGDOM, GEOREFERENCED);
    // The way the dimensions are "rolled up" for summary counting
    List<Rollup> rollups = ImmutableList.of(
      new Rollup(COUNTRY),
      new Rollup(KINGDOM),
      new Rollup(GEOREFERENCED),
      new Rollup(COUNTRY, KINGDOM),
      new Rollup(COUNTRY, GEOREFERENCED),
      new Rollup(KINGDOM, GEOREFERENCED),
      // more than 2 dimensions requires the explicit set syntax
      new Rollup(ImmutableSet.of(
        new DimensionAndBucketType(COUNTRY),
        new DimensionAndBucketType(KINGDOM),
        new DimensionAndBucketType(GEOREFERENCED))));
    return new DataCube<LongOp>(dimensions, rollups);
  }
}
In this code, we make use of ID substitution for the kingdom. ID substitution is an inbuilt feature of DataCube whereby an auto-generated ID is substituted for a verbose coordinate (the value supplied for a dimension). This is an important performance feature, because coordinates are used to construct the cube lookup keys, which translate into the row keys of the HBase table. The substitution is achieved using a simple table holding a running counter and a mapping table holding the field-to-ID mapping. When data is inserted into the cube, the counter is incremented (with custom locking to support concurrency within the cluster), the mapping is stored, and the counter value is used as the coordinate. When reading, the mapping table is used to construct the lookup key.

With the cube defined, we are ready to populate it. One could simply iterate over the source data and populate the cube with something like the following:
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // omitted for brevity; a sketch follows below
dataCubeIo.writeSync(new LongOp(1), 
  new WriteBuilder(Cube.INSTANCE)
    .at(Cube.COUNTRY, "Spain") // for example
    .at(Cube.KINGDOM, "Animalia")
    .at(Cube.GEOREFERENCED, true)
);
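The setup(Cube.INSTANCE) call above is omitted for brevity. As a rough sketch (not necessarily the exact code used at GBIF), a DataCubeIo can be assembled along the same lines as the Mapper's setup() method shown later in this post, using the table and column family names from the backfill example at the end:

DataCubeIo<LongOp> setup(DataCube<LongOp> cube) throws IOException {
  Configuration conf = HBaseConfiguration.create();
  HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
  // "dc_lookup" and "dc_counter" back the ID substitution service
  IdService idService = new HBaseIdService(conf, Bytes.toBytes("dc_lookup"),
    Bytes.toBytes("dc_counter"), Bytes.toBytes("c"), new byte[0]);
  // "dc_cube" is the live cube table, "c" its single column family
  DbHarness<LongOp> harness = new HBaseDbHarness<LongOp>(pool, new byte[0],
    Bytes.toBytes("dc_cube"), Bytes.toBytes("c"), LongOp.DESERIALIZER, idService, CommitType.INCREMENT);
  // flush after every 1000 writes, as in the Mapper below
  return new DataCubeIo<LongOp>(cube, harness, 1000, Long.MAX_VALUE, SyncLevel.BATCH_SYNC);
}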
However, one should consider how to handle the following inevitable scenarios:
  1. A new dimension or rollup is to be added to the running cube
  2. Changes to the source data have occurred without the cube being notified (e.g. through a batch load, or missing notifications due to messaging failures)
  3. Some disaster recovery requiring a cube rebuild
To handle this when using HBase as the cube storage engine, we make use of the inbuilt backfill functionality. Backfilling is a multistage process:
  1. A snapshot of the live cube is taken and stored in a snapshot table
  2. An offline cube is calculated from the source data and stored in a backfill table
  3. The snapshot and live cube are compared to determine changes that were accepted in the live cube during the rebuilding process (step 2). These changes are then applied to the backfill table
  4. The backfill is hot swapped to become the live cube
This is all handled within DataCube with the exception of stage 2, where we are required to provide a BackfillCallback, the logic responsible for populating the new cube from the source data. The following example illustrates a BackfillCallback using a simple MapReduce job to scan an HBase table for the source data.
/**
 * The callback used from the backfill process to spawn the job to write the new data in the cube.
 */
public class BackfillCallback implements HBaseBackfillCallback {

  // Property keys passed in on the job conf to the Mapper
  static final String TARGET_TABLE_KEY = "gbif:cubewriter:targetTable";
  static final String TARGET_CF_KEY = "gbif:cubewriter:targetCF";
  // Controls the scanner caching size for the source data scan (100-5000 is reasonable)
  private static final int SCAN_CACHE = 200;
  // The source data table
  private static final String SOURCE_TABLE = "dc_occurrence";

  @Override
  public void backfillInto(Configuration conf, byte[] table, byte[] cf, long snapshotFinishMs) throws IOException {
    conf = HBaseConfiguration.create();
    conf.set(TARGET_TABLE_KEY, Bytes.toString(table));
    conf.set(TARGET_CF_KEY, Bytes.toString(cf));
    Job job = new Job(conf, "CubeWriterMapper");

    job.setJarByClass(CubeWriterMapper.class);
    Scan scan = new Scan();
    scan.setCaching(SCAN_CACHE);
    scan.setCacheBlocks(false);

    // we do not want to get bad counts in the cube!
    job.getConfiguration().set("mapred.map.tasks.speculative.execution", "false");
    job.getConfiguration().set("mapred.reduce.tasks.speculative.execution", "false");
    job.setNumReduceTasks(0);
    TableMapReduceUtil.initTableMapperJob(SOURCE_TABLE, scan, CubeWriterMapper.class, null, null, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    try {
      boolean b = job.waitForCompletion(true);
      if (!b) {
        throw new IOException("Unknown error with job.  Check the logs.");
      }
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}
/**
 * The Mapper used to read the source data and write into the target cube.
 * Counters are written to simplify the spotting of issues, so check the Job counters on completion.
 */
// No map output is emitted; writes go directly to the cube
public class CubeWriterMapper extends TableMapper<NullWritable, NullWritable> {

  // TODO: These should come from a common schema utility in the future
  // The source HBase table fields
  private static final byte[] CF = Bytes.toBytes("o");
  private static final byte[] COUNTRY = Bytes.toBytes("icc");
  private static final byte[] KINGDOM = Bytes.toBytes("ik");
  private static final byte[] CELL = Bytes.toBytes("icell");

  // Names for counters used in the Hadoop Job
  private static final String STATS = "Stats";
  private static final String STAT_COUNTRY = "Country present";
  private static final String STAT_KINGDOM = "Kingdom present";
  private static final String STAT_GEOREFERENCED = "Georeferenced";
  private static final String STAT_SKIPPED = "Skipped record";
  private static final String KINGDOMS = "Kingdoms";

  // The batch size to use when writing the cube
  private static final int CUBE_WRITE_BATCH_SIZE = 1000;

  static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

  private DataCubeIo<LongOp> dataCubeIo;

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    super.cleanup(context);
    // ensure we're all flushed since batch mode
    dataCubeIo.flush();
    dataCubeIo = null;
  }

  /**
   * Utility to read a named field from the row.
   */
  private Integer getValueAsInt(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toInt(v);
    }
    return null;
  }

  /**
   * Utility to read a named field from the row.
   */
  private String getValueAsString(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toString(v);
    }
    return null;
  }


  @Override
  protected void map(ImmutableBytesWritable key, Result row, Context context) throws IOException, InterruptedException {
    String country = getValueAsString(row, CF, COUNTRY);
    String kingdom = getValueAsString(row, CF, KINGDOM);
    Integer cell = getValueAsInt(row, CF, CELL);

    WriteBuilder b = new WriteBuilder(Cube.INSTANCE);
    if (country != null) {
      b.at(Cube.COUNTRY, country);
      context.getCounter(STATS, STAT_COUNTRY).increment(1);
    }
    if (kingdom != null) {
      b.at(Cube.KINGDOM, kingdom);
      context.getCounter(STATS, STAT_KINGDOM).increment(1);
      context.getCounter(KINGDOMS, kingdom).increment(1);
    }
    if (cell != null) {
      b.at(Cube.GEOREFERENCED, true);
      context.getCounter(STATS, STAT_GEOREFERENCED).increment(1);
    }
    if (b.getBuckets() != null && !b.getBuckets().isEmpty()) {
      dataCubeIo.writeSync(new LongOp(1), b);
    } else {
      context.getCounter(STATS, STAT_SKIPPED).increment(1);
    }
  }

  // Sets up the DataCubeIO with IdService etc.
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);

    IdService idService = new HBaseIdService(conf, Backfill.LOOKUP_TABLE, Backfill.COUNTER_TABLE, Backfill.CF, EMPTY_BYTE_ARRAY);

    byte[] table = Bytes.toBytes(conf.get(BackfillCallback.TARGET_TABLE_KEY));
    byte[] cf = Bytes.toBytes(conf.get(BackfillCallback.TARGET_CF_KEY));


    DbHarness<LongOp> hbaseDbHarness =
      new HBaseDbHarness<LongOp>(pool, EMPTY_BYTE_ARRAY, table, cf, LongOp.DESERIALIZER, idService, CommitType.INCREMENT);

    dataCubeIo = new DataCubeIo<LongOp>(Cube.INSTANCE, hbaseDbHarness, CUBE_WRITE_BATCH_SIZE, Long.MAX_VALUE, SyncLevel.BATCH_SYNC);

  }
}
With the callback written, all that is left to populate the cube is to run the backfill. Note that this process can also be used to bootstrap the live cube for the first time:
// The live cube table 
final byte[] CUBE_TABLE = "dc_cube".getBytes();
// Snapshot of the live table used during backfill
final byte[] SNAPSHOT_TABLE = "dc_snapshot".getBytes();
// Backfill table built from the source
final byte[] BACKFILL_TABLE = "dc_backfill".getBytes();
// Utility table to provide a running count for the identifier service
final byte[] COUNTER_TABLE = "dc_counter".getBytes();
// Utility table to provide a mapping from source values to assigned identifiers
final byte[] LOOKUP_TABLE = "dc_lookup".getBytes();
// All DataCube tables use a single column family
final byte[] CF = "c".getBytes();

HBaseBackfill backfill =
  new HBaseBackfill(
    conf, 
    new BackfillCallback(), // our implementation
    CUBE_TABLE, 
    SNAPSHOT_TABLE, 
    BACKFILL_TABLE, 
    CF, 
    LongOp.LongOpDeserializer.class);
backfill.runWithCheckedExceptions();
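If these HBase tables do not already exist in the cluster, one way to create them, sketched here with the HBase admin API of the time and the single column family layout assumed above, is:

HBaseAdmin admin = new HBaseAdmin(conf);
for (byte[] t : new byte[][] {CUBE_TABLE, SNAPSHOT_TABLE, BACKFILL_TABLE, COUNTER_TABLE, LOOKUP_TABLE}) {
  if (!admin.tableExists(t)) {
    // each table gets the single column family used by DataCube
    HTableDescriptor descriptor = new HTableDescriptor(t);
    descriptor.addFamily(new HColumnDescriptor(CF));
    admin.createTable(descriptor);
  }
}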
While HBase provides the storage for the cube, a backfill could be implemented against any source data, such as a database accessed over JDBC or text files stored on a Hadoop filesystem. Finally, we want to be able to read from our cube:
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // as constructed earlier
Optional<LongOp> result =
  dataCubeIo.get(
    new ReadBuilder(Cube.INSTANCE)
      .at(Cube.COUNTRY, "DK")
      .at(Cube.KINGDOM, "Animalia"));
// need to check if this coordinate combination hit anything in the cube
if (result.isPresent()) {
  LOG.info("Animal records in Denmark: " + result.get().getLong());
}
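The same pattern applies to any of the other rollups defined in the cube. For example, the number of georeferenced records in Spain (ISO code ES) could be read with the same dataCubeIo:

Optional<LongOp> georeferencedInSpain =
  dataCubeIo.get(
    new ReadBuilder(Cube.INSTANCE)
      .at(Cube.COUNTRY, "ES")
      .at(Cube.GEOREFERENCED, true));
// an absent result means no record contributed to this coordinate combination
long count = georeferencedInSpain.isPresent() ? georeferencedInSpain.get().getLong() : 0;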
All the source code for the above is available in the GBIF labs svn.

Many thanks to Dave Revell at UrbanAirship for his guidance.
