Wednesday 3 August 2011

Indexing occurrence data - using Lucene technology

The GBIF Occurrence Index collects, stores and parses data gathered from different sources to provide fast and accurate access to biodiversity occurrence data. The purpose of having a GBIF Index is to optimize the speed, relevance and performance of the search functionality that will be implemented by the new GBIF portal architecture.

Currently, GBIF provides search functionality in its Data Portal through a semi-denormalized index in a relational database, which allows users to find occurrence information by specifying filters that refine the results. That design was envisioned to support the use cases of the current GBIF Data Portal (a Web application). The next generation of the GBIF platform must meet a new set of requirements, and it is possible that the current index will not be able to support them. The most relevant of those requirements are: scheduling of batch exports, full text search, realtime faceted search and probably new schemas for data sharing with other biodiversity networks.

To implement this new Occurrence Index, several technologies are under evaluation, each considered for specific features that make it an attractive option. Those are:
  • PostgreSQL: This relational database has several features worth evaluating: query optimization for JOIN-like queries, a flexible key-value store, partial indices and multicolumn indices.
  • Lucene Index: At least four options are available for this implementation: pure Lucene index, Katta, Apache Solr and ElasticSearch.
  • MySQL: This is the current implementation of the index; an evaluation could help determine whether this technology will be able to support the new use cases.
  • Key-value systems: Several schema-less data stores are available: CouchDB, Mongo, PostgreSQL hstore. The main concern about these technologies is their ability to handle a considerable number of records.
This post shows some preliminary results of the evaluation of Lucene-based indices, specifically: Lucene by itself, Katta, Apache Solr and ElasticSearch. The analysis keeps two concerns apart (this post covers only index creation):
  • Index creation: how the index is created, split (into shards) and merged if necessary.
  • Index use: how the index performs in terms of usability (queries and search patterns), performance (response time) and throughput.

Index Creation

Three scenarios were considered for the index creation phase:
  • Single process - n indices: a single process creates n shards and the input data are split evenly; an IndexWriter is created for each shard. The case n = 1 is part of this scenario. The number of shards is a parameter defined by the user and equals the number of shards expected at the end of the process.
  • N threads - n indices: IndexWriter is a thread-safe class, so it can be shared by several threads to create a single index. The number of shards is defined by the user and is used internally to define the number of IndexWriters.
  • Distributed index creation: the index is created by splitting the input data into N shards; each shard is assigned to one process containing a single IndexWriter, which is responsible for creating that shard's index.

Single process index creation

The process followed for this scenario is pretty straightforward:
  • The input is a row-delimited file in which the columns are separated by a special character ('/001' in our case).
  • The number of shards given as input defines the number of IndexWriters (only one IndexWriter can be open on a Lucene index at a time).
  • Each row represents a Lucene document and is stored using one of the available IndexWriters.
  • If multiple indices were created, they are merged into a single index as the final step (using "IndexWriter.addIndexesNoOptimize").
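The steps above can be sketched roughly as follows. This is only an illustration, not the code used in the test: the class and method names are hypothetical, plain lists stand in for the per-shard IndexWriters, and it assumes that the '/001' separator denotes the U+0001 control character.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the "one IndexWriter per shard" loop; lists replace the Lucene writers. */
final class RoundRobinSharder {
    // Assumption: the '/001' separator mentioned in the post is the U+0001 control character.
    static final String COLUMN_SEPARATOR = "\u0001";

    /** Splits one input row into its column values, keeping trailing empty columns. */
    static String[] parseRow(String row) {
        return row.split(COLUMN_SEPARATOR, -1);
    }

    /** Distributes rows evenly: row i goes to shard (i mod numShards). */
    static List<List<String[]>> distribute(List<String> rows, int numShards) {
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be >= 1");
        }
        List<List<String[]>> shards = new ArrayList<>();
        for (int s = 0; s < numShards; s++) {
            shards.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            // A real implementation would build a Lucene document here
            // and add it through the IndexWriter of the chosen shard.
            shards.get(i % numShards).add(parseRow(rows.get(i)));
        }
        return shards;
    }
}
```

With numShards = 1 every row ends up in the same bucket, which corresponds to the n = 1 case where no final merge is needed.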
This process was tested using a file of 100 million records. The entire process took 9200821 milliseconds (= 9200.821 seconds = 153.35 minutes = 2.56 hours) to finish. Some optimizations were implemented for this process; it is worth mentioning that the same optimizations were applied in the multithreaded scenario:
  • Re-use the IndexWriter in multiple threads
  • Re-use the org.apache.lucene.document.Document and org.apache.lucene.document.Field instances. The Lucene fields are created once in a static block and their values are changed for each new row; the same document is then added to the index. The intention is to avoid allocating new objects for every row, which would then have to be garbage collected.
      static {
        for (int i = 0; i < accFieldsValues.length; i++) {
          fields[i] = new Field(accFieldsValues[i].name(), "", Store.YES, Index.ANALYZED);
        }
      }
      // For each input row:
      //   sets the field value
      //   adds the same document instance with different values
  • The compound file format is turned off: this skips the extra step of packing each segment into a single file and so speeds up indexing, at the cost of more files being open at the same time.
  • IndexWriter.autocommit is set to false: since the index doesn't need to be searchable while it is being created, this feature can be disabled.
  • The flush is triggered by RAM usage, and the RAM usage is maximized:
    LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy();
    logByteSizeMergePolicy.setMergeFactor(mergeFactor);
  • Every N documents an entry is written to a log to report the overall progress.
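Two of the optimizations above, object re-use and progress logging every N documents, can be illustrated without Lucene. The sketch below is a simplified stand-in under stated assumptions: the class name, the in-memory log and the String array that plays the role of the re-used Field instances are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Re-uses one value buffer for every row and records progress every N rows. */
final class ReusingIndexer {
    private final String[] fieldValues;          // allocated once, overwritten per row
    private final long logEvery;                 // the "N" of "every N documents"
    private final List<String> progressLog = new ArrayList<>(); // stand-in for a logger
    private long indexed;

    ReusingIndexer(int numFields, long logEvery) {
        if (numFields < 1 || logEvery < 1) {
            throw new IllegalArgumentException("numFields and logEvery must be >= 1");
        }
        this.fieldValues = new String[numFields];
        this.logEvery = logEvery;
    }

    /** Overwrites the shared buffer with the row's values; missing columns become "". */
    void index(String[] row) {
        for (int i = 0; i < fieldValues.length; i++) {
            fieldValues[i] = i < row.length ? row[i] : "";
        }
        // A real indexer would now add the re-used document to the index.
        indexed++;
        if (indexed % logEvery == 0) {
            progressLog.add(indexed + " documents indexed");
        }
    }

    String[] currentValues() {
        return fieldValues;
    }

    List<String> progressLog() {
        return progressLog;
    }
}
```

Because the buffer is overwritten on every call, the values must be consumed before the next row is processed, which is the same constraint the re-used Document imposes.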

Multithreaded Index Creation

In terms of the optimizations applied, this scenario is very similar to the "Single process" scenario. However, the process is very different in terms of its steps and the resulting index:
  • The input is a row delimited file and each column is separated by a special character ('/001' in our case).
  • The number of rows in the input file is known and is passed as an input parameter.
  • The input file is split evenly into intermediate files; each file is assigned to a thread, which reads it to create a Lucene index.
  • The intermediate files are deleted after each index is created.
  • Depending on the number of shards desired, the indices are merged into a smaller set of indices.
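The slicing step relies on knowing the row count in advance. A minimal sketch of how the row ranges could be computed and handed to a thread pool is shown below; the names are hypothetical, and each worker only counts its rows where the real application would read its intermediate file and build a Lucene index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class RowSlicer {
    /** Returns {startRow, endRowExclusive} per slice; slice sizes differ by at most one row. */
    static List<long[]> slices(long totalRows, int numSlices) {
        if (totalRows < 0 || numSlices < 1) {
            throw new IllegalArgumentException("totalRows >= 0 and numSlices >= 1 required");
        }
        List<long[]> result = new ArrayList<>();
        long base = totalRows / numSlices;
        long remainder = totalRows % numSlices;
        long start = 0;
        for (int i = 0; i < numSlices; i++) {
            long size = base + (i < remainder ? 1 : 0); // first slices absorb the remainder
            result.add(new long[] {start, start + size});
            start += size;
        }
        return result;
    }

    /** Runs one task per slice on a fixed pool and returns the total number of rows handled. */
    static long processAll(long totalRows, int numSlices, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (final long[] slice : slices(totalRows, numSlices)) {
                // A real worker would read its slice and write one Lucene index.
                Callable<Long> task = () -> slice[1] - slice[0];
                futures.add(pool.submit(task));
            }
            long handled = 0;
            for (Future<Long> future : futures) {
                handled += future.get();
            }
            return handled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for slices", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a slice failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, RowSlicer.slices(10, 3) yields the ranges [0, 4), [4, 7) and [7, 10).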
This process was run using a pool of 50 threads and an input file with 100 million rows. The execution time is detailed in the following table:
Phase                                                                     Time
Slicing (splitting the input file and distributing it to the threads)    1045948 ms = 17.43 minutes
Indices creation                                                          6890988 ms = 114.85 minutes
Total time                                                                132.28 minutes = 2.20 hours
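The conversions in the table, and the comparison with the single-process run, are simple arithmetic on the reported milliseconds. The helper below is a hypothetical sketch that only restates the figures from this post.

```java
/** Converts the millisecond timings reported in this post into minutes and hours. */
final class PhaseTimes {
    // Timings copied from the post, in milliseconds.
    static final long SLICING_MS = 1045948L;
    static final long INDEX_CREATION_MS = 6890988L;
    static final long SINGLE_PROCESS_MS = 9200821L;

    static long multithreadedTotalMs() {
        return SLICING_MS + INDEX_CREATION_MS;
    }

    static double toMinutes(long millis) {
        return millis / 60000.0;
    }

    static double toHours(long millis) {
        return millis / 3600000.0;
    }

    /** How many times faster the multithreaded run was than the single-process run. */
    static double speedup() {
        return (double) SINGLE_PROCESS_MS / multithreadedTotalMs();
    }
}
```

The multithreaded total is 7936936 ms, about 132.28 minutes or 2.20 hours, against 153.35 minutes for the single process: a speedup of roughly 1.16, or about 21 minutes saved.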

Katta for Index Creation

Katta is a distributed store for indices; it currently supports two types of indices: Lucene and Hadoop MapFiles. It uses ZooKeeper to coordinate index creation, replication and search across the nodes.
Main relevant features
  • A Katta index is basically a folder containing sub-folders (shards).
  • The client-node communication is implemented using HadoopRPC
  • Supports distributed scoring; each search query requires two network roundtrips: the first gets the document frequencies from all shards and the second performs the query.
  • Provides functionality to merge indices (though this is not a very complex task to implement using the standard Lucene libraries).
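To see why distributed scoring needs the first roundtrip, consider the inverse document frequency (idf) of a term. The sketch below is not Katta's code: the class names are hypothetical, and it assumes the idf form of Lucene's classic similarity, 1 + ln(numDocs / (docFreq + 1)). Summing the per-shard statistics first lets every shard score with the same global value.

```java
import java.util.List;

final class GlobalIdf {
    /** Statistics one shard reports for a term. */
    static final class ShardStats {
        final long numDocs; // documents stored in the shard
        final long docFreq; // documents in the shard that contain the term

        ShardStats(long numDocs, long docFreq) {
            this.numDocs = numDocs;
            this.docFreq = docFreq;
        }
    }

    /** Assumed idf form: 1 + ln(numDocs / (docFreq + 1)). */
    static double idf(long docFreq, long numDocs) {
        return 1.0 + Math.log(numDocs / (double) (docFreq + 1));
    }

    /** First roundtrip: add up the shard statistics, then compute a single idf. */
    static double globalIdf(List<ShardStats> shards) {
        long numDocs = 0;
        long docFreq = 0;
        for (ShardStats shard : shards) {
            numDocs += shard.numDocs;
            docFreq += shard.docFreq;
        }
        return idf(docFreq, numDocs);
    }
}
```

If a term is rare in one shard and common in another, the locally computed idf values differ widely, so scores from different shards would not be comparable without the global statistics.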
Relevant issues found
  • Documentation is not extensive and lacks necessary detail.
  • The community is small and development activity is very low: the last commit was made on 2009-04-2.
  • Doesn't provide any help to create the indices; index sharding must be done before importing them into Katta.
Test environment configuration
  • A cluster of 4 Katta nodes in 2 servers was used.
  • The index was split in 8 shards.
  • The master configuration is replicated in each node using passphraseless ssh access between master and nodes.
  • The ZooKeeper server was embedded into the Katta master node (zookeeper.embedded=true in the configuration file).
  • Each node contains 2 shards; each shard is replicated on 2 nodes.
  • The sharded Lucene index contains 100 million documents and was stored in the Hadoop distributed file system.
Index creation
Since Katta doesn't provide any functionality to create a Lucene index from scratch, the index was built using the multithreaded application and the shards were copied into Hadoop DFS. Then, the sharded index was imported into Katta using the command line:
bin/katta addIndex occurrence hdfs://namenode:port/occurrence/shardedindex/ 2
("2" means a replication factor of 2.) Importing an index into Katta is just a matter of copying the files from Hadoop and updating the index status in the ZooKeeper server, so index creation is external to Katta. A following post will analyze the "Distributed Index Creation" scenario, as well as ElasticSearch and Solr for index creation.
