Friday, May 20, 2011

Hadoop Don'ts: What not to do to harvest Hadoop's full potential

We've all heard this story. All was fine until one day your boss heard somewhere that Hadoop and NoSQL are the new black and mandated that the whole company switch whatever it was doing over to the Hadoop et al. technology stack, because that's the only way to get your solution to scale to web proportions while maintaining reliability and efficiency.

So you threw away your old relational database back end and maybe all or part of your middle-tier code, bought a couple of books, and after a few days of swearing got your first MapReduce jobs running. But as you finished re-implementing your entire solution, you found that not only was the system way less efficient than the old one, it wasn't even scalable or reliable, and your meetings were starting to resemble the Hadoop Downfall parody more and more.

So what went wrong?

The problem with Hadoop is that it is relatively easy to get started with it without an in-depth understanding of what gives it its power, and without that understanding you are more likely than not to design your solution in a way that takes that power away. So let's take a look at a few key features of Hadoop and what not to do if you want to keep them.

Feature: Efficiency

There are a couple of things Hadoop does to ensure efficiency:
  1. It brings the computation to the data. So instead of sending large amounts of data over the network to the machines which execute the computations, it tries to run the computations directly on the nodes which contain the required inputs.
  2. It processes files sequentially, reducing the number of costly disk seeks. Unless the Hadoop cluster is running on SSDs, seeking to a new place on the disk takes on the order of 10 ms, while in the same amount of time the disk can read on the order of 10 megabits of data sequentially. So you can't process large amounts of data if the processing involves frequent seeks (a rough back-of-the-envelope comparison follows this list).
  3. It uses compression, memory buffers and other optimizations to make the data flow in the system very efficient.
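
To make the seek-versus-scan argument concrete, here is a rough back-of-the-envelope comparison based on the figures above (the 100 GB input size and the 100 KB record size are made-up numbers, purely for illustration):

    // Back-of-the-envelope arithmetic behind the "avoid frequent seeks" rule, using
    // the rough figures from the list above: ~10 ms per seek and ~10 megabits
    // (~1.25 MB) read sequentially in the same 10 ms, i.e. roughly 125 MB/s.
    public class SeekVsScan {
      public static void main(String[] args) {
        double dataGb = 100.0;       // hypothetical 100 GB input
        double seqMbPerSec = 125.0;  // ~10 megabits per 10 ms
        double seekSec = 0.010;      // ~10 ms per seek
        double recordKb = 100.0;     // hypothetical record size

        double scanHours = dataGb * 1024 / seqMbPerSec / 3600;
        double seeks = dataGb * 1024 * 1024 / recordKb;
        double seekOnlyHours = seeks * seekSec / 3600;

        System.out.printf("sequential scan: %.1f hours%n", scanHours);
        System.out.printf("one seek per %.0f KB record: %.1f hours of seeking alone%n",
            recordKb, seekOnlyHours);
      }
    }

Even with these generous assumptions, the seeks alone cost an order of magnitude more time than streaming through the whole dataset once.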
Do not: Physically separate your data cluster from your compute cluster. Whether your data is coming from HDFS, HBase, Cassandra or somewhere else, if it is not stored on the same machines the MapReduce jobs run on, it is impossible for Hadoop to bring the computation to the data. There are cases when such a setup is acceptable, in particular when the jobs are more compute-intensive than data-intensive (they do a lot of computing and not a whole lot of reading and writing of data), and Hadoop can still be a good option for such jobs, but its full potential is with jobs which process large datasets with relatively simple computations.

Do not: Create location-unaware input formats. Often the input formats which come with the data storage you are using do not exactly fit your system and you have to write your own. Make sure that as you do that, you implement location awareness by returning a non-empty result from InputSplit.getLocations(), or by inheriting this functionality from a superclass if you are extending an existing input format.
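
To illustrate, here is a minimal sketch of a location-aware split for a hypothetical storage system (the class name and fields are made up; the important part is that getLocations() returns the hosts that actually hold the data, so the scheduler can place tasks on or near them):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputSplit;

    // Hypothetical split describing a range of records held by some storage nodes.
    public class RecordRangeSplit extends InputSplit implements Writable {
      private long startRow;
      private long rowCount;
      private String[] hosts;  // the nodes that physically store this range

      public RecordRangeSplit() {}  // no-arg constructor needed for deserialization

      public RecordRangeSplit(long startRow, long rowCount, String[] hosts) {
        this.startRow = startRow;
        this.rowCount = rowCount;
        this.hosts = hosts;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return rowCount;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return hosts;  // non-empty: tells the scheduler where the data lives
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeLong(startRow);
        out.writeLong(rowCount);
        out.writeInt(hosts.length);
        for (String h : hosts) out.writeUTF(h);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        startRow = in.readLong();
        rowCount = in.readLong();
        hosts = new String[in.readInt()];
        for (int i = 0; i < hosts.length; i++) hosts[i] = in.readUTF();
      }
    }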

Do not: Read or write external data in the map or reduce tasks (apart from reading the input and writing the output as orchestrated by the framework). For example, you may be tempted to make database read or write calls directly in the map or reduce code, and there's nothing to prevent you from doing it. But if you do, especially if you are doing a lot of small random lookups, you lose all of the optimization the framework provides for the efficient streaming of data through the system. Also, if you are accessing a limited-availability resource, such as a SQL database, you may be introducing a bottleneck which prevents the solution from scaling (see Scalability).

Do not: Write code to copy files around manually. Once again, the Hadoop framework does a lot to optimize the flow of data during a compute job, but copying files around manually will always incur heavy network traffic (as data is replicated) and cannot be optimized since the system does not know what it is you are trying to do. Apart from the input/output mechanism, Hadoop also offers a distributed cache which can be used to bring data to your tasks. Those provided mechanisms are usually the most efficient way of bringing data and computation together.
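
As an example of the latter, here is a sketch of shipping a small lookup file to every task through the distributed cache instead of fetching it from an external store record by record (Hadoop 0.20/1.x API; the file path, key format and output types are made up for illustration):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LookupMapper extends Mapper<LongWritable, Text, Text, Text> {
      private final Map<String, String> lookup = new HashMap<String, String>();

      // In the driver, before submitting the job:
      //   DistributedCache.addCacheFile(new java.net.URI("/data/lookup.txt"),
      //                                 job.getConfiguration());

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cached != null && cached.length > 0) {
          BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
          String line;
          while ((line = reader.readLine()) != null) {
            String[] parts = line.split("\t", 2);  // assumed tab-separated key/value lines
            if (parts.length == 2) lookup.put(parts[0], parts[1]);
          }
          reader.close();
        }
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String enriched = lookup.get(value.toString());  // local, in-memory lookup
        if (enriched != null) {
          context.write(value, new Text(enriched));
        }
      }
    }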

Feature: Scalability

Hadoop is highly scalable in the sense that you can grow your cluster to many thousands of nodes and the computing throughput increases linearly with it. That's because Hadoop itself is designed to avoid bottlenecks. But it doesn't follow that a system implemented on top of Hadoop is automatically free of bottlenecks too.

Do not: Synchronize tasks. Since tasks execute in parallel, it may be tempting to add synchronization between them, and with tools such as ZooKeeper it is fairly easy to add all sorts of distributed synchronization mechanisms such as locks or queues. But every synchronization point is a bottleneck, so workflows that rely on them almost never scale.

Do not: Use a constant number of mapper or reducer tasks. The number of map tasks a job is split into is proportional to the size of the input, so the number of mappers scales naturally as the input grows (as long as you keep this in mind when implementing a custom input format). But the number of reducers is defined by the programmer. Since the number of reducers also defines the partitioning of the output, it may be tempting to keep it constant, or maybe even to always use just one reducer so that the output is a single file. But if you do this, you are preventing the system from scaling out as the amount of data and the cluster grow.
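
One way to avoid the hard-coded constant is to size the reducer count from the input in the driver. The sketch below uses a made-up target of roughly one reducer per gigabyte of input; the right ratio depends on your cluster and your job:

    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    public class ReducerSizing {
      public static void configureReducers(Job job, Path inputDir) throws Exception {
        FileSystem fs = inputDir.getFileSystem(job.getConfiguration());
        ContentSummary summary = fs.getContentSummary(inputDir);
        long bytesPerReducer = 1024L * 1024L * 1024L;  // ~1 GB per reducer (assumption)
        int reducers = (int) Math.max(1, summary.getLength() / bytesPerReducer);
        job.setNumReduceTasks(reducers);  // grows as the input grows
      }
    }

If you really need a single output file, it is usually better to produce it in a separate final step, for example with a small merge job or hadoop fs -getmerge, rather than forcing the main job down to one reducer.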

Do not: Talk to the job tracker. It may be tempting to have tasks talk directly to the job tracker which scheduled the job, to find out extra information about the job or the other tasks. For example, you may be tempted to have the reducer ask the job tracker for the total number of input records, in order to turn the classic word count example into an IDF calculation. But the job tracker is a single resource, so such practices can prevent your job from scaling, and they can also give misleading results, since the job tracker may not be serving up the latest statistics while the job is still running.
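
If a later stage needs a job-wide statistic like the total record count, a more scalable pattern is to collect it with a counter in the first job, read it in the driver once that job has finished, and pass it to the next job through the Configuration. A minimal sketch (the counter group, counter name and configuration key are made up):

    import org.apache.hadoop.mapreduce.Job;

    public class TwoPassDriver {
      public static void run(Job countJob, Job idfJob) throws Exception {
        // In the first job's mapper: context.getCounter("stats", "documents").increment(1);
        if (!countJob.waitForCompletion(true)) {
          throw new RuntimeException("first pass failed");
        }
        long totalDocs = countJob.getCounters().findCounter("stats", "documents").getValue();

        // Tasks of the second job read this value from their Configuration in setup().
        idfJob.getConfiguration().setLong("example.total.documents", totalDocs);
        idfJob.waitForCompletion(true);
      }
    }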

Feature: Reliability

The Hadoop MapReduce framework handles failures gracefully. If a node in the cluster fails mid-task, it simply re-runs the task on a different node. It also hedges against having to wait for slow nodes by launching duplicate copies of the same task on other machines (what is referred to as "speculative execution"). But if you are not aware of these mechanisms, your system may end up not functioning as expected.

Do not: Write an output format without considering output committer functionality. If you use FileOutputFormat or its derivatives, only the output of successful task attempts makes it to the output directory of the job and the rest is discarded. This logic is handled by the FileOutputCommitter. Some custom formats may not need committers, for example if they write to a key-value store which keeps only one value per key, but in other cases failed and subsequently retried tasks, or tasks scheduled speculatively, could leave inconsistencies in the output data, such as duplicate records.
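
The simplest way to stay on the safe side is to extend FileOutputFormat, so your format inherits the FileOutputCommitter behaviour: each task attempt writes under a temporary attempt directory and only committed attempts are promoted into the job's output directory. A sketch with a made-up tab-separated format:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TabSeparatedOutputFormat extends FileOutputFormat<Text, Text> {
      @Override
      public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
          throws IOException, InterruptedException {
        // getDefaultWorkFile resolves to the committer's temporary attempt path.
        Path file = getDefaultWorkFile(context, ".tsv");
        final FSDataOutputStream out =
            file.getFileSystem(context.getConfiguration()).create(file, false);
        return new RecordWriter<Text, Text>() {
          @Override
          public void write(Text key, Text value) throws IOException {
            out.writeBytes(key.toString() + "\t" + value.toString() + "\n");
          }
          @Override
          public void close(TaskAttemptContext context) throws IOException {
            out.close();
          }
        };
      }
    }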

Do not: Write tasks with side effects. If a task has side effects, for example if it writes directly to an external data store, then retried tasks, or tasks scheduled concurrently through speculative execution, may end up not behaving as desired.
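
If a side effect really cannot be avoided, one mitigation is to switch off speculative execution for that job, so that at most one attempt of a task runs at a time (property names as in the Hadoop 0.20/1.x configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class NoSpeculationJob {
      public static Job create(Configuration conf) throws Exception {
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        return new Job(conf, "job-with-side-effects");
      }
    }

Note that this only removes speculative duplicates; failed tasks are still retried, so the side effects should be idempotent anyway.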

5 comments:

  1. Fantastic article. Well said!

  2. A nefarious corollary to the 'no side effects' rule: Be *very* careful with random number generation http://blog.rapleaf.com/dev/2009/08/14/using-random-numbers-in-mapreduce-is-dangerous/ -- using a consistent hash is much better.

  3. "constant number of mapper or reducer tasks"- it is difficult to control number of mappers directly but it should be carefully estimated and optimized to reduce the number of serialized/queued cycles. Some parameters that affect the number of mappers are - block size, size of input file, number of slaves etc.
    Regarding number of reducers, having many reducers is usually faster- so consider having a separate additional map-reduce task at the end for merging the output into a single output if required.

  4. typo: "Weather your data is ..."
    feel free to delete this comment after fixing.
    nice post.
