Tuesday, May 10, 2016

Workflow of a MapReduce Job



How MapReduce Works
Rob Kerr

Anatomy of a MapReduce Job Run

In Hadoop 0.23.0 a new MapReduce implementation was introduced. The new implementation
(called MapReduce 2) is built on a system called YARN.

YARN (MapReduce 2)
In 2010 a group at Yahoo! began to design the next generation of MapReduce. The result was YARN, short for Yet Another Resource Negotiator (or, recursively, YARN Application Resource Negotiator).

YARN addresses the scalability shortcomings of “classic” MapReduce by splitting the responsibilities of the jobtracker into separate entities. The jobtracker takes care of both job scheduling (matching tasks with tasktrackers) and task progress monitoring (keeping track of tasks, restarting failed or slow tasks, and doing task bookkeeping such as maintaining counter totals).

YARN separates these two roles into two independent daemons: a resource manager
to manage the use of resources across the cluster, and an application master to manage
the lifecycle of applications running on the cluster. The idea is that an application
master negotiates with the resource manager for cluster resources—described in terms
of a number of containers each with a certain memory limit—then runs application-specific processes in those containers. The containers are overseen by node managers running on cluster nodes, which ensure that the application does not use more resources
than it has been allocated.

YARN is more general than MapReduce, and in fact MapReduce is just one type of YARN application. There are a few other YARN applications—such as a distributed shell that can run a script on a set of nodes in the cluster—and others are actively being worked on (some are listed at http://wiki.apache.org/hadoop/PoweredByYarn).


Furthermore, it is even possible for users to run different versions of MapReduce on
the same YARN cluster, which makes the process of upgrading MapReduce more
manageable.

Classic MapReduce (MapReduce 1)
At the highest level, there are four independent entities:

• The client, which submits the MapReduce job.
• The jobtracker, which coordinates the job run. The jobtracker is a Java application
whose main class is JobTracker.
• The tasktrackers, which run the tasks that the job has been split into. Tasktrackers
are Java applications whose main class is TaskTracker.
• The distributed filesystem (normally HDFS, covered in Chapter 3), which is used
for sharing job files between the other entities.

Job Completion (MapReduce 1)
When the jobtracker receives a notification that the last task for a job is complete (this will be the special job cleanup task), it changes the status for the job to “successful.” Then, when the Job polls for status, it learns that the job has completed successfully, so it prints a message to tell the user and then returns from the waitForCompletion() method.

MapReduce on YARN involves more entities than classic MapReduce. They are:
• The client, which submits the MapReduce job.

When a client submits an application, several kinds of information are provided to the YARN infrastructure (a driver sketch illustrating this follows the list). In particular:

• A configuration: this may be partial (some parameters are not specified by the user), in which case the default values are used for the job. Note that these defaults may be the ones chosen by a Hadoop provider such as Amazon.
• A JAR containing:
   - a map() implementation
   - a combiner implementation
   - a reduce() implementation
• Input and output information:
   - the input directory: is it on HDFS? On S3? How many files?
   - the output directory: where will the output be stored? On HDFS? On S3?
(http://ercoppa.github.io/HadoopInternals/AnatomyMapReduceJob.html)
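
As a rough illustration (not from the excerpt above), the following minimal driver shows how a client typically packages this submission information with the standard Hadoop Java API: a (possibly partial) configuration, a JAR containing the map(), combiner, and reduce() implementations, and the input and output paths. The WordCount classes are hypothetical placeholders for whatever the job actually computes.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {

        // map() implementation shipped in the job JAR
        public static class TokenizerMapper
                extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            private final Text word = new Text();

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, ONE);
                }
            }
        }

        // reduce() implementation; also reused as the combiner
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private final IntWritable result = new IntWritable();

            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();          // partial config; unset values use defaults
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCountDriver.class);          // the JAR copied to the cluster
            job.setMapperClass(TokenizerMapper.class);         // map() implementation
            job.setCombinerClass(IntSumReducer.class);         // combiner implementation
            job.setReducerClass(IntSumReducer.class);          // reduce() implementation
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory (HDFS, S3, ...)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }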


• The YARN resource manager, which coordinates the allocation of compute resources
on the cluster.
• The YARN node managers, which launch and monitor the compute containers on
machines in the cluster.
• The MapReduce application master, which coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager, and managed by the node
managers.
• The distributed filesystem (normally HDFS), which is used for sharing job files between the other entities.
The process of running a job is described in the following sections.


Job Submission
Jobs are submitted in MapReduce 2 using the same user API as MapReduce 1.
MapReduce 2 has an implementation of ClientProtocol that is activated when
mapreduce.framework.name is set to yarn. The new job ID is retrieved from the resource manager.
The job client checks the output specification of the job, computes input splits, and copies
job resources (including the job JAR, configuration, and split information) to HDFS.
Finally, the job is submitted by calling submitApplication() on the resource manager.
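
For illustration only (a sketch, not part of the excerpt), this is roughly what selecting the YARN runtime looks like from the client side; in practice mapreduce.framework.name is usually set cluster-wide in mapred-site.xml rather than in code:

    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn");  // activates the YARN implementation of ClientProtocol
                                                   // ("local" selects the local job runner instead)
    Job job = Job.getInstance(conf, "my job");
    // ... mapper/reducer/input/output setup as in the driver sketch above ...
    job.waitForCompletion(true);                   // internally calls submitApplication() on the resource manager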


Job Initialization
When the resource manager receives a call to its submitApplication() method, it hands off the
request to the scheduler. The scheduler allocates a container, and the resource manager
then launches the application master’s process there, under the node manager’s management.

The application master for MapReduce jobs is a Java application whose main class is
MRAppMaster. It initializes the job by creating a number of bookkeeping objects to keep
track of the job’s progress, as it will receive progress and completion reports from the
tasks. Next, it retrieves the input splits computed in the client from the shared
filesystem. It then creates a map task object for each split, and a number of reduce task objects determined by the mapreduce.job.reduces property.
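
As a small fragment (continuing the driver sketch above), the number of reduce task objects the application master will create can be controlled from the client like this:

    job.getConfiguration().setInt("mapreduce.job.reduces", 4);  // four reduce task objects
    // or, equivalently, via the convenience method:
    job.setNumReduceTasks(4);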

The next thing the application master does is decide how to run the tasks that make up the MapReduce job. If the job is small, the application master may choose to run them in the same JVM as itself, since it judges the overhead of allocating new containers and running tasks in them as outweighing the gain to be had in running them in parallel, compared to running them sequentially on one node.  Such a job is said to be uberized, or run as an uber task.

What qualifies as a small job? By default, a small job is one that has fewer than 10 mappers,
only one reducer, and an input size that is less than the size of one HDFS block.
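
The thresholds are configurable. The property names below are the ones documented for Hadoop 2 (a sketch; verify the names and defaults against your distribution):

    Configuration conf = new Configuration();
    conf.setBoolean("mapreduce.job.ubertask.enable", true);  // allow small jobs to run in the application master's JVM
    conf.setInt("mapreduce.job.ubertask.maxmaps", 9);        // "small" means fewer than 10 map tasks ...
    conf.setInt("mapreduce.job.ubertask.maxreduces", 1);     // ... at most one reducer ...
    // ... and an input smaller than mapreduce.job.ubertask.maxbytes (one HDFS block by default)
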
Before any tasks can be run, the job setup method is called (for the job’s OutputCommitter) to create the job’s output directory.

Task Assignment
If the job does not qualify for running as an uber task, the application master
requests containers for all the map and reduce tasks in the job from the resource manager.
Each request, which is piggybacked on a heartbeat call, includes information about the map task’s data locality, in particular the hosts and corresponding racks that its input split resides on. The scheduler uses this information to make scheduling decisions (just like a jobtracker’s scheduler does): it attempts to place tasks on data-local nodes in the ideal case, but if this is not possible the scheduler prefers rack-local placement to non-local placement.
Requests also specify memory requirements for tasks. By default, both map and reduce
tasks are allocated 1024 MB of memory, but this is configurable by setting
mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.


In YARN, resources are fine-grained. In particular, applications may request a memory capability that is anywhere between the minimum allocation and a maximum allocation, and which must be a multiple of the minimum allocation. Default memory allocations are scheduler-specific; for the capacity scheduler, the default minimum is 1024 MB (set by yarn.scheduler.capacity.minimum-allocation-mb) and the default maximum is 10240 MB (set by yarn.scheduler.capacity.maximum-allocation-mb). Thus, tasks can request any memory allocation between 1 and 10 GB (inclusive), in multiples of 1 GB (the scheduler will round to the nearest multiple if needed), by setting mapreduce.map.memory.mb and mapreduce.reduce.memory.mb appropriately.
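
As a fragment illustrating the properties just mentioned (values are in megabytes and are rounded up by the scheduler to a multiple of the minimum allocation):

    Configuration conf = new Configuration();
    conf.setInt("mapreduce.map.memory.mb", 2048);     // each map task container requests 2 GB
    conf.setInt("mapreduce.reduce.memory.mb", 4096);  // each reduce task container requests 4 GB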

Task Execution
Once a task has been assigned a container by the resource manager’s scheduler, the
application master starts the container by contacting the node manager. The task is executed by a Java application whose main class is YarnChild. Before it can run the task, it localizes the resources that the task needs, including the job configuration and JAR file, and any files from the distributed cache. Finally, it runs the map or reduce task.
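
For example (a fragment continuing the driver sketch above; the path is hypothetical), a file added to the distributed cache on the client side is one of the resources YarnChild localizes before running the task:

    // must be called before the job is submitted
    job.addCacheFile(new java.net.URI("hdfs:///shared/lookup.dat#lookup"));  // '#' names the local symlink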

The YarnChild runs in a dedicated JVM, for the same reason that tasktrackers spawn
new JVMs for tasks in MapReduce 1: to isolate user code from long-running system
daemons. Unlike MapReduce 1, however, YARN does not support JVM reuse so each
task runs in a new JVM.

For Streaming and Pipes jobs, the YarnChild launches the Streaming or Pipes process and communicates with it using standard input/output or a socket, respectively.

Progress and Status Updates
When running under YARN, the task reports its progress and status (including counters) back to its application master every three seconds (over the umbilical interface), which has an aggregate view of the job.

The client polls the application master every second (set via mapreduce.client.progressmonitor.pollinterval) to receive progress updates, which are usually displayed to the user.

Job Completion
As well as polling the application master for progress, every five seconds the client checks whether the job has completed when using the waitForCompletion() method on Job. The polling interval can be set via the mapreduce.client.completion.pollinterval configuration property.
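
Both client-side polling intervals mentioned above can be tuned through the configuration (a fragment; values are in milliseconds):

    Configuration conf = new Configuration();
    conf.setInt("mapreduce.client.progressmonitor.pollinterval", 1000);  // how often progress is fetched (default 1 second)
    conf.setInt("mapreduce.client.completion.pollinterval", 5000);       // how often completion is checked (default 5 seconds)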

On job completion the application master and the task containers clean up their working state, and the OutputCommitter’s job cleanup method is called. Job information is archived by the job history server to enable later interrogation by users if desired.


Job Scheduling
Early versions of Hadoop had a very simple approach to scheduling users’ jobs: they ran in order of submission, using a FIFO scheduler. Later, the ability to set a job’s priority was added, but the FIFO scheduler’s priorities do not support preemption, so a high-priority job can still be blocked by a long-running, low-priority job that started before the high-priority job was scheduled.

MapReduce in Hadoop comes with a choice of schedulers. The default in MapReduce 1 is the original FIFO queue-based scheduler, and there are also multiuser schedulers called the Fair Scheduler and the Capacity Scheduler.

MapReduce 2 comes with the Capacity Scheduler (the default) and the FIFO scheduler.
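
As a rough sketch (the property is normally set by administrators in yarn-site.xml rather than in code, and the class name should be checked against your release), switching the cluster-wide YARN scheduler looks like this:

    Configuration yarnConf = new Configuration();
    yarnConf.set("yarn.resourcemanager.scheduler.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");  // replace the default Capacity Scheduler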


Excerpts from Hadoop: The Definitive Guide by Tom White, published by O'Reilly.

http://blogs.data-flair.com/hadoop-mapreduce-flow-how-data-flows-in-mapreduce/






