GBIF currently provides search functionality in its Data Portal backed by a semi-denormalized index in a relational database, which allows users to find occurrence information by specifying filters that refine the results. That design was envisioned to support the use cases of the current GBIF Data Portal (a Web application). The next generation of the GBIF platform brings a new set of requirements, and it is possible that the current index will not be able to support them. The most relevant of those requirements are: scheduling of batch exports, full-text search, real-time faceted search and, probably, new schemas for sharing data with other biodiversity networks.
Several technologies are being evaluated for implementing this new Occurrence Index. Each one was taken into consideration for specific features that make it an attractive option:
Technology | Description |
PostgreSQL | This relational database has several features worth evaluating: query optimization for JOIN-like queries, a flexible key-value store, partial indices and multicolumn indices |
Lucene Index | At least four options are available for this implementation: a pure Lucene index, Katta, Apache Solr and ElasticSearch |
MySQL | This is the current implementation of the index; an evaluation could help determine whether this technology can support the new use cases |
Key-value systems | Several schema-less data stores are available: CouchDB, Mongo, PostgreSQL hstore. The main concern about these technologies is their ability to handle a considerable number of records |
Each technology is evaluated in two areas:
- Index creation: how the index is created, split (into shards) and merged if necessary.
- Index use: how the index performs in terms of usability (queries and search patterns), performance (response time) and throughput.
Index Creation
Three scenarios were considered for the index creation phase:
- Single process - n indices: a single process creates n shards; the input data are split evenly and an IndexWriter is created for each shard. The case n = 1 is part of this scenario. The number of shards is a parameter defined by the user and equals the number of shards expected at the end of the process.
- N threads - n indices: IndexWriter is a thread-safe class, so it can be shared by several threads to create a single index. The number of shards is defined by the user and is used internally to define the number of IndexWriters.
- Distributed index creation: the index is created by splitting the input data into N shards; each shard is assigned to one process that holds a single IndexWriter, which is responsible for creating the index.
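The idea behind the "N threads" scenario, several threads feeding one shared thread-safe writer, can be illustrated without Lucene. The following is only a minimal sketch: `SharedWriterSketch` and its `SharedWriter` class are hypothetical stand-ins (a concurrent queue plays the role of the IndexWriter), and the way rows are distributed among threads is an assumption, not the code used in the tests.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: N threads feeding one shared, thread-safe writer. */
final class SharedWriterSketch {

  /** Stand-in for a thread-safe IndexWriter; it only collects the documents. */
  static final class SharedWriter {
    private final Queue<String> documents = new ConcurrentLinkedQueue<>();

    void addDocument(String document) {
      documents.add(document);
    }

    int size() {
      return documents.size();
    }
  }

  /** Adds every row to one shared writer using the given number of threads. */
  static SharedWriter index(List<String> rows, int threads) {
    SharedWriter writer = new SharedWriter();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      final int offset = t;
      // Thread t handles rows t, t + threads, t + 2 * threads, ...
      pool.submit(() -> {
        for (int i = offset; i < rows.size(); i += threads) {
          writer.addDocument(rows.get(i));
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("Indexing did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return writer;
  }
}
```

Because the writer is shared, the order in which documents arrive is not deterministic; only the total number of documents is.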
Single process index creation
The process followed for this scenario is straightforward:
- The input is a row-delimited file in which columns are separated by a special character ('/001' in our case).
- The number-of-shards input defines the number of IndexWriters (only one IndexWriter can be open for a Lucene index at a time).
- Each row represents a Lucene document and is stored using one of the available IndexWriters.
- If multiple indices were created, they are merged into a single index in the final step (using "IndexWriter.addIndexesNoOptimize").
The following optimizations were applied:
- The IndexWriter is re-used across multiple threads.
- The org.apache.lucene.document.Document and org.apache.lucene.document.Field instances are re-used. The Lucene fields are created in a static block and their values are changed for each new document, which is then added to the index. The intention is to avoid creating objects that would have to be garbage collected.
```java
// Initialization: create the reusable Field instances once
static {
  for (int i = 0; i < accFieldsValues.length; i++) {
    fields[i] = new Field(accFieldsValues[i].name(), "", Store.YES, Index.ANALYZED);
  }
}
...
// Set the value of a reused field
fields[fieldsCount].setValue(stringTokenizer.nextToken());
...
// Add the same Document instance, now holding different values
indexWriter.addDocument(doc);
```
- The compound file format is turned off: this avoids the cost of building compound files during indexing, although it increases the number of files open at the same time.
- IndexWriter.autocommit is set to false: since the index does not serve searches while it is being created, this feature can be disabled.
- The flush is triggered by RAM usage, and the RAM buffer is maximized:
```java
LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy();
logByteSizeMergePolicy.setMergeFactor(mergeFactor);
...
indexWriterConfig.setRAMBufferSizeMB(bufferSize);
```
- Every N documents an entry is written to a log to report the overall progress.
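The overall flow of the single-process scenario (parse each delimited row, hand it to one of n writers, log progress every N documents) can be sketched in plain Java. This is a hedged illustration only: `SingleProcessSketch` is a hypothetical name, in-memory lists stand in for the IndexWriters, the round-robin assignment is an assumption about how rows are "split evenly", and the delimiter is passed as a parameter rather than hard-coded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch of the single-process flow: one process, n shard writers. */
final class SingleProcessSketch {

  /**
   * Splits each row into columns and assigns it to a shard in round-robin order.
   * Each inner list is a stand-in for the IndexWriter of one shard.
   */
  static List<List<String[]>> index(List<String> rows, String delimiter, int shards, int logEvery) {
    List<List<String[]>> writers = new ArrayList<>();
    for (int s = 0; s < shards; s++) {
      writers.add(new ArrayList<>());
    }
    String regex = Pattern.quote(delimiter); // treat the delimiter literally
    int count = 0;
    for (String row : rows) {
      String[] columns = row.split(regex, -1); // -1 keeps trailing empty columns
      writers.get(count % shards).add(columns); // one row becomes one document
      count++;
      if (count % logEvery == 0) {
        System.out.println("Indexed " + count + " documents");
      }
    }
    return writers;
  }
}
```

With 10 rows and 3 shards, the first shard receives 4 documents and the other two receive 3 each, so shard sizes differ by at most one.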
Multithreaded Index Creation
In terms of the optimizations applied, this scenario is very similar to the "Single process" scenario. However, the process is very different in terms of its steps and the resulting index:
- The input is a row-delimited file in which columns are separated by a special character ('/001' in our case).
- The number of rows in the input file is known and is passed as an input parameter.
- The input file is split evenly into intermediate files; each file is assigned to a thread, which reads it to create a Lucene index.
- The intermediate files are deleted after each index is created.
- Depending on the number of shards desired, the indices are merged into a smaller set of indices.
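The slicing, per-thread indexing and merging steps listed above can be sketched as follows. This is a minimal sketch under stated assumptions: `SlicedIndexingSketch` is a hypothetical name, sub-lists stand in for the intermediate files, plain lists stand in for Lucene indices, and the modulo-based grouping in `merge` is just one possible way to reduce the indices to the desired number of shards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: slice the input, index each slice on its own thread, then merge. */
final class SlicedIndexingSketch {

  /** Splits totalRows into contiguous slices whose sizes differ by at most one. */
  static int[] sliceSizes(int totalRows, int slices) {
    int[] sizes = new int[slices];
    for (int i = 0; i < slices; i++) {
      sizes[i] = totalRows / slices + (i < totalRows % slices ? 1 : 0);
    }
    return sizes;
  }

  /** Builds one in-memory "index" per slice, each on its own thread. */
  static List<List<String>> index(List<String> rows, int slices) {
    ExecutorService pool = Executors.newFixedThreadPool(slices);
    try {
      List<Future<List<String>>> futures = new ArrayList<>();
      int start = 0;
      for (int size : sliceSizes(rows.size(), slices)) {
        // The sub-list plays the role of an intermediate file handed to one thread.
        final List<String> slice = rows.subList(start, start + size);
        Callable<List<String>> task = () -> new ArrayList<>(slice);
        futures.add(pool.submit(task));
        start += size;
      }
      List<List<String>> indices = new ArrayList<>();
      for (Future<List<String>> future : futures) {
        indices.add(future.get());
      }
      return indices;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  /** Merges the per-thread indices into a smaller set of shards. */
  static List<List<String>> merge(List<List<String>> indices, int shards) {
    List<List<String>> merged = new ArrayList<>();
    for (int s = 0; s < shards; s++) {
      merged.add(new ArrayList<>());
    }
    for (int i = 0; i < indices.size(); i++) {
      merged.get(i % shards).addAll(indices.get(i));
    }
    return merged;
  }
}
```

Knowing the number of rows in advance is what makes `sliceSizes` possible without a first pass over the input.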
Phase | Time |
Slicing (splitting the input file and distributing it among the threads) | 1045948 ms = 17.43 minutes |
Indices creation | 6890988 ms = 114.85 minutes |
Total time | 132.28 minutes = 2.20 hours |
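The unit conversions in the table can be checked with a few lines of arithmetic. The sketch below uses only the two millisecond values reported above; the class name `PhaseTimings` is hypothetical.

```java
import java.util.Locale;

/** Hypothetical helper that converts the measured phase times to minutes and hours. */
final class PhaseTimings {

  static double toMinutes(long millis) {
    return millis / 60_000.0;
  }

  static double toHours(long millis) {
    return millis / 3_600_000.0;
  }

  public static void main(String[] args) {
    long slicingMs = 1_045_948L;  // slicing phase, from the table
    long indexingMs = 6_890_988L; // indices creation phase, from the table
    long totalMs = slicingMs + indexingMs;
    System.out.printf(Locale.ROOT, "Slicing:  %.2f minutes%n", toMinutes(slicingMs));
    System.out.printf(Locale.ROOT, "Indexing: %.2f minutes%n", toMinutes(indexingMs));
    System.out.printf(Locale.ROOT, "Total:    %.2f minutes = %.2f hours%n",
        toMinutes(totalMs), toHours(totalMs));
  }
}
```

Summing the raw millisecond values before converting avoids the small error introduced by adding figures that were already rounded.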
Katta for Index Creation
Katta is a distributed store for indices; it currently supports two types of indices: Lucene and Hadoop MapFiles. It uses ZooKeeper to coordinate index creation, replication and search across the nodes.
Main relevant features
- A Katta index is basically a folder containing sub-folders (shards).
- Client-node communication is implemented using Hadoop RPC.
- It supports distributed scoring; each search query requires two network round trips: the first gets the document frequencies from all shards and the second performs the query.
- It provides functionality to merge indices (though this is not a very complex task to implement using the standard Lucene libraries).
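The reason distributed scoring needs a first round trip can be shown with a small calculation: a term's weight depends on how many documents contain it, and that count must be summed over all shards for scores from different shards to be comparable. The sketch below is not Katta's code; `DistributedIdfSketch` and `ShardStats` are hypothetical names, and the idf formula (1 + ln(numDocs / (docFreq + 1)), in the style of Lucene's classic similarity) is an assumption chosen for illustration.

```java
import java.util.List;

/** Hypothetical sketch: combining per-shard statistics into one global idf. */
final class DistributedIdfSketch {

  /** Per-shard statistics for one term, as gathered in the first round trip. */
  static final class ShardStats {
    final long numDocs; // documents in the shard
    final long docFreq; // documents in the shard that contain the term

    ShardStats(long numDocs, long docFreq) {
      this.numDocs = numDocs;
      this.docFreq = docFreq;
    }
  }

  /** Assumed idf formula: 1 + ln(numDocs / (docFreq + 1)). */
  static double idf(long numDocs, long docFreq) {
    return 1.0 + Math.log(numDocs / (double) (docFreq + 1));
  }

  /** Sums the statistics of all shards and computes a single idf from the totals. */
  static double globalIdf(List<ShardStats> shards) {
    long numDocs = 0;
    long docFreq = 0;
    for (ShardStats shard : shards) {
      numDocs += shard.numDocs;
      docFreq += shard.docFreq;
    }
    return idf(numDocs, docFreq);
  }
}
```

If each shard scored with only its local statistics, a term that is rare in one shard and common in another would get two different weights; using the global value in the second round trip removes that discrepancy.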
Relevant issues found
- The documentation is not extensive and lacks necessary detail.
- The community is small and development activity is very low: the last commit was made on 2009-04-2.
- It does not provide any help for creating the indices; index sharding must be done before importing them into Katta.
Test environment configuration
- A cluster of 4 Katta nodes on 2 servers was used.
- The index was split into 8 shards.
- The master configuration is replicated to each node using passphraseless SSH access between the master and the nodes.
- The ZooKeeper server was embedded in the Katta master node.
katta.zk.properties (file) ==> zookeeper.embedded=true
- Each node contains 2 shards; each shard is replicated on 2 nodes.
- The sharded Lucene index contains 100 million documents and was stored in the Hadoop distributed file system.
Index creation
Since Katta does not provide any functionality to create a Lucene index from scratch, the index was built using a multithreaded application and the shards were copied into Hadoop DFS. Then, the sharded index was imported into Katta using the command line:
bin/katta addIndex occurrence hdfs://namenode:port/occurrence/shardedindex/ 2
("2" means a replication factor of 2.) Importing an index into Katta is just a matter of copying the files from Hadoop and updating the index status in the ZooKeeper server, so index creation is external to Katta. In a future post the "Distributed Index Creation" scenario will be analyzed, as well as ElasticSearch and Solr for index creation.