QWERTY

Hello World!

MapReduce 簡介

Metadata

Title MapReduce: Simplified Data Processing on Large Clusters
Conference Proceedings Title OSDI ‘04
Authors Jeffrey Dean, Sanjay GhemawatCheng
Date 2004/10
Page 13
Times cited 34516

Introduction

  • Motivation: Lots of special-purpose programs should process large amounts of raw data
    • e.g., crawl(analyze) documents, web request logs, etc.
  • Inspiration
    • Map() and Reduce() primitives present in Lisp and many other functional languages
  • Solution: Design a programming model: MapReduce
    • Hides the details of parallelization, fault-tolerance, locality optimization, and load balancing

Map and Reduce

  • Divide, Conquer, and Combine → Divide, Map, and Reduce
  • User only need to implement Map() and Reduce() functions and tune settings
    • MapReduce model already implement other things to do

Programming Model

  • Map()
    • Produce a set of intermediate key/value pairs
  • After Map() complete
    • Sort intermediate key-value pairs and divide into pieces for Reduce() to use
  • Reduce()
    • Merge the value of the same intermediate key

Example

Input → After Map → After Reduce

  • Search (in a large file set)
    • Inverted Index
      • <doc, docID><word, docID><word, list(docID)>
    • Reverse WebLink
      • sourceURL<sourceURL, targetURL><targetURL, list(sourceURL)>
  • Count (in a large file set)
    • URL Frequency in webpage
      • URL<URL, 1><URL, TotalCount>
    • Term Frequency
      • term<term, freq><term, freq>
  • Sort (in a large file set)
    • unsorted listseperated sorted listmerged sorted list

Implementation

Use C++ to implement

  • Data storage
    • 資料存放在GFS(Global File System), 64MB為一個BLOCK
      • 存放多個複本於不同機器
    • 網路頻寬是稀缺資源
      • Worker和資料若在不同機器,會傳輸很久
        • Master儘量分配Worker使用本地有資料的任務
  • Task assign
    • Map() 分成 $M$ 個任務, 將 Reduce() 分成 $R$ 個任務
      • $M$ 和 $R$ 的數量比可用的機器多很多
    • Master 需要分配 $O(M+R)$ 個任務 和 記錄 $O(M \times R)$ 塊資料
    • 切得愈小,故障代價愈小,較容易平均分配,但Master需要記較多資料
      • 一塊通常是16~64MB,以配合GFS
      • Google的設定:$M$ = 200000, $R$ = 5000, using 2000 worker machines
  • Input
    • Input of Map: 把 Input data 切成 $M$ 塊
    • Input of Reduce: 把 Intermidiate data 切成 $R$ 塊
      • Intermediate key and set of values of the key
      • 用 hash(key) mod R 的值來代表 key 是第幾塊
  • Master program: 分配任務
    • Map worker
      • 執行完後將 $R$ 塊Intermediate資料寫入本地磁碟
      • 回傳檔案位置至Master
    • Reduce worker
      • 從Master得知檔案位置,使用Remote Procedure Call(RPC)來讀取資料

故障容忍 Fault Tolerance

  • Worker故障
    • 可能是網路、硬碟、機器故障
    1. Master定期ping worker
    2. 若沒有回應,視為死掉
    3. 原先分配給死掉的Task會重新分配
      • 已完成的Reduce任務不需要重新分配
      • 資料已存放在全域資料系統(Map的結果資料放在本地端)
    4. 通知Reduce Worker新的資料路徑
  • Master故障
    • 幾乎不會發生(因為只有一台)
    • 定時記錄進度,以便還原

Backup Tasks

  • Straggler(落後者): 完成Task的時間很長
    • 可能原因
      • 磁碟故障
      • 記憶體不足
      • 網路不穩
      • 程式的bug
      • 在做其他任務
  • 解法: 若MapReduce快完成時, Master 對還在執行的Task,執行多份備份Task
    • 只要其中一個完成即可
    • 只多消耗一點運算資源,可減少30%耗時

Performance

  • Environment
    • 1800 machines
      • two 2GHz Intel Xeon processors with Hyper-Threading enabled
      • 4GB of memory
      • two 160GB IDE disks
      • 1 gigabit Ethernet link
  • 測試兩種Task
    1. Grep 1TB data
    2. Sort 1TB data
  • Performance
    • Normal Execution
      • 891sec
    • 刻意殺掉200個worker(總共1746個)
      • 933sec
      • only 5% increase
    • 沒有Backup Task的情況
      • 1283sec
      • 44% increase

Refinement

  • Locality Optimization
    • 將輸入檔案複製到本地磁碟
  • Ordering Guarantee
    • 保證 Intermediate key/value 已排序
    • Reduce() 處理時速度較快
  • Combiner Function
    • 考慮計算次數的任務,會送出很多<key,1>的records,可在 Map() 之後先做簡易的 Reduce(),稱為Combine
      • 可減少網路傳輸量
  • Partitioning Function
    • 預設使用Hash Function 來分割成 $R$ 個檔案
      • Hash(key) mod R
    • 特殊情況可以用別的,如key為網址的時候希望相同Host的資料放在同一個Reduce() Task上
      • Hash(Hostname(url)) mod R
  • 輸入和輸出
    • 使用介面來實作新的輸入/輸出類別
    • 輸入類別
      • 文字處理類別 (Text Mode)
        • 每一行都是 key/value pair
          • Key: 此行的檔案 offset
          • Value: 此行的內容
      • key/value pairs 的類別
        • 用來保持key的排序
  • Side Effects
    • e.g., 執行Map()Reduce()時,產生附加檔案(Auxiliary Files)
    • Make sure side-effect is atomic and idempotent(冪等)
      • Idempotent:一個運算作用在任一元素兩次後會和其作用一次的結果相同
  • Skipping Bad Records
    • 忽略少數的故障(當資料夠大時)
      • 對答案影響不大
      • 故障時,傳送資料給Master
        • 連續錯N次後,直接跳過此輸入
  • 實作Counter類別
    • Map()Reduce() 計算數量用
    • 定期傳送資料給Master以追蹤進度
  • Local Execution
    • 將全部資料於同一台電腦中執行(即單機版本),以利Debug
  • Status Information
    • Master有http server提供狀態頁面,以提供進度監控

結論

  • 此框架隱藏了錯誤處理、平行化、分散式系統、資源分配的細節
    • 沒有分散式系統和平行化的經驗也可以輕鬆上手
      • 只需思考分割和組合資料的邏輯
    • 程式邏輯更簡單,更容易修改
      • 3800行 → 700行 (C++)
  • 適合用來處理可以分割、資料量大的問題

應用

因為是非常一般化的框架,可用在許多用途

  • 資料處理
    • 統計,如PageRank
    • 搜尋,如Google Search Indexing
    • 排序
  • 機器學習

備註

  • Map()Reduce() 的實作必須是Deterministic
    • 否則因為機器故障、輸入順序不同等因素,輸出可能會不同
  • Hadoop 為 MapReduce 的 Open Source 實作

參考資料

  • Jeffrey Dean and Sanjay Ghemawat, 2004, MapReduce: Simplified Data Processing on Large Clusters
  • 簡報

歡迎關注我的其它發布管道