MapReduce Online
Created by: Rajesh Gadipuuri
Modified by: Ying Lu

MapReduce Programming Model
• Programmers think in a data-centric fashion
  – Apply transformations to data sets
• The MR framework handles the hard stuff:
  – Fault tolerance
  – Distributed execution, scheduling, concurrency
  – Coordination
  – Network communication

MapReduce System Model
• Designed for batch-oriented computations over large data sets
  – Each operator runs to completion before producing any output
  – Operator output is written to stable storage
    • Map output to local disk, reduce output to HDFS
• Simple, elegant fault tolerance model: operator restart
  – Critical for large clusters

Life Beyond Batch Processing
• Can we apply the MR programming model outside batch processing?
• Domain of interest: interactive data analysis
  – Enabled by high-level MR query languages, e.g. Hive, Pig, Jaql
  – Batch processing is a poor fit
    • Adds massive latency
    • Requires saving and reloading analysis state

MapReduce Online
• Pipeline data between operators as it is produced
• Hadoop Online Prototype (HOP): Hadoop with pipelining support
  – Preserves the Hadoop interfaces and APIs
  – Challenge: retaining the elegant fault tolerance model
• Reduces job response time
• Enables online aggregation and continuous queries

Functionalities Supported by HOP
• Online aggregation: reducers begin processing data as soon as it is produced by mappers, so they can generate and refine an approximation of their final answer during the course of execution
• Continuous queries: MapReduce jobs can run continuously, accepting new data as it arrives and analyzing it immediately
  – Allows MapReduce to be used for applications such as event monitoring and stream processing

Outline
1. Hadoop Background
2. HOP Architecture
3. Online Aggregation
4. Stream Processing
5. Conclusions

Hadoop Architecture
• Hadoop MapReduce
  – Single master node, many worker nodes
  – Client submits a job to the master node
  – Master splits each job into tasks (map/reduce) and assigns tasks to worker nodes
• Hadoop Distributed File System (HDFS)
  – Single name node, many data nodes
  – Files stored as large, fixed-size (e.g. 64 MB) blocks
  – HDFS typically holds map input and reduce output

Job Scheduling in Hadoop
• One map task for each block of the input file
  – Applies the user-defined map function to each record in the block
  – Record = <key, value>
• User-defined number of reduce tasks
  – Each reduce task is assigned a set of record groups, i.e., intermediate records corresponding to a group of keys
  – For each group, apply the user-defined reduce function to the record values in that group
• Reduce tasks read from every map task
  – Each read returns the record groups for that reduce task

Map Task Execution
1. Map phase
  – Read the assigned input split from HDFS
    • Split = file block by default
  – Parse input into records (key/value pairs)
  – Apply the map function to each record
    • Returns zero or more new records
2. Commit phase
  – Register the final output with the worker node
    • Stored in the local filesystem as a file
    • Sorted first by bucket number, then by key
  – Inform the master node of its completion

Reduce Task Execution
1. Shuffle phase
  – Fetch input data from all map tasks
    • The portion corresponding to the reduce task's bucket
2. Sort phase
  – Merge-sort *all* map outputs into a single run
3. Reduce phase
  – Apply the user-defined reduce function to the merged run
    • Arguments: key and corresponding list of values
  – Write output to a temp file in HDFS
    • Atomic rename when finished
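To ground the task lifecycle above, here is a minimal word-count job written against the standard org.apache.hadoop.mapreduce Java API. This example is not from the original slides; the driver/job-submission boilerplate (Job.getInstance, input/output paths) is omitted for brevity.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCount {

      // Map phase: invoked once per record (here, one line of text per record).
      public static class TokenizerMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
          for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
              word.set(token);
              context.write(word, ONE);   // emit zero or more new records
            }
          }
        }
      }

      // Reduce phase: invoked once per key with that key's list of values,
      // after the framework has shuffled and merge-sorted all map outputs.
      public static class SumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable c : counts) {
            sum += c.get();
          }
          // Written to a temp file in HDFS, atomically renamed on commit.
          context.write(word, new IntWritable(sum));
        }
      }
    }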
Dataflow in Hadoop
• Map tasks write their output to local disk
  – Output available only after the map task has completed
• Reduce tasks write their output to HDFS
  – Once a job is finished, the next job's map tasks can be scheduled, and will read their input from HDFS
• Therefore, fault tolerance is simple: just rerun tasks on failure
  – No consumers see partial operator output

Dataflow in Hadoop
[Diagram: client submits a job to the master, which schedules map and reduce tasks on worker nodes]

Dataflow in Hadoop
[Diagram: map tasks read input file blocks (Block 1, Block 2) from HDFS]

Dataflow in Hadoop
[Diagram: map tasks write output to the local filesystem; reduce tasks fetch it via HTTP GET]

Dataflow in Hadoop
[Diagram: reduce tasks write the final answer to HDFS]

Design Implications
1. Fault tolerance
  – Tasks that fail are simply restarted
  – No further steps required, since nothing left the task
2. "Straggler" handling
  – Job response time is affected by slow tasks
  – Slow tasks get executed redundantly (speculative execution)
    • Take the result from the first copy to finish
    • Assumes the slowdown is due to physical components (e.g., network, host machine)
• Pipelining can support both!

Hadoop Online Prototype (HOP)

Hadoop Online Prototype
• HOP supports pipelining within and between MapReduce jobs: push rather than pull
  – Preserves the simple fault tolerance scheme
  – Improved job completion time (better cluster utilization)
  – Improved detection and handling of stragglers
• MapReduce programming model unchanged
  – Clients supply the same job parameters
• Hadoop client interface backward compatible
  – Extended to take a series of jobs

Pipelining Batch Size
• Initial design: pipeline eagerly (for each row)
  – Moves more sorting work to the reducer
  – Prevents use of the combiner
  – Map function can block on network I/O
• Revised design: map writes into a buffer
  – Spill thread: sort & combine the buffer, spill to disk
  – Send thread: pipeline spill files to reducers (see the code sketch below)

Fault Tolerance
• Fault tolerance in MR is simple and elegant
  – Simply recompute on failure; no state recovery needed
• Initial design for pipelining FT:
  – Reduce treats in-progress map output as tentative, that is: it can merge together spill files generated by the same uncommitted mapper, but cannot combine those spill files with the output of other map tasks
• Revised design:
  – Pipelining maps periodically checkpoint their output
  – Reducers can consume output up to the checkpoint
  – Bonus: improved speculative execution

Fault Tolerance in HOP
• Traditional fault tolerance algorithms for pipelined dataflow systems are complex
• HOP approach: write to disk and pipeline
  – Producers write data into an in-memory buffer
  – The in-memory buffer is periodically spilled to disk
  – Spills are also sent to consumers
  – Consumers treat pipelined data as "tentative" until the producer is known to have completed
  – Fault tolerance via task restart; tentative output is discarded

Refinement: Checkpoints
• Problem: treating output as tentative inhibits parallelism
• Solution: producers periodically "checkpoint" with the Hadoop master node
  – "Output split x corresponds to input offset y"
  – Pipelined data up to split x is now non-tentative
  – Also improves speculation for straggler tasks and reduces redundant work on task failure
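The buffer/spill/send design from "Pipelining Batch Size" and the checkpoint refinement can be made concrete with a short sketch. This is a minimal illustration under stated assumptions, not HOP's actual code: every name here (PipelinedMapOutput, Spill, sendLoop, reportCheckpoint, the 64 MB threshold) is invented; the real prototype hooks into Hadoop's map-output collection machinery.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PipelinedMapOutput {
      // One sorted, combined run of map output on local disk.
      static class Spill {
        final String path;       // local-disk spill file
        final long inputOffset;  // how far into the input split this spill covers
        Spill(String path, long inputOffset) {
          this.path = path;
          this.inputOffset = inputOffset;
        }
      }

      private final List<byte[]> buffer = new ArrayList<>();  // in-memory map output
      private final BlockingQueue<Spill> pending = new LinkedBlockingQueue<>();
      private long bytesBuffered = 0;
      private static final long SPILL_THRESHOLD = 64 << 20;   // assumed 64 MB buffer

      // Called by the map task for each emitted record; never blocks on the network.
      synchronized void collect(byte[] record, long inputOffset) {
        buffer.add(record);
        bytesBuffered += record.length;
        if (bytesBuffered >= SPILL_THRESHOLD) {
          spill(inputOffset);
        }
      }

      // Spill thread's work: sort & combine the buffer, write it to local disk,
      // and queue the spill file for the send thread.
      private void spill(long inputOffset) {
        sortAndCombine(buffer);              // per-bucket sort, optional combiner
        String path = writeToLocalDisk(buffer);
        buffer.clear();
        bytesBuffered = 0;
        pending.add(new Spill(path, inputOffset));
      }

      // Send thread: pipeline spill files to reducers as they appear.
      void sendLoop() throws InterruptedException {
        while (true) {
          Spill s = pending.take();
          sendToReducers(s);                 // consumers treat this data as tentative
          reportCheckpoint(s.inputOffset);   // "output covers input up to offset y";
                                             // data up to here becomes non-tentative
        }
      }

      // Stubs standing in for real sorting, disk, and RPC machinery.
      private void sortAndCombine(List<byte[]> buf) { /* ... */ }
      private String writeToLocalDisk(List<byte[]> buf) { return "/tmp/spill"; }
      private void sendToReducers(Spill s) { /* ... */ }
      private void reportCheckpoint(long offset) { /* ... */ }
    }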
Online Aggregation
• Traditional MR: a poor UI for data analysis
• Pipelining means that data is available at consumers "early"
  – Can be used to compute and refine an approximate answer
  – Often sufficient for interactive data analysis, developing new MapReduce jobs, ...
• Within a single job: periodically invoke the reduce function at each reduce task on the data available so far (see the code sketch at the end of this deck)
• Between jobs: periodically send a "snapshot" to consumer jobs

Online Aggregation in HOP
[Diagram: map tasks read input blocks from HDFS and pipeline data to reduce tasks, which write snapshot answers to HDFS]

Inter-Job Online Aggregation
• Like intra-job OA, but approximate answers are pipelined to the map tasks of the next job
  – Requires co-scheduling a sequence of jobs
• Consumer job computes an approximation
  – Can be used to feed an arbitrary chain of consumer jobs with approximate answers

Inter-Job Online Aggregation
[Diagram: Job 1 reducers pipeline snapshots to Job 2 mappers; the final answer is written to HDFS]

Example Scenario
[Figure: approximate answers refined over time; axis labels not recoverable from the extraction]
• Top-K most frequent words in a 5.5 GB Wikipedia corpus (implemented as 2 MR jobs)
• 60-node EC2 cluster

Fault Tolerance
• For instance: j1-reducer & j2-map
  – As new snapshots are produced by j1, j2 recomputes from scratch using the new snapshot
  – Tasks that fail in j1 recover as discussed earlier
  – If a task in j2 fails, the system simply restarts the failed task; the next snapshot received by the restarted reduce task in j2 will always have a higher progress score than that received by the failed task
  – To handle failures in j1, tasks in j2 cache the most recent snapshot received from j1 and replace it when a new one arrives
  – If tasks from both jobs fail, a new task in j2 recovers the most recent snapshot from j1

Stream Processing
• MapReduce is often applied to streams of data that arrive continuously
  – Click streams, network traffic, web crawl data, ...
• Traditional approach: buffer, then batch process
  1. Poor latency
  2. Analysis state must be reloaded for each batch
• Instead, run MR jobs continuously and analyze data as it arrives

Monitoring
• Example: a continuous query monitoring hosts for memory thrashing
• The thrashing host was detected very rapidly, notably faster than the 5-second TaskTracker/JobTracker heartbeat cycle that stock Hadoop uses to detect straggler tasks
• We envision using these alerts for early detection of stragglers within a MapReduce job

Performance: Blocking
• 10 GB input file
• 20 map tasks, 5 reduce tasks

Performance: Pipelining
• Job completion time: 462 seconds (pipelining) vs. 561 seconds (blocking)

Other HOP Benefits
• Shorter job completion time via improved cluster utilization: reduce work starts early
  – Important for high-priority and interactive jobs
• Adaptive load management
  – Better detection and handling of "straggler" tasks

Conclusions
• HOP extends the applicability of the model to pipelining behaviors, while preserving the simple programming model and fault tolerance of a full-featured MapReduce framework
• Future topics
  – Scheduling
  – Exploring MapReduce-style programming for even more interactive applications
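Closing illustration: the intra-job online aggregation referenced in the Online Aggregation slide above, sketched as code. This is hypothetical; all names (OnlineAggregatingReducer, accept, emitSnapshot, writeSnapshot) are invented for illustration, and HOP's actual implementation lives inside Hadoop's reduce-task internals. Only the progress-score idea comes from the slides.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class OnlineAggregatingReducer {
      interface ReduceFn { long reduce(String key, List<Long> values); }

      // Tentative map output received so far, grouped by key.
      private final Map<String, List<Long>> received = new TreeMap<>();

      // Called as pipelined spill records arrive from (possibly uncommitted) mappers.
      synchronized void accept(String key, long value) {
        received.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
      }

      // Invoked periodically: apply the reduce function to everything seen so
      // far and publish the approximate answer as a snapshot, tagged with a
      // progress score (e.g., the fraction of map input consumed so far).
      synchronized void emitSnapshot(ReduceFn fn, double progressScore) {
        Map<String, Long> snapshot = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : received.entrySet()) {
          snapshot.put(e.getKey(), fn.reduce(e.getKey(), e.getValue()));
        }
        writeSnapshot(snapshot, progressScore);
      }

      // Stub standing in for writing the snapshot to HDFS, where a consumer
      // job's mappers could pick it up (inter-job online aggregation).
      private void writeSnapshot(Map<String, Long> snapshot, double progress) { /* ... */ }
    }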