In the last post Lars talked about setting up Ganglia to monitor our Hadoop and HBase installations. That was in preparation for giving HBase a solid testing run to assess its suitability for hosting our index of occurrence records. One of the important features of our new Data Portal will be the "Download" function, which lets people download the occurrences matching their search criteria. Right now that process is very manual and labour intensive, so automating it will be a big help to us. With HBase it would be implemented as a full table scan, and that's why I've spent some time testing our scan performance. Anyone who has been down this road will probably have encountered the myriad opinions on what improves performance (some of them conflicting), along with the seemingly endless parameters that can be tuned in a given cluster. The overall conclusion from that kind of research is: "You gotta do it yourself". So here we go.
Because we hope to get feedback from the HBase community on our results (ideally comparisons with other running clusters), we decided to use the PerformanceEvaluation class that ships with HBase. Unfortunately it isn't completely bug free, so I had to patch it to work the way we wanted: starting from a stock cdh3u3 HBase I applied HBASE-5401 and HBASE-4688, and had a good, hard think about HBASE-5402. I've learned a bunch of things about the PerformanceEvaluation (PE) class over the last few weeks, and the implications of HBASE-5402 are among them. If anyone's interested I'll blog about those another time; for now, suffice it to say that in my final suite of tests I wrote tables using "sequentialWrite" (which sidesteps the difficulties described in HBASE-5402) and ran scans with the "scan" test.
There are many variables that can be changed in an HBase/Hadoop cluster setup, but you have to start somewhere, so here's our basic configuration, which remained unchanged throughout the tests (you can see more on its Ganglia page):
Master (c1n2): HDFS NameNode, Hadoop JobTracker, HBase Master, and Zookeeper
Slaves (c2n1, c2n2, c2n3): HDFS DataNode, Hadoop TaskTracker, HBase RegionServer (6 GB heap)
Hardware:
c1n2: 1x Intel(R) Xeon(R) X3363 @ 2.83GHz (quad), 8GB RAM, 2x500G SATA 5.4K
c2n*: 2x Intel(R) Xeon(R) E5630 @ 2.53GHz (quad), 24GB RAM, 6x250G SATA 7.2K
The parameters I varied are:
- number of rows in the test table (50M, 100M, 200M)
- number of mappers per server (8, 10, 12, 14, 20 - more than 20 and we ran out of RAM)
- snappy compression (on/off)
- scanner caching (size)
- block caching (on/off)
Test Methodology
The test methodology was to start with an empty HDFS (replication factor of 2), an empty HBase, and no other tasks running on the machines. Then build a table using a command like:
hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 100
and then wait for the split/compaction cycle to complete, then run a major compaction to encourage data locality. In the tests using compression I would pre-create an empty table by hand, specifying COMPRESSION => 'SNAPPY' for the info column family, before running the sequentialWrite. I would also wait for the region balancer to run, ensuring that all region servers held an equal number of regions (+/- 1). Once that was complete I would run a scan test with a command like:
hbase org.apache.hadoop.hbase.PerformanceEvaluation scan 100
and note both the real time taken as reported by the JobTracker and the total time spent in mappers as reported by PE itself. I ran each scan test three times, unless the first two runs were very close, in which case I sometimes skipped the third. Note also that PE uses 10-byte keys and 1000-byte values by default.
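To make the sequence concrete, here is roughly what one compressed-table run looked like end to end. Treat it as a sketch rather than a copy of our exact commands; it assumes PE's default table name of 'TestTable' (the 'info' family is the one mentioned above):

# pre-create PE's table with Snappy compression on the 'info' family (hbase shell)
create 'TestTable', {NAME => 'info', COMPRESSION => 'SNAPPY'}

# write roughly 100M rows using 100 concurrent clients, then major compact once the splits settle
hbase org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 100
echo "major_compact 'TestTable'" | hbase shell

# scan the same rows back
hbase org.apache.hadoop.hbase.PerformanceEvaluation scan 100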
Two things were immediately obvious in my caching tests, which I'll talk about briefly before getting into the results proper.
Scanner Caching
The HBase Scan class has a setting, setCaching(int caching), which tells the scanner how many rows to fetch from the server at a time. It defaults to 1, which is optimal for single gets but decidedly suboptimal for big scans like ours. Unfortunately PE doesn't allow this value to be set explicitly on the command line, so I took the advice of the API Javadoc and set the property hbase.client.scanner.caching to 1000 in the hbase-site.xml that forms the configuration for the MapReduce job that PE runs. It would seem, though, that this property is not being honoured by HBase, so my initial tests all ran with a cache size of 1 and produced poor results. I subsequently hard-coded setCaching(1000) on the scans in PE itself and saw significant improvements. Those differences are visible in the results below.
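For anyone wanting to reproduce the workaround, the hard-coded change boils down to something like the following on the Scan that PE builds for its MapReduce job (a minimal sketch of the idea, not the actual patch):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info")); // PE's single column family
scan.setCaching(1000);                 // fetch 1000 rows per RPC instead of the default of 1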
Block Caching
When I say block caching I mean HBase blocks, not HDFS blocks. See the HBase guide or Lars George's excellent HBase: The Definitive Guide for many more details. You'll see recommendations from different sources to turn block caching off during scans, but it's not immediately obvious why that should be. It turns out that it has no performance implications for the scan itself, but it can significantly impact any random reads (gets) that rely on recently loaded blocks. If a scan uses the block cache, it loads a block into the cache, reads it, then loads the next one, never touching the first block again for the rest of the scan. Those earlier blocks are evicted quickly as new blocks are loaded, leading to "cache churn". I tested with the block cache on and off and concluded there was no difference in a dedicated scan test (no other activity in the cluster), so for the rest of my tests I left the block cache on. To test this behaviour I again had to hard-code PE, this time with setCacheBlocks(false) (it defaults to true).
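The block cache change is the same kind of one-liner, again hard-coded into PE for the comparison (a sketch, assuming the same Scan object as in the snippet above):

scan.setCacheBlocks(false); // read each block once without churning the block cache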
The Results
The full spreadsheet is available for download from our Google Code site. I've included just the summary chart here, because it pretty much tells the story. Of note: the y-axis is records per second, which I calculate as the total records scanned divided by the total time for the job as reported by the JobTracker. It therefore doesn't take any of the PE-generated per-mapper counts/times into account (which are still somewhat mysterious to me).
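(As a purely illustrative example of that calculation: a 100M-row scan that the JobTracker reports as taking 333 seconds would chart as 100,000,000 / 333, or roughly 300k records per second.)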
Conclusions (within our obviously limited test environment):
- with bigger datasets, more mappers are better; with smaller datasets, somewhat fewer are better
- compression slows things down. This is probably the most surprising (and contentious) finding. We expected performance to improve with compression (or at worst stay the same), because there is much less data to transfer when the files are compressed, and I/O is typically the limiting factor in HBase performance. I kept an eye on the CPUs with Ganglia during the uncompressed tests and, sure enough, they spent approximately double the amount of time in io_wait compared with the compressed tests.
- ganglia revealed no obvious bottlenecks during the scans. You can look at our ganglia charts from the past few weeks, and apart from a few purposely degenerate tests there was no obvious component limiting things - CPUs never went above 60% (io_wait never above 30%), and RAM and disks "looked ok". If anyone has insight here we'd really appreciate it.
- total variability across all these tests was only about 20%. Either all the tests are hitting the same, as yet unseen, hardware or configuration limit, or there really is no silver bullet for performance - tuning is an incremental thing.
- there are still strange things going on :) I can't explain why, in the compressed tests, performance increases from 50M to 100M records and then drops dramatically at 200M records.
We'd love feedback on these results, both in terms of how we can improve our testing and how to interpret the results, as well as any numbers you may have from running PE on your own clusters. Is 300k records per second even in the ballpark? It's crazy that these numbers don't already exist out there, so hopefully this post helps alleviate that somewhat.
The good news is that we'll be getting new cluster hardware soon, and then I think the next step is to load our real occurrence data into a table and run similar scanning tests against it to optimize those scans. Of course then we'll start writing new records at the same time, and then things will get.... interesting. Should be fun :)
Hey Oliver:
ReplyDelete"...and then run a major compaction to encourage data locality. "
FYI, data should be local having just been written. A major compaction will mean fewer storefiles in the mix, which should help.
"....of 1000 for the property hbase.client.scanner.caching in the hbase-site.xml file."
Sounds like a bug boss.
"So I tested the block cache on and off and concluded that there was no difference in a dedicated scan test"
Sounds right to me.
" In order to test this behaviour I again had to hard-code PE with setCacheBlocks(false) (it defaults to true)."
Is that the right way round with the true/false? If not, sounds like our api is a little non-intuitive.
"This doesn't, therefore, take any of the PE generated per-mapper counts/times into account (which are still somewhat mysterious to me)."
Which ones are these Oliver?
Compression slowing stuff down is interesting. I don't remember this being the case, especially when you see things like iowait going up when uncompressed. Did you see a corresponding rise in cpu when compression was enabled?
Is your cluster working hard? Sweating? What's it blocked on? iowait? 30% is pretty high for iowait (I should look at the ganglia graphs).
Odd that upping the mappers makes no real difference. These are concurrent maps? It looks like there is a bottleneck. I'd think that throughput would go distinctly up with more maps (or down because of contention).
So each regionserver is doing about 100k records a second? What do you need it to be, Oliver?
Thanks for putting together this blog
Let me look at the ganglia graphs later
@Stack:
- data locality: I assumed as much about recent writes but just wanted to be 100% sure that we were starting as well as we could
- scanner caching bug: would like to do a bit more testing to see what's actually happening, but then will report the bug
- setCacheBlocks: as I've written it is correct, but "true" means it'll use whatever the table or column family has set (which in turn defaults to true). I think the API is right like this, though, in that in most (almost all?) cases you want that cache, and I think the wording is fine ("should I cache blocks?" yes)
- confusing PE numbers: there's really only one number, and that's the total time PE reports at the end of the run. From the source it looks like each mapper adds its total run time to that counter, so what you see at the end is the total time spent "mapping". I guess that could be used to determine the mappers' throughput, but that doesn't seem useful to me. I've dutifully recorded them in the spreadsheet but basically didn't use them.
- compression: yes cpu went up when iowait went up, but the cpus still weren't pegged
- cluster sweating? that's the rub - I think most people would look at our ganglia charts and say the cluster is basically idle - cpus maxing at 60%, disk io never getting that high, no swapping, network not capping... We're pretty new at reading the ganglia graphs so we can't see a bottleneck - if you could look at them and let us know what you see, that'd be much appreciated!
- # mappers: they are concurrent, so when each server has 20 mappers there are 60 running concurrently. I agree that more mappers should mean more throughput, especially since the cpu usage goes up with each increase in mappers.
- 100k/server? That's what it looks like to me. In terms of our Download functionality, we figure a download needs to take 10 minutes or less to be useful. Right now we're at about 325M records, and at 300k/sec that's about 18 minutes, so we'd need roughly double the performance we see right now. And that 325M grows every day, so really it should probably be more like triple (i.e. 300k/sec per server). We're getting some new machines in the next few months, but it would still be great to understand what's really going on here so that we can get the most out of them.
Thanks a lot for taking the time!
Do you have any data from iostat? I couldn't find that in ganglia. Also, can you describe your network?
Hi and thanks for your comment.
As far as I know we don't have any data from iostat. Any suggestions on what exactly to look at there? I might see if we can add some disk I/O stats to Ganglia; there are some Python modules available.
I've asked one of our sysadmins, and this is what he had to say about the network: "All cluster nodes have direct 1G connections to a stack of 3 x 3750 switches." As far as I know these are a bit older. My knowledge of network issues, and especially of the equipment, is very limited, so any hints would be much appreciated. These switches also serve all our other servers; we don't have a dedicated Hadoop rack with a ToR switch or anything like that. That said, the load on our network should be (very subjective, I know) "not that high".
As far as iostat data goes, I'd be interested in r/s and w/s on a per-disk basis, along with the service and wait times. That gives you a much better idea of whether or not the disk subsystem is struggling.
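(For reference, and assuming the sysstat package is installed on the nodes, something like "iostat -dxk 5" reports per-disk r/s, w/s, await and svctm at five-second intervals.)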
The only reason I asked about the network is that I see the c2n* nodes getting up to 70MB/s each, which comes out to 560Mbps. Depending on what else is going on, you could be overrunning a GigE link somewhere.
hello!
I would like to ask how you set the number of mappers you specified (8, 10, 12, 14, 20 per server). Did you create a custom InputFormat, override the getSplits method and specify these numbers there?
Thanks.
Regards,
Florin
Hi Florin - I set the number of mappers on the slaves themselves, using mapred.tasktracker.map.tasks.maximum in core-site.xml, and then restarted the tasktrackers. So it's completely outside the MapReduce job, and the job just takes whatever mapper slots it can get.
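For completeness, the property block looks something like this (we used 20 for the largest runs; the right value is something to tune per node):

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>20</value>
</property>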
What were your spills like in the map tasks? With compressed data sets you spill more often, because each record holds more data once decompressed, so the (uncompressed) io.sort buffer fills much more quickly. I'm not sure how much this affects HBase, but it's a significant problem in Hadoop jobs, and dramatically increasing io.sort.mb helps a lot.
Thanks for sharing this good info on Snappy.