In a small database, much of this would be trivial using aggregate functions (SUM(), COUNT(), etc.). As the volume grows, one often precalculates these metrics, which brings its own set of consistency challenges. As one outgrows a database, as GBIF has, we need to look for new mechanisms to manage these metrics. The features of DataCube that make it attractive to us are:
- A manageable process to modify the cube structure
- A higher level API to develop against
- Ability to rebuild the cube with a single pass over the source data
Take the following sample of source records:

| ID | Kingdom | ScientificName | Country | IsoCountryCode | BasisOfRecord | CellId | Year |
|---|---|---|---|---|---|---|---|
| 1 | Animalia | Puma concolor | Peru | PE | Observation | 13245 | 1967 |
| 2 | Plantae | Abies alba | Spain | ES | Observation | 3637 | 2010 |
| 3 | Plantae | Abies alba | Spain | ES | Observation | 3638 | 2010 |

Suppose the following metrics are required, each of which is termed a rollup in OLAP:
- Number of records per country
- Number of records per kingdom
- Number of records georeferenced / not georeferenced
- Number of records per kingdom per country
- Number of records georeferenced / not georeferenced per country
- Number of records georeferenced / not georeferenced per kingdom
- Number of records georeferenced / not georeferenced per kingdom per country
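To make the notion of a rollup concrete, here is a small in-memory Java sketch (not DataCube code) that counts the three sample records into all seven rollups. The dimension names, the string cell keys and the `RollupSketch` class are illustrative assumptions, and the sketch assumes every value is present, whereas real data needs a policy for missing values.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * In-memory sketch of rollup counting: every record adds 1 to exactly one
 * cell in each rollup. Illustrative only; not DataCube's implementation.
 */
public class RollupSketch {

    // Dimension names used in this sketch (hypothetical)
    static final String COUNTRY = "country";
    static final String KINGDOM = "kingdom";
    static final String GEOREFERENCED = "georeferenced";

    // The seven rollups listed above, as combinations of dimensions
    static final List<List<String>> ROLLUPS = Arrays.asList(
            Arrays.asList(COUNTRY),
            Arrays.asList(KINGDOM),
            Arrays.asList(GEOREFERENCED),
            Arrays.asList(COUNTRY, KINGDOM),
            Arrays.asList(COUNTRY, GEOREFERENCED),
            Arrays.asList(KINGDOM, GEOREFERENCED),
            Arrays.asList(COUNTRY, KINGDOM, GEOREFERENCED));

    /** Builds a record holding the three dimension values (all assumed present). */
    static Map<String, String> occurrence(String isoCountry, String kingdom, boolean georeferenced) {
        Map<String, String> record = new HashMap<>();
        record.put(COUNTRY, isoCountry);
        record.put(KINGDOM, kingdom);
        record.put(GEOREFERENCED, Boolean.toString(georeferenced));
        return record;
    }

    /** Adds 1 to the matching cell of every rollup, keyed like "country=ES|kingdom=Plantae". */
    static void count(Map<String, String> record, Map<String, Long> cube) {
        for (List<String> rollup : ROLLUPS) {
            StringBuilder key = new StringBuilder();
            for (String dimension : rollup) {
                if (key.length() > 0) {
                    key.append('|');
                }
                key.append(dimension).append('=').append(record.get(dimension));
            }
            cube.merge(key.toString(), 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> cube = new TreeMap<>();
        // The three sample records; each has a CellId, so all are georeferenced
        count(occurrence("PE", "Animalia", true), cube);
        count(occurrence("ES", "Plantae", true), cube);
        count(occurrence("ES", "Plantae", true), cube);
        cube.forEach((cell, total) -> System.out.println(cell + " = " + total));
    }
}
```

Each record touches exactly seven cells, one per rollup, which is why such counts can be maintained incrementally as records arrive instead of being recomputed with COUNT() queries.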
```java
/**
 * The cube definition (package access only).
 * Dimensions are Country, Kingdom and Georeferenced with counts available for:
 * <ul>
 *   <li>Country (e.g. number of records in DK)</li>
 *   <li>Kingdom (e.g. number of animal records)</li>
 *   <li>Georeferenced (e.g. number of records with coordinates)</li>
 *   <li>Country and kingdom (e.g. number of plant records in the US)</li>
 *   <li>Country and georeferenced (e.g. number of records with coordinates in the UK)</li>
 *   <li>Kingdom and georeferenced (e.g. number of plant records with coordinates)</li>
 *   <li>Country and kingdom and georeferenced (e.g. number of bacteria records with coordinates in Spain)</li>
 * </ul>
 * TODO: write public utility exposing a simple API enabling validated read/write access to cube.
 */
class Cube {

  // no id substitution
  static final Dimension<String> COUNTRY =
      new Dimension<String>("dwc:country", new StringToBytesBucketer(), false, 2);

  // id substitution applies
  static final Dimension<String> KINGDOM =
      new Dimension<String>("dwc:kingdom", new StringToBytesBucketer(), true, 7);

  // no id substitution
  static final Dimension<Boolean> GEOREFERENCED =
      new Dimension<Boolean>("gbif:georeferenced", new BooleanBucketer(), false, 1);

  // Singleton instance, accessed as Cube.INSTANCE
  static final DataCube<LongOp> INSTANCE = newInstance();

  // Not for instantiation
  private Cube() {
  }

  /**
   * Creates the cube.
   */
  private static DataCube<LongOp> newInstance() {
    // The dimensions of the cube
    List<Dimension<?>> dimensions =
        ImmutableList.<Dimension<?>>of(COUNTRY, KINGDOM, GEOREFERENCED);

    // The way the dimensions are "rolled up" for summary counting
    List<Rollup> rollups = ImmutableList.of(
        new Rollup(COUNTRY),
        new Rollup(KINGDOM),
        new Rollup(GEOREFERENCED),
        new Rollup(COUNTRY, KINGDOM),
        new Rollup(COUNTRY, GEOREFERENCED),
        new Rollup(KINGDOM, GEOREFERENCED),
        // more than 2 requires special syntax
        new Rollup(ImmutableSet.<DimensionAndBucketType>of(
            new DimensionAndBucketType(COUNTRY),
            new DimensionAndBucketType(KINGDOM),
            new DimensionAndBucketType(GEOREFERENCED))));

    return new DataCube<LongOp>(dimensions, rollups);
  }
}
```

In this code, we are making use of ID substitution for the kingdom. ID substitution is a built-in feature of DataCube whereby an auto-generated ID is used in place of a verbose coordinate (a value for a dimension). This matters for performance, because coordinates are used to construct the cube lookup keys, which in turn become the keys of the HBase table. The substitution uses a simple table holding a running counter and a mapping table holding the value-to-ID mapping. When a previously unseen value is inserted into the cube, the counter is incremented (with custom locking to support concurrency within the cluster), the mapping is stored, and the counter value is used as the coordinate. When reading, the mapping table is used to construct the lookup key.

With the cube defined, we are ready to populate it. One could simply iterate over the source data and populate the cube with something like the following:

```java
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // omitted for brevity
dataCubeIo.writeSync(new LongOp(1), new WriteBuilder(Cube.INSTANCE)
    .at(Cube.COUNTRY, "ES") // for example; an ISO code, to fit the 2-byte COUNTRY field
    .at(Cube.KINGDOM, "Animalia")
    .at(Cube.GEOREFERENCED, true));
```

However, one should consider what to do in the following inevitable scenarios:
- A new dimension or rollup is to be added to the running cube
- Changes to the source data have occurred without the cube being notified (e.g. through a batch load, or missing notifications due to messaging failures)
- Disaster recovery requiring a cube rebuild
DataCube handles these through a backfill process, which runs as follows:

1. A snapshot of the live cube is taken and stored in a snapshot table.
2. An offline cube is calculated from the source data and stored in a backfill table.
3. The snapshot and live cube are compared to determine changes that were accepted in the live cube during the rebuilding process (step 2). These changes are then applied to the backfill table.
4. The backfill is hot swapped to become the live cube.
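For additive counts, steps 3 and 4 amount to a simple correction: whatever the live cube gained since the snapshot is added to the freshly built backfill, which then replaces the live cube. The in-memory Java sketch below models that idea. It is not DataCube's merge code; the maps stand in for the HBase tables, the `AtomicReference` stands in for the hot swap, and it assumes the backfill reflects the source data as of the snapshot, so that live minus snapshot is exactly what arrived during the rebuild.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of backfill steps 3 and 4 for additive counts (illustrative only). */
public class BackfillMergeSketch {

    /** Step 3: apply the changes the live cube accepted since the snapshot to the backfill. */
    static Map<String, Long> merge(Map<String, Long> snapshot, Map<String, Long> live,
                                   Map<String, Long> backfill) {
        Map<String, Long> merged = new TreeMap<>(backfill);
        Set<String> cells = new TreeSet<>(live.keySet());
        cells.addAll(snapshot.keySet());
        for (String cell : cells) {
            long delta = live.getOrDefault(cell, 0L) - snapshot.getOrDefault(cell, 0L);
            if (delta != 0) {
                merged.merge(cell, delta, Long::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> snapshot = new TreeMap<>();
        snapshot.put("country=ES", 2L);

        // While the backfill ran, the live cube counted one more ES record and one DK record
        Map<String, Long> live = new TreeMap<>();
        live.put("country=ES", 3L);
        live.put("country=DK", 1L);

        // The rebuild from source also found a PE record the live cube had missed
        Map<String, Long> backfill = new TreeMap<>();
        backfill.put("country=ES", 2L);
        backfill.put("country=PE", 1L);

        // Step 4: swap the merged cube in as the live cube with a single reference update
        AtomicReference<Map<String, Long>> liveCube = new AtomicReference<>(live);
        liveCube.set(merge(snapshot, live, backfill));
        System.out.println(liveCube.get()); // {country=DK=1, country=ES=3, country=PE=1}
    }
}
```

The result keeps both the record the live cube had missed (PE) and the writes that arrived during the rebuild (ES and DK), which is the point of comparing the snapshot with the live cube rather than simply swapping in the backfill.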
Step 2 is where the backfill calls back into our own code. Our callback launches a MapReduce job that scans the source HBase table and writes into the table it is handed:

```java
/**
 * The callback used from the backfill process to spawn the job to write the new data in the cube.
 */
public class BackfillCallback implements HBaseBackfillCallback {

  // Property keys passed in on the job conf to the Mapper
  static final String TARGET_TABLE_KEY = "gbif:cubewriter:targetTable";
  static final String TARGET_CF_KEY = "gbif:cubewriter:targetCF";

  // Controls the scanner caching size for the source data scan (100-5000 is reasonable)
  private static final int SCAN_CACHE = 200;

  // The source data table
  private static final String SOURCE_TABLE = "dc_occurrence";

  @Override
  public void backfillInto(Configuration conf, byte[] table, byte[] cf, long snapshotFinishMs)
      throws IOException {
    // Layer the HBase defaults over the configuration we are given, rather than discarding it
    conf = HBaseConfiguration.create(conf);
    conf.set(TARGET_TABLE_KEY, Bytes.toString(table));
    conf.set(TARGET_CF_KEY, Bytes.toString(cf));
    Job job = new Job(conf, "CubeWriterMapper");
    job.setJarByClass(CubeWriterMapper.class);

    Scan scan = new Scan();
    scan.setCaching(SCAN_CACHE);
    scan.setCacheBlocks(false); // a full scan should not churn the region server block cache

    // we do not want to get bad counts in the cube!
    job.getConfiguration().set("mapred.map.tasks.speculative.execution", "false");
    job.getConfiguration().set("mapred.reduce.tasks.speculative.execution", "false");
    job.setNumReduceTasks(0);

    TableMapReduceUtil.initTableMapperJob(SOURCE_TABLE, scan, CubeWriterMapper.class, null, null, job);
    job.setOutputFormatClass(NullOutputFormat.class);

    boolean success;
    try {
      success = job.waitForCompletion(true);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
    if (!success) {
      throw new IOException("Unknown error with job. Check the logs.");
    }
  }
}
```
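The comment about bad counts in `BackfillCallback` deserves unpacking. The mapper writes increments straight to HBase as a side effect, and the job uses `NullOutputFormat`, so Hadoop has no committed output it could discard when a speculative duplicate attempt runs over the same input split. The in-memory Java sketch below illustrates the effect; the `TreeMap` stands in for the cube table and the hypothetical `runAttempt` method stands in for one map task attempt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: why duplicate task attempts corrupt an increment-based cube (illustrative only). */
public class DuplicateAttemptSketch {

    /** One map task attempt: +1 per record in its input split, written as a side effect. */
    static void runAttempt(List<String> split, Map<String, Long> cube) {
        for (String isoCountry : split) {
            cube.merge(isoCountry, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList("ES", "ES", "PE");
        Map<String, Long> cube = new TreeMap<>();
        runAttempt(split, cube); // the original attempt
        runAttempt(split, cube); // a speculative attempt over the same split
        System.out.println(cube); // {ES=4, PE=2}: every count doubled
    }
}
```

Disabling speculative execution removes these deliberate duplicates. Increments are still not idempotent, though, so a task that fails after some of its batches have been flushed and is then retried could re-apply those increments.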
```java
/**
 * The Mapper used to read the source data and write into the target cube.
 * Counters are written to simplify the spotting of issues, so look to the Job counters on completion.
 */
public class CubeWriterMapper extends TableMapper<NullWritable, NullWritable> {
  // The map output types are unused: all writes go directly to the cube

  // TODO: These should come from a common schema utility in the future
  // The source HBase table fields
  private static final byte[] CF = Bytes.toBytes("o");
  private static final byte[] COUNTRY = Bytes.toBytes("icc");
  private static final byte[] KINGDOM = Bytes.toBytes("ik");
  private static final byte[] CELL = Bytes.toBytes("icell");

  // Names for counters used in the Hadoop Job
  private static final String STATS = "Stats";
  private static final String STAT_COUNTRY = "Country present";
  private static final String STAT_KINGDOM = "Kingdom present";
  private static final String STAT_GEOREFERENCED = "Georeferenced";
  private static final String STAT_SKIPPED = "Skipped record";
  private static final String KINGDOMS = "Kingdoms";

  // The batch size to use when writing the cube
  private static final int CUBE_WRITE_BATCH_SIZE = 1000;

  static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

  private DataCubeIo<LongOp> dataCubeIo;

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    super.cleanup(context);
    // ensure we're all flushed since batch mode
    dataCubeIo.flush();
    dataCubeIo = null;
  }

  /**
   * Utility to read a named field from the row as an Integer (null if absent or empty).
   */
  private Integer getValueAsInt(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toInt(v);
    }
    return null;
  }

  /**
   * Utility to read a named field from the row as a String (null if absent or empty).
   */
  private String getValueAsString(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toString(v);
    }
    return null;
  }

  @Override
  protected void map(ImmutableBytesWritable key, Result row, Context context)
      throws IOException, InterruptedException {
    String country = getValueAsString(row, CF, COUNTRY);
    String kingdom = getValueAsString(row, CF, KINGDOM);
    Integer cell = getValueAsInt(row, CF, CELL);

    // Only the dimensions that have a value are set on the write
    WriteBuilder b = new WriteBuilder(Cube.INSTANCE);
    if (country != null) {
      b.at(Cube.COUNTRY, country);
      context.getCounter(STATS, STAT_COUNTRY).increment(1);
    }
    if (kingdom != null) {
      b.at(Cube.KINGDOM, kingdom);
      context.getCounter(STATS, STAT_KINGDOM).increment(1);
      context.getCounter(KINGDOMS, kingdom).increment(1);
    }
    if (cell != null) {
      b.at(Cube.GEOREFERENCED, true);
      context.getCounter(STATS, STAT_GEOREFERENCED).increment(1);
    }

    if (b.getBuckets() != null && !b.getBuckets().isEmpty()) {
      dataCubeIo.writeSync(new LongOp(1), b);
    } else {
      context.getCounter(STATS, STAT_SKIPPED).increment(1);
    }
  }

  // Sets up the DataCubeIo with IdService etc.
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
    IdService idService = new HBaseIdService(conf, Backfill.LOOKUP_TABLE, Backfill.COUNTER_TABLE,
        Backfill.CF, EMPTY_BYTE_ARRAY);
    byte[] table = Bytes.toBytes(conf.get(BackfillCallback.TARGET_TABLE_KEY));
    byte[] cf = Bytes.toBytes(conf.get(BackfillCallback.TARGET_CF_KEY));
    DbHarness<LongOp> hbaseDbHarness = new HBaseDbHarness<LongOp>(pool, EMPTY_BYTE_ARRAY, table, cf,
        LongOp.DESERIALIZER, idService, CommitType.INCREMENT);
    dataCubeIo = new DataCubeIo<LongOp>(Cube.INSTANCE, hbaseDbHarness, CUBE_WRITE_BATCH_SIZE,
        Long.MAX_VALUE, SyncLevel.BATCH_SYNC);
  }
}
```

With the callback written, all that is left to populate the cube is to run the backfill. Note that this process can also be used to bootstrap the live cube for the first time:
```java
public class Backfill {
  // The live cube table
  static final byte[] CUBE_TABLE = "dc_cube".getBytes();
  // Snapshot of the live table used during backfill
  static final byte[] SNAPSHOT_TABLE = "dc_snapshot".getBytes();
  // Backfill table built from the source
  static final byte[] BACKFILL_TABLE = "dc_backfill".getBytes();
  // Utility table to provide a running count for the identifier service
  static final byte[] COUNTER_TABLE = "dc_counter".getBytes();
  // Utility table to provide a mapping from source values to assigned identifiers
  static final byte[] LOOKUP_TABLE = "dc_lookup".getBytes();
  // All DataCube tables use a single column family
  static final byte[] CF = "c".getBytes();

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseBackfill backfill = new HBaseBackfill(
        conf,
        new BackfillCallback(), // our implementation
        CUBE_TABLE,
        SNAPSHOT_TABLE,
        BACKFILL_TABLE,
        CF,
        LongOp.LongOpDeserializer.class);
    backfill.runWithCheckedExceptions();
  }
}
```

While HBase provides the storage for the cube, a backfill could be implemented against any source data, such as a database over JDBC or text files stored on a Hadoop filesystem. Finally, we want to be able to read our cube:
```java
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // omitted for brevity
Optional<LongOp> result = dataCubeIo.get(new ReadBuilder(Cube.INSTANCE)
    .at(Cube.COUNTRY, "DK")
    .at(Cube.KINGDOM, "Animalia"));

// need to check if this coordinate combination hit anything in the cube
if (result.isPresent()) {
  LOG.info("Animal records in Denmark: " + result.get().getLong());
}
```

All the source code for the above is available in the GBIF labs svn.
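The read above depends on the ID substitution described earlier: the kingdom value has to be translated into its assigned ID before the lookup key can be built. As a rough illustration, here is an in-memory Java sketch of that mechanism. The `AtomicLong` and `ConcurrentHashMap` stand in for the counter and lookup tables (DataCube itself keeps these in HBase, with its own locking), and the key layout, a 2-byte ISO country code followed by the 7-byte kingdom ID, is a simplified assumption based on the field widths in `Cube`, not DataCube's actual key format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory sketch of ID substitution and lookup-key construction (illustrative only). */
public class IdSubstitutionSketch {

    // Stand-in for the counter table: the last ID handed out
    private final AtomicLong counter = new AtomicLong();
    // Stand-in for the lookup table: coordinate value -> assigned ID
    private final ConcurrentHashMap<String, Long> lookup = new ConcurrentHashMap<>();

    /** Write path: reuse the existing ID, or atomically assign the next counter value. */
    long getOrCreateId(String value) {
        return lookup.computeIfAbsent(value, v -> counter.incrementAndGet());
    }

    /** Read path: the ID if the value was ever written, otherwise null. */
    Long findId(String value) {
        return lookup.get(value);
    }

    /** Encodes an ID in a fixed number of big-endian bytes, e.g. 7 as declared for KINGDOM. */
    static byte[] toFixedWidth(long id, int numBytes) {
        byte[] all = ByteBuffer.allocate(Long.BYTES).putLong(id).array();
        for (int i = 0; i < Long.BYTES - numBytes; i++) {
            if (all[i] != 0) {
                throw new IllegalArgumentException("ID " + id + " does not fit in " + numBytes + " bytes");
            }
        }
        return Arrays.copyOfRange(all, Long.BYTES - numBytes, Long.BYTES);
    }

    /** Hypothetical country+kingdom key: 2-byte ISO code, then the 7-byte kingdom ID. */
    static byte[] countryKingdomKey(String isoCountry, long kingdomId) {
        byte[] country = isoCountry.getBytes(StandardCharsets.US_ASCII);
        if (country.length != 2) {
            throw new IllegalArgumentException("Expected a 2-letter ISO code: " + isoCountry);
        }
        return ByteBuffer.allocate(2 + 7).put(country).put(toFixedWidth(kingdomId, 7)).array();
    }

    public static void main(String[] args) {
        IdSubstitutionSketch ids = new IdSubstitutionSketch();
        ids.getOrCreateId("Animalia"); // assigned 1
        ids.getOrCreateId("Plantae");  // assigned 2
        byte[] key = countryKingdomKey("DK", ids.findId("Animalia"));
        System.out.println(Arrays.toString(key)); // [68, 75, 0, 0, 0, 0, 0, 0, 1]
        System.out.println(ids.findId("Fungi"));  // null: never written, so no cell to look up
    }
}
```

Because every kingdom ID occupies the same 7 bytes, a long kingdom name costs no more key space than a short one. A value that was never written has no ID and therefore no cell, which is plausibly one way a read ends up absent and why the `isPresent()` check above is needed.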
Many thanks to Dave Revell at UrbanAirship for his guidance.