In the last few months I was tasked several times with setting up Hadoop clusters. They weren't huge (two to thirteen machines), but from what I read and hear this is a common use case, especially for companies just starting with Hadoop or setting up a first small test cluster. While there is a huge amount of documentation in the form of official documentation, blog posts, articles and books, most of it stops just where it gets interesting: dealing with all the things you really have to do to set up a cluster, cleaning logs, maintaining the system, knowing what and how to tune, etc.
I'll try to describe all the hoops we had to jump through and all the steps involved in getting our Hadoop cluster up and running. This is probably trivial stuff for experienced sysadmins, but if you're a developer who suddenly finds yourself in the "DevOps" role, I hope it is useful to you.
While working at GBIF I was asked to set up a Hadoop cluster on 15 existing and 3 new machines. So the first interesting thing about this setup is that it is a heterogeneous environment: three different hardware configurations at the moment. This is where our first goal came from: we wanted some kind of automated configuration management. We needed to try different cluster configurations, and we needed to be able to shift roles around the cluster without a lot of manual work on each machine. We decided to use a tool called Puppet for this task.
While Hadoop is not currently in production at GBIF, there are mid- to long-term plans to switch parts of our infrastructure to various components of the HStack: MapReduce jobs with Hive and perhaps Pig (there is already strong knowledge of SQL here), and storing large amounts of raw data in HBase (~500 million records by next year) to be processed asynchronously and indexed in a Lucene/Solr solution, possibly using something like Katta to distribute indexes. For good measure we also have fairly complex geographic calculations and map-tile rendering that could be done on Hadoop. So we have those 18 machines and no real idea yet how they'll be used and which services we'll need in the end.
Environment
As mentioned before, we have three different server configurations. We've put those machines into three logical clusters, c1, c2 and c3, and simply number the machines within each (our master, for example, currently runs on c1n1):
- c1, 10 machines: Intel(R) Xeon(R) CPU X3363 @ 2.83GHz, 2x6MB (quad), 8 GB RAM, 2 x 500GB SATA 7.2K
- c2, 3 machines: 2 x Intel(R) Xeon(R) CPU E5630 @ 2.53GHz (quad), 24 GB RAM, 6 x 250 GB SATA 5.4K
- c3, 5 machines: Intel(R) Xeon(R) CPU X3220 @ 2.40GHz (quad), 4 GB RAM, 2 x 160 GB SATA 7.2K
- CentOS 5.5
- The machines are in different racks but connected to only one switch
Goal
These were the goals we set out to achieve on our cluster and these are also all the things I'll try to describe in this or a following post:
- Puppet for setting up the services and configuring machine state
- CDH3 (Beta 3)
- Hadoop HDFS + MapReduce incl. Hadoop LZO
- Hue
- Zookeeper
- HBase
- Easily distributable packages for Hadoop, Hive and Pig to be used by the employees to access the cluster from their own workstations
- Benchmarks & Optimizations
Manual Installation
Before we use Puppet to do everything automatically I will show how it can be done manually. I think it is important to know all the steps in case something goes wrong or you decide not to use Puppet at all. When I talk about "the server" I always mean "all servers in your cluster" except when noted otherwise. I highly recommend not skipping this part even if you want to use Puppet.
Operating System
For now I'll just assume a vanilla CentOS 5.5 installation; there's nothing special you need. I recommend installing just the bare minimum, since everything else can be added later. A few words, though, about things you might want to do. Your servers probably have multiple disks. You shouldn't use RAID or LVM on any of your slaves (i.e. DataNodes/TaskTrackers); just use a JBOD configuration. In our cluster all disks are mounted in a simple structure:
- /mnt/disk1
- /mnt/disk2
- ...
And for those data disks:
- Mount your data disks with noatime (e.g. /dev/sdc1 /mnt/disk3 ext3 defaults,noatime 1 2 in /etc/fstab), which btw. also implies nodiratime.
- By default a certain number of blocks are reserved on ext file systems (I'm not familiar with others). Check this by running tune2fs -l /dev/sdc1 and looking at the Reserved block count. While this is useful on system disks, so that critical processes can still write some data when the disk is otherwise full, it is wasted space on our data disks. By default 5% of a disk is reserved. I recommend lowering this to 1% by running tune2fs -m 1 <device> on all your data disks (e.g. tune2fs -m 1 /dev/sdc1), which frees up quite a bit of disk space. You can also set it to 0% if you want, though I went with 1% for our cluster. Keep the default setting for your system disks, though!
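The reserved-block numbers are easy to misread, so here is a small Python 3 sketch that turns the relevant fields of the tune2fs -l output into bytes and a percentage. It assumes the usual "Key: value" layout of that output; the function names and the SAMPLE text (invented numbers for a disk of roughly 500 GB) are mine and only meant to exercise the parsing.

```python
def parse_tune2fs(output):
    """Turn the 'Key:   value' lines printed by `tune2fs -l` into a dict."""
    fields = {}
    for line in output.splitlines():
        key, sep, value = line.partition(":")
        if sep:  # skip lines without a colon, such as the version banner
            fields[key.strip()] = value.strip()
    return fields


def reserved_space(output):
    """Return (reserved bytes, reserved percentage) for one file system."""
    fields = parse_tune2fs(output)
    block_size = int(fields["Block size"])
    blocks = int(fields["Block count"])
    reserved = int(fields["Reserved block count"])
    return reserved * block_size, 100.0 * reserved / blocks


# Hypothetical excerpt of `tune2fs -l /dev/sdc1` (numbers invented for illustration).
SAMPLE = """\
tune2fs 1.39 (29-May-2006)
Block count:              122070016
Reserved block count:     6103500
Block size:               4096
"""

if __name__ == "__main__":
    reserved_bytes, percent = reserved_space(SAMPLE)
    print(f"{reserved_bytes / 10**9:.1f} GB reserved ({percent:.2f}%)")
```

On a real server you would feed it the output of subprocess.run(["tune2fs", "-l", "/dev/sdc1"], capture_output=True, text=True).stdout, run as root.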
A note on Cloudera's Package system & naming
Cloudera provides the various components of Hadoop in different packages, but they follow a simple structure: there is one hadoop-0.20 package which contains all the jars, config files, directories, etc. needed for all the roles. Then there are packages like hadoop-0.20-namenode, which are only a few kilobytes and only contain the appropriate start and stop scripts for the role in question.
1. Common Requirements
Most of the commands in this guide need to be executed as root. I've chosen the easy route here and just logged in as root. If you're operating as a non-privileged user, remember to use su, sudo or any other means to ensure you have the proper rights.
Repository
As all the packages we're going to install are provided by Cloudera, we need to add their repository to our cluster:
curl http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo > /etc/yum.repos.d/cloudera-cdh3.repo
Java installation
- Cloudera documentation
- Java downloads
- We're using JDK 6 Update 23
Download the file ending in -rpm.bin (i.e. jdk-6u23-linux-x64-rpm.bin). You might have to do this from a client machine because you need a browser that works with the Oracle site. Then, on any one machine, execute the following:
unzip jdk-6u23-linux-x64-rpm.bin
You should now have a bunch of .rpm files, but you only need one of them: jdk-6u23-linux-amd64.rpm. Copy this file to your servers and install it as root using rpm:
rpm -Uvh ./jdk-6u23-linux-amd64.rpm
Time
While not a hard requirement, it makes a lot of things easier if the clocks on your servers are synchronized. I added this part at the last minute because we just realized that ntpd had accidentally been disabled on three of our machines (c2), which caused some problems. It is worth taking a look at the clocks now and setting up ntp properly before you start.
DNS
It doesn't matter whether you use a DNS server, hosts files or any other means for the servers to find each other. But make sure this works! Do it now! Even if you think everything's set up correctly. Another thing you should check is whether the local hostname resolves to the public IP address. If you're using a DNS server you can use dig to test this, but that doesn't take the /etc/hosts file into account, so here is a simple test to see if it is correct:
ping -c 1 `hostname`
This should resolve to the public IP and not to 127.0.0.1.
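If you want to run the same check from a script, for example across all machines, a minimal Python 3 sketch could look like this. It assumes a Python 3 interpreter is available (a stock CentOS 5.5 only ships Python 2.4); the function names are hypothetical. socket.gethostbyname() goes through the system resolver, so on a typical setup /etc/hosts is taken into account, just like with ping.

```python
import ipaddress
import socket


def resolves_to_loopback(address):
    """True for loopback addresses such as 127.0.0.1 or 127.0.1.1."""
    return ipaddress.ip_address(address).is_loopback


def check_own_hostname():
    """Resolve this machine's hostname like `ping -c 1 `hostname`` would.

    Returns (hostname, address, ok); address is None if resolution fails.
    """
    name = socket.gethostname()
    try:
        address = socket.gethostbyname(name)
    except socket.gaierror:
        return name, None, False
    return name, address, not resolves_to_loopback(address)


if __name__ == "__main__":
    name, address, ok = check_own_hostname()
    status = "OK" if ok else "PROBLEM: unresolvable or loopback"
    print(f"{name} -> {address}: {status}")
```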
Firewall
Hadoop uses a lot of ports for its internal and external communication. We've simply allowed all traffic between the servers in the cluster and our clients. If you don't want to do that, you can selectively open the required ports instead. I try to mention them as we go, but they can all be changed in the configuration files. I might also miss some due to our configuration, so I'd be glad if someone could point those out to me.
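To keep track of the ports mentioned throughout this post, and to test whether a firewall lets them through, something like the following Python 3 sketch can help. The table only lists the default ports named in this post (the SSL ones are only used if you enable SSL), and port_open()/check_role() are hypothetical helpers. Note that a closed port and a filtered port look the same here.

```python
import socket

# Default ports mentioned in this post, per role; all can be changed in the config.
HADOOP_PORTS = {
    "NameNode": {8020: "HDFS IPC (fs.default.name)", 50070: "web UI", 50470: "web UI (SSL)"},
    "DataNode": {50010: "data transfer", 50020: "IPC (cluster-internal)",
                 50075: "web UI", 50475: "web UI (SSL)"},
    "JobTracker": {8021: "mapred.job.tracker", 50030: "web UI"},
    "TaskTracker": {50060: "web UI; Reducers also fetch map output here"},
}


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unresolvable host, ...
        return False


def check_role(host, role):
    """Try every port listed for `role` on `host`; returns {port: reachable}."""
    return {port: port_open(host, port) for port in sorted(HADOOP_PORTS[role])}
```

For example, check_role("c1n1", "NameNode") run from a client tells you which of the NameNode ports on our master are reachable from there.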
Packages
We're going to use LZO compression, the Hadoop native libraries and Hue, so there are a few common dependencies on all machines in the cluster, which can be easily installed:
rpm -Uvh http://download.fedora.redhat.com/pub/epel/5/x86_64/epel-release-5-4.noarch.rpm
yum install -y lzo hue-plugins hadoop-0.20-native
Directories
We also need some directories later on, so we can just create them now (on every data disk):
mkdir <data disk>/hadoop
chown root:hadoop <data disk>/hadoop
Cloudera uses the alternatives system to manage configuration. /etc/hadoop/conf is the currently activated configuration. Look at the contents of /etc/hadoop and you'll find all the installed configurations. At the moment there is only a conf.empty directory, which we'll use as our starting point:
cp -R /etc/hadoop/conf.empty /etc/hadoop/conf.cluster
Now feel free to edit the configuration files in /etc/hadoop/conf.cluster, but we'll also go through them later in this post. The last step is to activate this configuration:
/usr/sbin/alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf /etc/hadoop-0.20/conf.cluster 50
LZO
- hadoop-gpl-compression project
- Todd Lipcon's hadoop-lzo & Kevin Weil's hadoop-lzo projects
The following commands build Todd Lipcon's hadoop-lzo 0.4.9:
rpm -Uvh http://download.fedora.redhat.com/pub/epel/5/x86_64/epel-release-5-4.noarch.rpm
yum install -y lzo-devel
wget --no-check-certificate https://github.com/toddlipcon/hadoop-lzo/tarball/0.4.9
tar xvfz toddlipcon-hadoop-lzo-0.4.9-0-g0e70051.tar.gz
wget http://www.apache.org/dist/ant/binaries/apache-ant-1.8.2-bin.tar.bz2
tar jxvf apache-ant-1.8.2-bin.tar.bz2
cd toddlipcon-hadoop-lzo-0e70051
JAVA_HOME=/usr/java/latest/ BUILD_REVISION="0.4.9" ../apache-ant-1.8.2/bin/ant tar
The ant version that comes with CentOS 5.5 didn't work for me, which is why I downloaded a newer one. This should leave you with a hadoop-lzo-0.4.9.tar.gz file in the build directory, which you can extract to get all the necessary files for your servers:
- hadoop-lzo-0.4.9.jar needs to be copied into /usr/lib/hadoop/lib on each server
- lib/native/Linux-amd64-64 needs to be copied into /usr/lib/hadoop/lib/native on each server
We've had a problem with unintentional debug logs filling up our hard drives. The investigation that followed resulted in a blog post by Lars George explaining all the log files Hadoop writes. It is a worthwhile read. Hadoop writes tons of logs in various processes and phases, and you should make sure that these don't fill up your hard drives. There are two cases in the current CDH3b3 where you have to intervene manually:
- Hadoop daemon logs
- Job XML files on the JobTracker
The Hadoop daemon logs are written by a DailyRollingFileAppender, which unfortunately doesn't have a maxBackupIndex setting like the RollingFileAppender. So either change the appender or clean up the logs manually after a few days. We chose the second path and added a very simple cron job to run daily:
find /var/log/hadoop/ -type f -mtime +14 -name "hadoop-hadoop-*" -delete
This job deletes log files older than 14 days. We'll take care of the Job XML files in a similar way on the JobTracker.
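If you'd rather have a dry run before deleting anything, the same cleanup can be sketched in Python 3. old_files() mimics the -type f, -name and -mtime +N semantics of find (the file's age in whole days must be greater than N, so +14 matches files at least 15 days old); the function names and the dry_run switch are my own additions, not part of any Hadoop tooling.

```python
import fnmatch
import time
from pathlib import Path

SECONDS_PER_DAY = 86400


def old_files(root, pattern, days, now=None):
    """Yield regular files under `root` whose name matches `pattern` and
    whose age in whole days is greater than `days` (like find -mtime +days)."""
    now = time.time() if now is None else now
    root = Path(root)
    if not root.is_dir():
        return
    for path in root.rglob("*"):
        if path.is_symlink() or not path.is_file():
            continue  # find -type f only matches regular files
        if not fnmatch.fnmatchcase(path.name, pattern):
            continue
        age_days = int((now - path.stat().st_mtime) // SECONDS_PER_DAY)
        if age_days > days:
            yield path


def clean_logs(root, pattern, days, dry_run=True):
    """Delete (or, with dry_run, only list) the files old_files() finds."""
    victims = list(old_files(root, pattern, days))
    if not dry_run:
        for path in victims:
            path.unlink()
    return victims


if __name__ == "__main__":
    for p in clean_logs("/var/log/hadoop", "hadoop-hadoop-*", 14):
        print("would delete", p)
```

The JobTracker cleanup is the same call with the pattern "job_*_conf.xml" and 3 days.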
2. HDFS
One property needs to be set for both the NameNode and the DataNodes in the file /etc/hadoop/conf/core-site.xml: fs.default.name. So just add this and replace $namenode with the IP or name of your NameNode:
<property>
  <name>fs.default.name</name>
  <value>hdfs://$namenode:8020</value>
</property>
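When you maintain several *-site.xml files (or generate them with a tool like Puppet later on), it can help to produce the property blocks programmatically. This Python 3 sketch only uses xml.etree.ElementTree from the standard library; hadoop_configuration() is a hypothetical helper and namenode.example.org stands in for $namenode. Properties listed in `final` get a <final>true</final> element, which is how Hadoop configuration files stop clients from overriding a value.

```python
import xml.etree.ElementTree as ET


def hadoop_configuration(properties, final=()):
    """Render a Hadoop *-site.xml document from a {name: value} mapping."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
        if name in final:
            ET.SubElement(prop, "final").text = "true"
    return '<?xml version="1.0"?>\n' + ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # "namenode.example.org" is a placeholder for $namenode.
    print(hadoop_configuration(
        {"fs.default.name": "hdfs://namenode.example.org:8020"},
        final={"fs.default.name"},
    ))
```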
2.1 NameNode
Installing the NameNode is straightforward:
yum install -y hadoop-0.20-namenode
This installs the startup scripts for the NameNode. The core package was already installed in the previous step. Now we need to change the configuration, create some directories and format the NameNode.
In /etc/hadoop/conf/hdfs-site.xml add the dfs.name.dir property, which "determines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy." We mentioned before that we're using a JBOD configuration. We do this even for our NameNode. So in our case the NameNode has two disks mounted at /mnt/disk1 and /mnt/disk2, but you might want to write to just one location if you use RAID. As the documentation says, the NameNode will write to each of the locations. You can also add a third location, for example an NFS mount, which serves as a backup. Our configuration looks like this:
<property>
  <name>dfs.name.dir</name>
  <value>/mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name</value>
</property>
Make sure to create the dfs directories before starting the NameNode. They need to belong to hdfs:hadoop. Formatting the NameNode is all that's left:
su hdfs -c "/usr/bin/hadoop namenode -format"
Once you've done all that, you can enable the service so it will be started on system boot, and start the NameNode:
chkconfig hadoop-0.20-namenode on
service hadoop-0.20-namenode start
You should now be able to see the NameNode's web interface on port 50070. Ports that need to be opened to clients on the NameNode are 50070 (web interface, 50470 if you enabled SSL) and 8020 (for HDFS command line interaction). Only port 8020 needs to be opened to all other servers in the cluster.
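Creating the directories listed in dfs.name.dir by hand is easy to get wrong across many disks, so here is a minimal Python 3 sketch that splits such a comma-separated value, creates each directory and optionally hands it to hdfs:hadoop. prepare_dirs() is a hypothetical helper; changing ownership requires root, and only the listed directories themselves (not their parents) are chowned.

```python
import os
import shutil


def prepare_dirs(value, user=None, group=None):
    """Create every directory in a comma-separated Hadoop dir list.

    `value` is the string you put into e.g. dfs.name.dir. Ownership is only
    changed when `user` and/or `group` are given.
    """
    paths = [p.strip() for p in value.split(",") if p.strip()]
    for path in paths:
        os.makedirs(path, exist_ok=True)  # idempotent: fine to run twice
        if user or group:
            shutil.chown(path, user=user, group=group)
    return paths
```

As root on the NameNode this would be prepare_dirs("/mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name", user="hdfs", group="hadoop"); the same call works for dfs.data.dir (hdfs:hadoop) and mapred.local.dir (mapred:hadoop).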
We also use a cron job to run the HDFS Balancer every evening:
/usr/lib/hadoop-0.20/bin/start-balancer.sh -threshold 5
2.2 DataNodes
The DataNodes handle all the data by storing it and serving it to clients. You can run a DataNode on your NameNode, and especially for small or test clusters this is often done, but as soon as you have more than three to five machines or rely on your cluster for production use, you should use a dedicated NameNode. Setting up the DataNodes is easy after all our preparations, though. We need to set the property dfs.data.dir in the file /etc/hadoop/conf/hdfs-site.xml. It "determines where on the local filesystem an DFS data node should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices. Directories that do not exist are ignored." These are the directories the real data bytes of HDFS will be written to. If you specify multiple directories, the DataNode will write to them in turn, which gives good performance when reading the data. This is an example of what we are using:
<property>
  <name>dfs.data.dir</name>
  <value>/mnt/disk1/hadoop/dfs/data,/mnt/disk2/hadoop/dfs/data</value>
</property>
Make sure to create the dfs directories before starting the DataNodes. They need to belong to hdfs:hadoop. When that's done you just need to install the DataNode, activate the startup scripts and start it:
yum install -y hadoop-0.20-datanode
chkconfig hadoop-0.20-datanode on
service hadoop-0.20-datanode start
Your DataNode should now be up and running and, if you have configured it correctly, it should also have connected to the NameNode and be visible in the Live Nodes list of the web interface, and the configured capacity should go up. Ports that need to be opened to clients are 50075 (web interface, 50475 if you enabled SSL) and 50010 (for data transfer). For the cluster you need to open ports 50010 and 50020.
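To make "write to them in turn" and "directories that do not exist are ignored" concrete, here is a deliberately simplified Python 3 model of how new blocks are spread over the dfs.data.dir entries. It is not the DataNode's code: as far as I know the real round-robin choice also skips volumes without enough free space, and usable_volumes()/assign_blocks() are hypothetical names.

```python
import itertools
import os


def usable_volumes(data_dirs):
    """Split a dfs.data.dir value and drop directories that do not exist."""
    dirs = [d.strip() for d in data_dirs.split(",") if d.strip()]
    return [d for d in dirs if os.path.isdir(d)]


def assign_blocks(block_ids, volumes):
    """Simplified round-robin placement: each new block goes to the next volume."""
    if not volumes:
        raise ValueError("no usable data directories")
    volume_cycle = itertools.cycle(volumes)
    return {block: next(volume_cycle) for block in block_ids}
```

Because consecutive blocks land on different disks, reads of a large file are spread over all spindles instead of hammering one.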
3. MapReduce
MapReduce is split into two parts as well: a JobTracker and multiple TaskTrackers. For small-ish clusters the NameNode and the JobTracker can run on the same server, but depending on your usage and available memory you might need to run them on separate servers. We have 18 servers, 17 slaves and 1 master (with NameNode, JobTracker and other services), which hasn't been a problem so far. We need three properties set on all servers (in mapred-site.xml) to get started:
mapred.job.tracker: "The host and port that the MapReduce job tracker runs at. If 'local', then jobs are run in-process as a single map and reduce task."
- This just points to your JobTracker. There is no default port for this in Hadoop 0.20, but 8021 is often used.
- Our value (replace $jobtracker with the name or IP of your designated JobTracker): $jobtracker:8021
mapred.local.dir: "The local directory where MapReduce stores intermediate data files. May be a comma-separated list of directories on different devices in order to spread disk i/o. Directories that do not exist are ignored."
- As it says, this is a local directory where MapReduce stores intermediate data, and we spread it out over all our disks.
- Our value: /mnt/disk1/hadoop/mapreduce,/mnt/disk2/hadoop/mapreduce
- Create the directories on each server with the owner mapred:hadoop
mapred.system.dir: "The shared directory where MapReduce stores control files."
- This is a path in HDFS where MapReduce stores its control files
- Our value: /hadoop/mapreduce/system
- If dfs.permissions is enabled, you need to create the parent directory /hadoop/mapreduce in HDFS and hand it to mapred. Execute this command on any server in your cluster:
su hdfs -c "/usr/bin/hadoop fs -mkdir /hadoop/mapreduce && /usr/bin/hadoop fs -chown mapred:hadoop /hadoop/mapreduce"
3.1 JobTracker
The JobTracker is very easy to set up and start:
yum install -y hadoop-0.20-jobtracker
chkconfig hadoop-0.20-jobtracker on
service hadoop-0.20-jobtracker start
The web interface should now be available on port 50030 of your JobTracker. Ports 50030 (web interface) and 8021 (not well defined in Hadoop 0.20, but correct if you followed my configuration) need to be opened to clients. Only 8021 is necessary for the TaskTrackers.
If the JobTracker is restarted some old files will not be cleaned up. That's why we added another small cronjob to run daily:
find /var/log/hadoop/ -type f -mtime +3 -name "job_*_conf.xml" -delete
3.2 TaskTracker
The TaskTrackers are as easy to install as the JobTracker:
yum install -y hadoop-0.20-tasktracker
chkconfig hadoop-0.20-tasktracker on
service hadoop-0.20-tasktracker start
The TaskTracker should now be up and running and visible in the JobTracker's Nodes list. Port 50060 needs to be opened to clients for a minimalistic web interface. The Reducers also fetch map output from the TaskTrackers over this port, so it has to be open between all servers in the cluster as well. No other ports are needed, as TaskTrackers check in with the JobTracker regularly (heartbeat) and get assigned tasks at the same time.
4. Configuration
I'll discuss a few configuration properties here that range from "necessary to change" to "nice to know about". I'll mention the following things for each property:
- The default value,
- the value we use for our cluster at GBIF if it differs from the default,
- a value I deem safe to use for everybody, because some of the defaults are quite old and have never been changed,
- whether we set the property to final so it can't be overridden by clients (we set a lot of the parameters to final for purely documentary reasons, even those that can't be overridden in the first place),
- whether the property has been renamed or deprecated in Hadoop 0.21,
- and whether this property is required in a client configuration file or only on the cluster; if I don't mention it, it's not needed.
I know that this duplicates some of the section above, but I want to keep this Configuration section as a reference.
core-site.xml
fs.default.name
- Default: file:///
- We: hdfs://$namenode:8020
- We set this to final
- Renamed to fs.defaultFS in Hadoop 0.21
- Needed on the clients
hadoop.tmp.dir
- Default: /tmp/hadoop-${user.name}
- CDH3 Default: /var/lib/hadoop-0.20/cache/${user.name}
- We: Left it at the CDH3 default
- We set this to final
fs.trash.interval
- Default: 0
- We: 10080 (the value is in minutes, i.e. one week)
- We set this to final
fs.checkpoint.dir
- Default: ${hadoop.tmp.dir}/dfs/namesecondary
- We: /mnt/disk1/hadoop/dfs/namesecondary,/mnt/disk2/hadoop/dfs/namesecondary
- We set this to final
io.file.buffer.size
- Default: 4096
- Safe: 65536
- We: 131072 (32 * 4096)
- Can be overridden by clients
- We set this to final
io.compression.codecs
- Default: org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec
- We: org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec
- We set this to final
Compared to the default we added LzoCodec and LzopCodec.
io.compression.codec.lzo.class
- We: com.hadoop.compression.lzo.LzopCodec
webinterface.private.actions
- Default: false
- We: true
- We set this to final
When this is set to true, the web interfaces for the JobTracker and NameNode gain some advanced options like killing a job. It makes life a lot easier while still in development or evaluation, but you should probably set this to false once you rely on your Hadoop cluster for production use.
hdfs-site.xml
dfs.name.dir
- Default: ${hadoop.tmp.dir}/dfs/name
- We: /mnt/disk1/hadoop/dfs/name,/mnt/disk2/hadoop/dfs/name
- We set this to final
dfs.data.dir
- Default: ${hadoop.tmp.dir}/dfs/data
- We: /mnt/disk1/hadoop/dfs/data,/mnt/disk2/hadoop/dfs/data
- We set this to final
This differs from dfs.name.dir in that the data is not replicated to all disks but distributed among all those locations. The DataNodes save the actual data in these locations, so more space is better. The easiest thing is to use dedicated disks for this. If you store anything other than Hadoop data on the disks, make sure to set dfs.datanode.du.reserved (see below).
dfs.namenode.handler.count
- Default: 10
- We: 20
- Safe: 10-20
- We set this to final
This is the number of server threads for the NameNode. The nnbench benchmark is probably a good tool to test this. If you've got a large cluster or many file operations (create or delete), you can try raising this value.
dfs.datanode.handler.count
- Default: 3
- We: 5
- Safe: 5-10
- We set this to final
This is the number of server threads for each DataNode. The TestDFSIO benchmark seems like a good test to run to find a good value here. Just play around. We've tried a bunch of different values up to 20 and didn't see a difference, so we chose a value slightly larger than the default.
dfs.datanode.du.reserved
- Default: 0
- We: Left the default
This is the space in bytes per volume (i.e. per directory in dfs.data.dir) that the DataNode leaves free for non-HDFS use. As our drives are dedicated to Hadoop we left this at 0, but if the drives host other data as well, set this to an appropriate value.
dfs.permissions
- Default: true
- We: true
- Renamed to dfs.permissions.enabled in Hadoop 0.21
- We set this to final
dfs.replication
- Default: 3
- We: 3
- Can be used in the client configuration
dfs.block.size
- Default: 67108864 (64 MB)
- We: 134217728 (128 MB)
- Renamed to dfs.blocksize in Hadoop 0.21
- Can be used in the client configuration
This can be set on a per-file basis, so you really have to find your own perfect value, perhaps even per dataset.
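To get a feel for what the block size changes, this small Python 3 sketch counts the blocks a file occupies for a given dfs.block.size. Every block is an object the NameNode has to keep track of, and with the default input formats each block typically becomes one map input split, so a larger block size means fewer blocks and usually fewer Map tasks for the same data. block_count() is a hypothetical helper.

```python
MB = 1024 * 1024


def block_count(file_size, block_size=64 * MB):
    """Number of HDFS blocks a file of `file_size` bytes occupies.

    The last block may be only partially filled; it only uses as much
    disk space as the data it contains.
    """
    return -(-file_size // block_size)  # ceiling division without floats


if __name__ == "__main__":
    one_gb = 1024 * MB
    for size in (64 * MB, 128 * MB):
        print(f"{size // MB} MB blocks: {block_count(one_gb, size)} blocks per 1 GB file")
```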
dfs.balance.bandwidthPerSec
- Default: 1048576
- We: 2097152
- Renamed to dfs.datanode.balance.bandwidthPerSec in Hadoop 0.21
- We set this to final
dfs.hosts
- Default: no default set
- We: /etc/hadoop/conf/allowed_hosts
- We set this to final
dfs.support.append
- Default: false
- We: true
- As far as I know this option has been removed in Hadoop 0.21 and is enabled by default
- We set this to final
dfs.datanode.max.xcievers
- Default: 256
- Safe: 1024
- We: 2048
- Yes, this is misspelt in Hadoop and it hasn't been fixed in Hadoop 0.21.
- We set this to final
mapred-site.xml
HDFS is pretty straightforward to configure and benchmark. MapReduce, unfortunately, is more of a black art. I'll describe the MapReduce process here because it is important to understand where all the properties come in, so you can safely change their values and tweak the performance. In my first draft of this post I wrote that I wouldn't go into much detail on the internals of the MapReduce process. (Un-)fortunately this wasn't as easy as I thought, and it has grown into a full-blown explanation of everything I know. It is very possible that something's wrong here, so please correct me if you see something that is off. And if you're not interested in how this works, just skip to the descriptions of the properties themselves.
All of this is valid for Hadoop 0.20.2+737 (the CDH version). I know that some things have changed in Hadoop 0.21, but that's left for another time.
The Map side
While a Map is running it collects output records in an in-memory buffer called MapOutputBuffer. (If there are no reducers, a DirectMapOutputCollector is used instead, which makes most of the rest obsolete as it writes immediately to disk.) The total size of this in-memory buffer is set by the io.sort.mb property and defaults to 100 MB (which is converted to a byte value using a bit shift operation: 100 << 20 = 104857600). Of these 100 MB, the fraction io.sort.record.percent is reserved for tracking record boundaries. This property defaults to 0.05 (i.e. 5%, which means 5 MB in the default case). Each record to track takes 16 bytes (4 integers of 4 bytes each) of memory, which means the buffer can track 327680 map output records with the default settings. The rest of the memory (104857600 bytes - (16 bytes * 327680) = 99614720 bytes) is used to store the actual bytes to be collected (in the default case this will be 95 MB). While Map outputs are collected, they are stored in this remaining memory and their location in the in-memory buffer is tracked as well. Once one of these two buffers reaches a threshold specified by io.sort.spill.percent, which defaults to 0.8 (i.e. 80%), the buffer is spilled to disk:
0.8 * 99614720 = 79691776
0.8 * 327680 = 262144
Look in the log output of your Maps and you'll see these three lines at the beginning of every log:
2010-12-05 01:33:04,912 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2010-12-05 01:33:04,996 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2010-12-05 01:33:04,996 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
You should recognize these numbers!
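The numbers in these log lines can be reproduced with a few lines of Python 3. The sketch follows the arithmetic described above; rounding the record area down to a multiple of 16 bytes is my reading of MapTask in Hadoop 0.20 and makes no difference with the default values. map_sort_buffers() is a hypothetical helper, and since Hadoop computes these limits with Java floats, non-default settings may come out a few bytes different.

```python
RECORD_ACCOUNTING_BYTES = 16  # 4 ints of 4 bytes per tracked record


def map_sort_buffers(io_sort_mb=100, record_percent=0.05, spill_percent=0.8):
    """Return the (soft limit, capacity) pairs MapTask logs as
    'data buffer = x/y' and 'record buffer = x/y'."""
    total = io_sort_mb << 20                      # 100 << 20 = 104857600 bytes
    accounting = int(total * record_percent)      # space for record boundaries
    accounting -= accounting % RECORD_ACCOUNTING_BYTES
    data_capacity = total - accounting            # bytes for serialized output
    record_capacity = accounting // RECORD_ACCOUNTING_BYTES
    return {
        "data buffer": (int(data_capacity * spill_percent), data_capacity),
        "record buffer": (int(record_capacity * spill_percent), record_capacity),
    }


if __name__ == "__main__":
    for name, (soft, capacity) in map_sort_buffers().items():
        print(f"{name} = {soft}/{capacity}")
```

With the defaults it prints the same two value pairs as the log lines above.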
Now while the Map is running you might see log lines like these:
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 19361312; bufvoid = 99614720
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
2010-12-05 01:33:09,558 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
This means we've reached the maximum number of records we can track even though our data buffer is still pretty empty (99614720 - 19361312 = 80253408 bytes still free). If, however, the data buffer is the cause of your spill, you'll see a line like this:
2010-12-05 01:33:08,823 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full = true
All of this spilling to disk is done in a separate thread so that the Map can continue running. That's also the reason why the spill begins early (when the buffer is only 80% full): so that the buffer doesn't fill up before a spill is finished. If one single Map output is too large to fit into the in-memory buffer, a single spill is done for this one value. A spill actually consists of one file containing one segment per partition (meaning one per Reducer), plus index information recording where each segment starts.
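Which of the two limits you hit first depends on the average size of your map output records. With the default settings the soft limits are 79691776 bytes and 262144 records, so the break-even point is 79691776 / 262144 = 304 bytes per record: smaller records fill the record buffer first, larger ones the data buffer. The Python 3 sketch below (hypothetical helper names, assuming records of roughly constant size) applies this to the spill shown above, where 262144 records took up only 19361312 bytes, about 74 bytes each.

```python
def break_even_record_size(soft_data_limit=79691776, soft_record_limit=262144):
    """Average record size (bytes) at which both soft limits are hit together."""
    return soft_data_limit / soft_record_limit


def first_spill_trigger(avg_record_bytes, soft_data_limit=79691776,
                        soft_record_limit=262144):
    """Predict which log line announces the spill: 'record full' or 'buffer full'."""
    records_until_data_limit = soft_data_limit / avg_record_bytes
    if records_until_data_limit < soft_record_limit:
        return "buffer full"
    return "record full"


if __name__ == "__main__":
    print(break_even_record_size())
    avg = 19361312 / 262144  # bytes per record in the spill shown above
    print(f"{avg:.1f} bytes per record -> {first_spill_trigger(avg)}")
```

If your jobs mostly produce "record full" spills, giving record tracking a larger share of io.sort.mb via io.sort.record.percent is one knob worth experimenting with.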
After a Map task has finished there may be multiple spills on the TaskTracker. Those have to be merged into a single output file, again sorted within each partition, from which each Reducer then fetches its partition. The property io.sort.factor says how many of those spill files will be merged into one file at a time. The lower the number, the more passes are required to arrive at the goal. The default of 10 is very low; setting the default to 100 was considered (and in fact, looking at the code, it sometimes is set to 100 by default). This property can make a pretty huge difference if your Mappers output a lot of data. Not much memory is needed for this property, but the larger it is the more open files there will be, so make sure to set this to a reasonable value. To find such a value you should run a few MapReduce jobs that you'd expect to see in production use and carefully monitor the log files. Watch out for log messages like these:
Merging <numSegments> sorted segments
Down to the last merge-pass, with <numSegments> segments left of total size: <totalBytes> bytes
Merging <segmentsToMerge.size()> intermediate segments out of a total of <totalSegments>
If you see "Finished spill 0" but none of the above, you're only producing one spill file, which doesn't require any merging or further sorting. This is the ideal situation, and you should try to get the number of spilled records/files as low as possible.
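To see why io.sort.factor matters, here is a simplified Python 3 model that counts the intermediate merges needed before the final merge pass. It assumes every intermediate merge combines exactly io.sort.factor segments into one; as far as I can tell Hadoop sizes the first pass slightly differently to save work, so treat the result as an estimate. intermediate_merges() is a hypothetical helper.

```python
def intermediate_merges(num_segments, factor=10):
    """Simplified count of intermediate merge passes for `num_segments` spills.

    Each intermediate merge replaces `factor` segments with one new segment
    (written back to disk) until at most `factor` remain; those are then
    combined in the final pass.
    """
    if factor < 2:
        raise ValueError("io.sort.factor must be at least 2")
    merges = 0
    while num_segments > factor:
        num_segments -= factor - 1
        merges += 1
    return merges


if __name__ == "__main__":
    for factor in (10, 100):
        print(f"100 spills, io.sort.factor={factor}: "
              f"{intermediate_merges(100, factor)} intermediate merges")
```

With 100 spill files, the default of 10 needs 10 intermediate merges in this model, each of which reads and rewrites data on disk, while io.sort.factor=100 needs none.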
The Reduce side
The reduce phase has three different steps: Copy, Sort (which should really be called Merge) and Reduce.
During the Copy phase the Reducer tries to fetch the output of the Maps from the TaskTrackers and store it on the Reducer, either in memory or on disk. The property mapred.reduce.parallel.copies (which defaults to 5) defines how many threads are started per Reduce task to fetch Map output from the TaskTrackers.
Here's an example from the beginning of a Reducer log:
2010-12-05 01:53:03,846 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=334063200, MaxSingleShuffleLimit=83515800
2010-12-05 01:53:03,879 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Need another 1870 map output(s) where 0 is already in progress
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for merging on-disk files
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread waiting: Thread for merging on-disk files
2010-12-05 01:53:03,880 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for merging in memory files
2010-12-05 01:53:03,881 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Thread started: Thread for polling Map Completion Events
You can see two things in these log lines. First of all the ShuffleRamManager is started, and afterwards you see that this Reducer needs to fetch 1870 map outputs (meaning we had 1870 Mappers). The map output is fetched and shuffled into memory (that's what the ShuffleRamManager is for). You can control its behavior using mapred.job.shuffle.input.buffer.percent (default is 0.7), the fraction of the heap it may use. Runtime.getRuntime().maxMemory() is used to get the available memory, and it typically reports somewhat less than the -Xmx value, so be careful when setting this. We'll get back to the last four lines later.
Our child tasks are running with -Xmx512m (536870912 bytes), so 70% of that should be 375809638 bytes, but the ShuffleRamManager reports 334063200. No big deal, just be aware of it. There's a hardcoded limit of 25% of the buffer that a single map output may not surpass. If it is larger than that, it will be written to disk (see the MaxSingleShuffleLimit value above: 334063200 * 0.25 = 83515800).
Now that everything's set up, the copiers will start their work and fetch the output. You'll see a bunch of log lines like these:
2010-12-05 01:53:11,114 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201012031527_0021_m_000011_0, compressed len: 454055, decompressed len: 454051
2010-12-05 01:53:11,114 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 454051 bytes (454055 raw bytes) into RAM from attempt_201012031527_0021_m_000011_0
2010-12-05 01:53:11,133 INFO org.apache.hadoop.mapred.ReduceTask: Read 454051 bytes from map-output for attempt_201012031527_0021_m_000011_0
2010-12-05 01:53:11,133 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_201012031527_0021_m_000011_0 -> (70, 6) from c1n7.gbif.org
In the first line you see that a map output was successfully copied and the size of the data could be read from the headers. The next line is what we've talked about earlier: the map output will now be decompressed (if it was compressed) and saved into memory using the ShuffleRamManager. The third line acknowledges that this succeeded. And the last line is debugging output for a bug and, according to a comment in the source code, should already have been removed.
If for whatever reason the map output doesn't fit into memory, you will see a log line similar to the second one above, but "RAM" will be replaced by "Local-FS" and the fourth line will be missing. You obviously want as much data in memory as possible, so shuffling to the Local-FS is a warning sign, or at least a sign of possible optimizations.
While all this goes on, until all map outputs have been fetched, there are two threads (Thread for merging on-disk files and Thread for merging in memory files) waiting for certain conditions before they become active. The conditions are as follows:
- The used memory in the in-memory buffer reaches mapred.job.shuffle.merge.percent (default is 66%; in our example that means 334063200 * 0.66 = 220481712 bytes) and there are at least two map outputs in the buffer,
- or there are more than mapred.inmem.merge.threshold (defaults to 1000) map outputs in the in-memory buffer, independent of their size,
- or there are at least io.sort.factor * 2 - 1 files on disk (with the default io.sort.factor of 10, that's 19 files).

The first two conditions wake up the thread that merges in-memory files, the third one wakes up the thread that merges on-disk files.
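To make the arithmetic concrete, here's a small Python sketch of the decisions described above, using the numbers from our logs. It's a simplified model, not Hadoop's ShuffleRamManager: the function names are made up, I assume the decompressed size is what gets compared against the 25% limit (the log lines suggest so), I treat the thresholds as inclusive, and I treat a mapred.inmem.merge.threshold of 0 as "no count-based trigger", which is how mapred-default.xml describes it.

```python
# Simplified model of the reduce-side shuffle buffer, using the numbers from
# the logs in this post. Not Hadoop code: names and structure are illustrative.

BUFFER_BYTES = 334_063_200        # shuffle buffer size reported in our logs
SINGLE_SHUFFLE_FRACTION = 0.25    # hard-coded limit for a single map output
MERGE_PERCENT = 0.66              # mapred.job.shuffle.merge.percent
INMEM_MERGE_THRESHOLD = 1000      # mapred.inmem.merge.threshold
IO_SORT_FACTOR = 10               # io.sort.factor

MAX_SINGLE_SHUFFLE = int(BUFFER_BYTES * SINGLE_SHUFFLE_FRACTION)  # 83515800
MERGE_TRIGGER_BYTES = int(BUFFER_BYTES * MERGE_PERCENT)           # 220481712


def shuffle_destination(decompressed_len: int) -> str:
    """Where a fetched map output ends up: "RAM" or "Local-FS"."""
    return "RAM" if decompressed_len < MAX_SINGLE_SHUFFLE else "Local-FS"


def in_memory_merge_due(used_bytes: int, outputs_in_memory: int,
                        threshold: int = INMEM_MERGE_THRESHOLD) -> bool:
    """Conditions 1 and 2: should the in-memory merge thread start?"""
    buffer_full = used_bytes >= MERGE_TRIGGER_BYTES and outputs_in_memory >= 2
    # A threshold of 0 or less switches the count-based trigger off.
    too_many_outputs = threshold > 0 and outputs_in_memory >= threshold
    return buffer_full or too_many_outputs


def on_disk_merge_due(files_on_disk: int,
                      io_sort_factor: int = IO_SORT_FACTOR) -> bool:
    """Condition 3: should the on-disk merge thread start?"""
    return files_on_disk >= 2 * io_sort_factor - 1


# The map output from the log excerpt above easily fits into RAM.
print(shuffle_destination(454_051))            # RAM
# Roughly the buffer state before the 501-segment merge in our logs.
print(in_memory_merge_due(220_545_981, 501))   # True
print(on_disk_merge_due(19))                   # True
```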
Once one of the first two conditions is met, an in-memory merge starts. In the logs it looks like this:

2010-12-05 01:53:42,106 INFO org.apache.hadoop.mapred.ReduceTask: Initiating in-memory merge with 501 segments...
2010-12-05 01:53:42,114 INFO org.apache.hadoop.mapred.Merger: Merging 501 sorted segments ...
2010-12-05 01:53:46,492 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012031527_0021_r_000103_0 Merge of the 501 files in-memory complete. Local file is /mnt/disk1/hadoop/mapreduce/local/taskTracker/lfrancke/jobcache/job_201012031527_0021/attempt_201012031527_0021_r_000103_0/output/map_1.out of size 220545981

This could in turn trigger the third condition, as it writes a new file to disk. When that happens, you'll see something like this:
2010-12-10 14:28:23,289 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201012101346_0001_r_000012_0We have 19 map outputs on disk. Triggering merge of 10 files

The io.sort.factor was set to the default of 10, so 10 of the 19 files will be merged into one, leaving 10 files (i.e. io.sort.factor) on disk.

Both of these merges (the in-memory merge and the on-disk merge, which is also called the interleaved on-disk merge) produce a single new output file and write it to disk. All of this only goes on while map outputs are still being fetched. Once fetching has finished, running merges are allowed to complete, but these threads won't start any new ones:
2010-12-05 01:59:10,598 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 3 files left.
2010-12-05 01:59:10,599 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 314 files left.

As you can see from the timestamps, no merges were running in our case, so everything just shut down. During the copy phase we finished a total of three in-memory merges, which is why there are now three files on disk. Another 314 map outputs are still in the in-memory buffer. This concludes the Copy phase, and the Sort phase begins:
2010-12-05 01:59:10,605 INFO org.apache.hadoop.mapred.Merger: Merging 314 sorted segments
2010-12-05 01:59:10,605 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 314 segments left of total size: 127512782 bytes
2010-12-05 01:59:13,903 INFO org.apache.hadoop.mapred.ReduceTask: Merged 314 segments, 127512782 bytes to disk to satisfy reduce memory limit
2010-12-05 01:59:13,904 INFO org.apache.hadoop.mapred.ReduceTask: Merging 4 files, 788519164 bytes from disk
2010-12-05 01:59:13,905 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2010-12-05 01:59:13,905 INFO org.apache.hadoop.mapred.Merger: Merging 4 sorted segments
2010-12-05 01:59:14,493 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 788519148 bytes

There are two things happening here. First, the remaining 314 map outputs that are still in memory are merged into one file on disk (the first three lines), so there are now four files on disk. Then these four files are merged into one.
There is an option, mapred.job.reduce.input.buffer.percent (0 by default, given as a fraction of the maximum heap size), that allows the Reducer to keep some map outputs in memory. The following is a snippet with this property set to 0.7:
2010-12-05 23:11:55,657 INFO org.apache.hadoop.mapred.ReduceTask: Merging 3 files, 661137901 bytes from disk
2010-12-05 23:11:55,660 INFO org.apache.hadoop.mapred.ReduceTask: Merging 312 segments, 127381881 bytes from memory into reduce
2010-12-05 23:11:55,661 INFO org.apache.hadoop.mapred.Merger: Merging 3 sorted segments
2010-12-05 23:11:55,688 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 661137889 bytes
2010-12-05 23:11:55,688 INFO org.apache.hadoop.mapred.Merger: Merging 313 sorted segments
2010-12-05 23:11:55,689 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 313 segments left of total size: 788519778 bytes

You can see that instead of being merged from memory to disk, the 312 in-memory segments are kept in memory while the three files on disk are merged into one, and all of the resulting 313 segments are streamed into the Reducer.
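Streaming many sorted segments into the Reducer without writing them to disk first is a classic k-way merge. Here's a minimal Python sketch of the idea using heapq.merge and itertools.groupby from the standard library. Hadoop's Merger uses its own priority queue over segments of raw bytes, so this only illustrates the technique; the function name and the segment contents are made up.

```python
import heapq
from itertools import groupby
from operator import itemgetter
from typing import Iterable, Iterator, List, Tuple

Record = Tuple[str, int]  # (key, value); real map outputs are raw bytes


def stream_to_reducer(segments: List[Iterable[Record]]) -> Iterator[Tuple[str, List[int]]]:
    """Lazily merge segments that are each sorted by key and group values per key.

    heapq.merge keeps only one pending record per segment in its heap, so the
    merged data is never materialized as a whole.
    """
    merged = heapq.merge(*segments, key=itemgetter(0))
    for key, records in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in records]


# Two segments kept in memory plus the (already merged) result from disk.
in_memory = [[("a", 1), ("c", 3)], [("b", 2), ("c", 4)]]
from_disk = iter([("a", 5), ("d", 6)])
for key, values in stream_to_reducer(in_memory + [from_disk]):
    print(key, values)
```

The Reducer only ever sees one key and its values at a time, which is the "just returning an Iterator" behaviour described for the final merge.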
There seems to be a bug in Hadoop, though. I'm not 100% sure about this one, so any insight would be appreciated. When the following conditions are true, segments in memory don't seem to be written to disk even though they should be according to the configuration:
- There are segments in memory that should be written to disk before the reduce task begins, according to mapred.job.reduce.input.buffer.percent,
- and there are at least as many files on disk as io.sort.factor.
In that case the log looks like this:

2010-12-10 16:39:40,671 INFO org.apache.hadoop.mapred.ReduceTask: Keeping 14 segments, 18888592 bytes in memory for intermediate, on-disk merge
2010-12-10 16:39:40,673 INFO org.apache.hadoop.mapred.ReduceTask: Merging 10 files, 4143441520 bytes from disk
2010-12-10 16:39:40,674 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2010-12-10 16:39:40,674 INFO org.apache.hadoop.mapred.Merger: Merging 24 sorted segments
2010-12-10 16:39:40,859 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 24 segments left of total size: 4143441480 bytes

So the steps performed in the Sort phase are the following:
1. Merge all segments (= map outputs) that are still in memory and don't fit into the memory specified by mapred.job.reduce.input.buffer.percent into one file on disk, but only if there are fewer than io.sort.factor files on disk, so that we end up with at most io.sort.factor files on disk after this step. If there are already io.sort.factor or more files on disk but some map outputs need to be moved out of memory, keep them in memory for now.
   a. In the first case you'll see a log message like this: "Merged 314 segments, 127512782 bytes to disk to satisfy reduce memory limit"
   b. In the second case you'll see this: "Keeping 14 segments, 18888592 bytes in memory for intermediate, on-disk merge"
2. All files on disk and all remaining in-memory segments that need to be merged (case 1.b) are determined. You'll see a log message like this: "Merging 4 files, 788519164 bytes from disk".
3. All segments that remain in memory during the Reduce phase are determined: "Merging 312 segments, 127381881 bytes from memory into reduce".
4. All files (on disk + in memory) from step 2 are merged together, using io.sort.factor as the merge factor. This means there might be intermediate merges to disk.
5. All remaining in-memory segments (from step 3) and the on-disk result (from step 4) are merged into one stream that is read by the Reducer. This happens in a streaming fashion, without writing new data to disk; the Reduce phase simply gets an Iterator.
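The decision logic of steps 1 to 3 can be sketched in Python as below. It's a simplified model written from the description above and the log messages, not a port of ReduceTask: sizes are plain byte counts, the class and function names are made up, max_in_mem_reduce stands for the memory granted by mapred.job.reduce.input.buffer.percent, and I assume segments leave memory oldest first. Steps 4 and 5 (the actual merging) are left out. Feeding it the totals from our logs reproduces the messages quoted earlier.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SortPlan:
    log: List[str] = field(default_factory=list)
    disk_merge_inputs: int = 0      # segments going into the io.sort.factor merge (step 4)
    in_memory_for_reduce: int = 0   # segments streamed straight into the Reducer (step 5)


def plan_sort_phase(mem_segments: List[int], disk_files: List[int],
                    io_sort_factor: int, max_in_mem_reduce: int) -> SortPlan:
    """Steps 1-3 of the Sort phase. All sizes are in bytes."""
    mem = list(mem_segments)
    disk = list(disk_files)
    plan = SortPlan()

    # Step 1: take segments out of memory (oldest first, an assumption)
    # until the remaining ones fit into max_in_mem_reduce.
    evicted: List[int] = []
    while mem and sum(mem) > max_in_mem_reduce:
        evicted.append(mem.pop(0))

    kept_for_disk_merge: List[int] = []
    if evicted and len(disk) < io_sort_factor:
        # Case 1.a: write them to disk as one new file.
        disk.append(sum(evicted))
        plan.log.append(f"Merged {len(evicted)} segments, {sum(evicted)} bytes "
                        "to disk to satisfy reduce memory limit")
    elif evicted:
        # Case 1.b: enough files on disk already, merge them together later.
        kept_for_disk_merge = evicted
        plan.log.append(f"Keeping {len(evicted)} segments, {sum(evicted)} bytes "
                        "in memory for intermediate, on-disk merge")

    # Step 2: everything that goes through the on-disk merge.
    disk_bytes = sum(disk) + sum(kept_for_disk_merge)
    plan.log.append(f"Merging {len(disk)} files, {disk_bytes} bytes from disk")
    plan.disk_merge_inputs = len(disk) + len(kept_for_disk_merge)

    # Step 3: segments that stay in memory for the Reduce phase.
    plan.in_memory_for_reduce = len(mem)
    plan.log.append(f"Merging {len(mem)} segments, {sum(mem)} bytes "
                    "from memory into reduce")
    return plan


# The run from our logs with mapred.job.reduce.input.buffer.percent = 0.
# Individual segment sizes are made up; only the totals match the logs.
mem = [1] * 313 + [127_512_782 - 313]
disk = [1, 1, 661_006_382 - 2]
for line in plan_sort_phase(mem, disk, io_sort_factor=10, max_in_mem_reduce=0).log:
    print(line)
```

With 10 files already on disk and 14 segments to evict, the same function takes the "Keeping 14 segments" branch and reports 24 segments for the on-disk merge, matching the "Merging 24 sorted segments" line.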
Well, this turned out to be a rather detailed description of the process, but it helps in understanding the configuration properties available to you. Below is a detailed list of all the relevant properties:
io.sort.factor
- Default: 10
- We: 100
- Safe: 20-100
- Renamed to mapreduce.task.io.sort.factor in Hadoop 0.21
- Can be used in the client configuration
io.sort.factor and io.sort.mb is not ideal, but as long as these are the options we have and the defaults are very low, it is pretty safe to change them to more reasonable values. It is worthwhile to take a look at your logs and search for the lines mentioned in the explanation above. This can be set on a per-job basis, and for jobs that run frequently it's worth finding a good job-specific value.
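To get a feeling for why a factor of 10 is low, here's a rough Python sketch that counts how many merges are needed before at most io.sort.factor segments are left. It's deliberately simplified: it assumes every merge combines exactly io.sort.factor segments into one (Hadoop's Merger sizes its first pass a bit differently), the function name is made up, and the 500 segments are just an example. Every one of these merges reads and writes data on local disk. For 19 files and the default factor it gives one merge, just like the "Triggering merge of 10 files" log line above.

```python
def intermediate_merge_passes(num_segments: int, io_sort_factor: int) -> int:
    """Merges needed until at most io_sort_factor segments remain, assuming each
    merge combines exactly io_sort_factor segments into one (simplified)."""
    if io_sort_factor < 2:
        raise ValueError("io.sort.factor must be at least 2")
    passes = 0
    while num_segments > io_sort_factor:
        num_segments -= io_sort_factor - 1  # io_sort_factor segments in, one out
        passes += 1
    return passes


for factor in (10, 20, 100):
    print(factor, intermediate_merge_passes(500, factor))  # 10 55, 20 26, 100 5
```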
io.sort.mb
- Default: 100
- Renamed to mapreduce.task.io.sort.mb in Hadoop 0.21
- Can be used in the client configuration
io.sort.record.percent
- Default: 0.05
- This has been removed in favor of automatic configuration in Hadoop 0.21
- Can be used in the client configuration
If your map task logs show record full = true, try to increase this value. This is another property that is very specific to the jobs you're running, so it might need tuning for each and every job.
Thankfully this mechanism has been replaced in Hadoop 0.21.
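A quick way to see when record full = true happens is to split the buffer the way the 0.20 map side does: the io.sort.record.percent share of io.sort.mb holds per-record bookkeeping, the rest holds the serialized data, and a spill starts when either part reaches io.sort.spill.percent. The sketch below assumes 16 bytes of bookkeeping per record, which is my reading of the 0.20 MapTask source, so double-check it for your version; the function names are made up.

```python
MB = 1024 * 1024
ACCOUNTING_BYTES_PER_RECORD = 16  # assumption: 0.20 MapTask (3 index ints + 1 offset int)


def spill_thresholds(io_sort_mb: int = 100, record_percent: float = 0.05,
                     spill_percent: float = 0.8):
    """Return (records, data_bytes): a spill starts when either is reached."""
    total = io_sort_mb * MB
    accounting = int(total * record_percent)
    accounting -= accounting % ACCOUNTING_BYTES_PER_RECORD
    max_records = accounting // ACCOUNTING_BYTES_PER_RECORD
    data_bytes = total - accounting
    return int(max_records * spill_percent), int(data_bytes * spill_percent)


def balanced_record_percent(avg_record_bytes: float) -> float:
    """io.sort.record.percent at which both parts fill up at the same time."""
    return ACCOUNTING_BYTES_PER_RECORD / (ACCOUNTING_BYTES_PER_RECORD + avg_record_bytes)


records, data = spill_thresholds()
print(records, data)                           # 262144 79691776
print(data // records)                         # 304
print(round(balanced_record_percent(100), 3))  # 0.138
```

With the defaults, a spill starts after 262144 records or 79691776 bytes (76 MB) of data, whichever comes first, so jobs whose map output records average less than about 304 bytes hit the record limit first. For 100-byte records the sketch suggests a value of roughly 0.14.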
io.sort.spill.percent
- Default: 0.8
- Renamed to mapreduce.map.sort.spill.percent in Hadoop 0.21
- Can be used in the client configuration
mapred.job.tracker
- Default: local
- We: <jobtracker>:8021
- We set this to final
- Renamed to mapreduce.jobtracker.address in Hadoop 0.21
- Needed on the clients
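We set quite a few of these properties to final. In Hadoop's XML configuration files, final is a flag on an individual property: once a resource marks a value final, configuration resources loaded later, such as a job's own configuration, can't override it. Here's a small, hypothetical Python helper that renders properties in the <configuration>/<property> layout of the *-site.xml files; the function name is made up and the host name is a placeholder.

```python
import xml.etree.ElementTree as ET
from typing import Iterable, Tuple


def render_site_xml(properties: Iterable[Tuple[str, str, bool]]) -> str:
    """Render (name, value, final) tuples as the body of a Hadoop *-site.xml."""
    root = ET.Element("configuration")
    for name, value, final in properties:
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
        if final:
            # Resources loaded later (e.g. a job's configuration) can't override it.
            ET.SubElement(prop, "final").text = "true"
    return ET.tostring(root, encoding="unicode")


print(render_site_xml([
    ("mapred.job.tracker", "jobtracker.example.org:8021", True),  # hypothetical host
    ("io.sort.factor", "100", False),
]))
```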
mapred.local.dir
- Default: ${hadoop.tmp.dir}/mapred/local
- We: /mnt/disk1/hadoop/mapreduce/local,/mnt/disk2/hadoop/mapreduce/local
- We set this to final
- Renamed to mapreduce.cluster.local.dir in Hadoop 0.21
mapred.system.dir
- Default: ${hadoop.tmp.dir}/mapred/system
- We: /hadoop/mapred/system
- We set this to final
- Renamed to mapreduce.jobtracker.system.dir in Hadoop 0.21
The directory in the defaultFS where MapReduce stores some control files. In our case that is a directory in HDFS. If you have dfs.permissions enabled (it is by default), make sure that this directory exists and is owned by mapred:hadoop.
mapred.temp.dir
- Default: ${hadoop.tmp.dir}/mapred/temp
- We: /tmp/mapreduce
- We set this to final
- Renamed to mapreduce.cluster.temp.dir in Hadoop 0.21
mapred.map.tasks
- Default: 2
- Renamed to mapreduce.job.maps in Hadoop 0.21
mapred.reduce.tasks
- Default: 1
- Renamed to mapreduce.job.reduces in Hadoop 0.21
This too can be specified on a per-job basis.
mapred.jobtracker.taskScheduler
- Default: org.apache.hadoop.mapred.JobQueueTaskScheduler
- We: org.apache.hadoop.mapred.FairScheduler
- We set this to final
- Renamed to mapreduce.jobtracker.taskscheduler in Hadoop 0.21
- JobQueueTaskScheduler
- FairScheduler
- CapacityScheduler
mapred.reduce.parallel.copies
- Default: 5
- We: ~20-50
- We set this to final
- Renamed to mapreduce.reduce.shuffle.parallelcopies in Hadoop 0.21
mapred.tasktracker.map.tasks.maximum & mapred.tasktracker.reduce.tasks.maximum
- Default: 2
- We set this to final
- Renamed to mapreduce.tasktracker.map.tasks.maximum & mapreduce.tasktracker.reduce.tasks.maximum in Hadoop 0.21
number of cores - 1. We've tried various settings by now, but found the load on the servers to be very high with those settings, so we'll have to do more benchmarking.
mapred.child.java.opts
- Default: -Xmx200m
- Can be used in the client configuration
(mapred.tasktracker.map.tasks.maximum + mapred.tasktracker.reduce.tasks.maximum) * Xmx.
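That formula is easy to check per node before changing slot counts. A small Python sketch: the helper names are made up, it only understands -Xmx values with an m or g suffix, and the 7 + 7 slots with -Xmx1g describe a hypothetical 8-core node, not our configuration. Keep in mind that it counts only the child heaps, not JVM overhead or the memory used by the DataNode and TaskTracker daemons.

```python
import re


def heap_mb(java_opts: str) -> int:
    """Parse the -Xmx value (m or g suffix only) from mapred.child.java.opts."""
    match = re.search(r"-Xmx(\d+)([mMgG])", java_opts)
    if not match:
        raise ValueError("no -Xmx setting with an m or g suffix found")
    size, unit = int(match.group(1)), match.group(2).lower()
    return size * 1024 if unit == "g" else size


def max_child_heap_mb(map_slots: int, reduce_slots: int, java_opts: str) -> int:
    """(map slots + reduce slots) * Xmx, in megabytes."""
    return (map_slots + reduce_slots) * heap_mb(java_opts)


print(max_child_heap_mb(2, 2, "-Xmx200m"))  # defaults: 800
print(max_child_heap_mb(7, 7, "-Xmx1g"))    # hypothetical 8-core node: 14336
```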
mapred.inmem.merge.threshold
- Default: 1000
- We: 0
- Renamed to mapreduce.reduce.merge.inmem.threshold in Hadoop 0.21
- Can be used in the client configuration
mapred.job.shuffle.merge.percent
- Default: 0.66
- Renamed to mapreduce.reduce.shuffle.merge.percent in Hadoop 0.21
- Can be used in the client configuration
mapred.inmem.merge.threshold parameter might actually trigger a merge before this value is hit. We haven't played around with this property yet, but you'd have to be careful not to set it too high, or the copiers will have to wait for the buffer to be emptied again. That could be a huge performance hit.

It would be nice if Hadoop's logging told us how full the buffer is at the moment a merge finishes.
mapred.job.shuffle.input.buffer.percent
- Default: 0.7
- Renamed to mapreduce.reduce.shuffle.input.buffer.percent in Hadoop 0.21
- Can be used in the client configuration
mapred.job.reduce.input.buffer.percent
- Default: 0.0
- Renamed to mapreduce.reduce.input.buffer.percent in Hadoop 0.21
- Can be used in the client configuration
mapred.map.tasks.speculative.execution & mapred.reduce.tasks.speculative.execution
- Default: true
- We: false
- We will set this to final once we're in production
- Renamed to mapreduce.map.speculative & mapreduce.reduce.speculative in Hadoop 0.21
- Can be used in the client configuration
mapred.job.reuse.jvm.num.tasks
- Default: 1
- Renamed to mapreduce.job.jvm.numtasks in Hadoop 0.21
- Can be used in the client configuration
If this is set to -1, a JVM will never be destroyed.
tasktracker.http.threads
- Default: 40
- We: 80
- We set this to final
- Renamed to mapreduce.tasktracker.http.threads in Hadoop 0.21
mapred.compress.map.output
- Default: false
- We: true
- Renamed to mapreduce.map.output.compress in Hadoop 0.21
- Can be used in the client configuration
mapred.map.output.compression.codec
- Default: org.apache.hadoop.io.compress.DefaultCodec
- We: com.hadoop.compression.lzo.LzoCodec
- Renamed to mapreduce.map.output.compress.codec in Hadoop 0.21
- Can be used in the client configuration
mapred.hosts
- Default: no default set
- We: /etc/hadoop/conf/allowed_hosts
- Renamed to mapreduce.jobtracker.hosts.filename in Hadoop 0.21
- We set this to final
Like dfs.hosts for the DataNodes, this just specifies which TaskTrackers are allowed to get work from the JobTracker. Both files have the same format, so it's quite common for them to be the same file.
Conclusion
After setting up all these parameters the way you like them, you should have a fully functional but basic Hadoop cluster running. You can submit jobs, use HDFS, etc. But there are a few more things we can do, like installing Hive, Hue, Pig, Sqoop, etc. We've also yet to cover Puppet. All of this will hopefully be covered in future blog posts.
We're also very interested in hearing from other users of Hadoop, HBase & Co. in Scandinavia (or from people and companies interested in them) who would like to take part in a Hadoop Meetup. We're located in Copenhagen. Contact us if you're interested.
If you have any questions or spot any problems or mistakes please let me know in the comments or by mail.