Hadoop Streaming

Hadoop數據流是Hadoop自帶發行的實用程序。該實用程序允許創建和運行Map/Reduce任務的任何可執行文件或腳本映射器和/或減速器。

使用Python示例

對於Hadoop的數據流,我們考慮的字計數問題。任何工作在Hadoop中必須有兩個階段:映射器和減速器。我們使用python腳本代碼映射器和減速器在Hadoop下運行它。使用Perl和Ruby也是類似的。

映射階段代碼

!/usr/bin/python import sys # Input takes from standard input for myline in sys.stdin: # Remove whitespace either side myline = myline.strip() # Break the line into words words = myline.split() # Iterate the words list for myword in words: # Write the results to standard output print '%s\t%s' % (myword, 1)

請確保此文件具有執行權限(使用chmod +x /home/ expert/hadoop-1.2.1/mapper.py)。

減速器階段代碼

#!/usr/bin/python from operator import itemgetter import sys
current_word = "" current_count = 0 word = "" # Input takes from standard input for myline in sys.stdin: # Remove whitespace either side myline = myline.strip() # Split the input we got from mapper.py word, count = myline.split('\t', 1) # Convert count variable to integer try: count = int(count) except ValueError: # Count was not a number, so silently ignore this line continue if current_word == word: current_count += count else: if current_word: # Write result to standard output print '%s\t%s' % (current_word, current_count) current_count = count
current_word = word # Do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count)

保存mapper.py和reducer.py 在 Hadoop 的主目錄映射器和減速器代碼。確保這些文件具有執行權限(使用chmod +x mapper.py 和 chmod +x reducer.py)。由於python具有大小寫敏感,因此相同的代碼可以從以下鏈接下載。

wordCount程序的執行

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1. 2.1.jar \ -input input_dirs \ -output output_dir \ -mapper <path/mapper.py \ -reducer <path/reducer.py

其中「\」用於續行以便於閱讀。

例如,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

數據流工作原理

在上面的例子中,這兩個映射器和減速是從標準輸入讀取作爲輸入,並輸出到標準輸出到Python腳本。實用程序將創建一個Map/Reduce作業,並將作業提交到一個合適的集羣,並監督工作的進展情況,直至完成。

當指定映射器的腳本,每個映射任務將啓動腳本作爲一個單獨的進程時映射器初始化。作爲mapper任務運行時,輸入轉換成行給進程的標準輸入(STDIN)。在此期間,映射器收集從該方法的標準輸出(stdout)面向行輸出和每一行轉換爲鍵/值對,其被收集作爲映射器的輸出。缺省情況下,一行到第一個製表符的前綴是鍵和行(不包括製表符)的其餘部分爲值。如果在該行沒有任何製表符,則整行鍵和值被視爲null。然而,這可以被定製,每次需要1個。

當指定減速腳本,每個減速器任務將啓動腳本作爲一個單獨的進程,然後減速初始化。減速器任務運行時將其轉換其輸入鍵/值對,進入行並將該行進程的標準輸入(STDIN)。在此期間,在減速機收集來自該過程的標準輸出(stdout)的面向行的輸出,每行轉換成一個密鑰/值對,其被收集作爲減速機的輸出。缺省情況下,一行到第一個製表符的前綴是鍵,(不包括製表符)的其餘部分的值爲行。然而,這可以被定製爲每次具體要求。

重要的命令

參數

描述

-input directory/file-name

輸入位置映射。 (必填)

-output directory-name

輸出位置的減速器。 (必填)

-mapper executable or script or JavaClassName

映射器可執行文件。 (必填)

-reducer executable or script or JavaClassName

減速器的可執行文件。 (必填)

-file file-name

使現有的映射器,減速機,或組合的可執行本地計算節點上。

-inputformat JavaClassName

類,應該提供返回鍵/值對文字類。如果沒有指定,使用TextInputFormat作爲默認。

-outputformat JavaClassName

類,提供應採取鍵/值對文字類的。如果沒有指定,使用TextOutputformat作爲默認值。

-partitioner JavaClassName

類,確定哪個減少一個鍵被髮送。

-combiner streamingCommand or JavaClassName

組合可執行文件映射輸出。

-cmdenv name=value

通過環境變量數據流的命令。

-inputreader

對於向後兼容性:指定記錄讀取器類(而不是輸入格式類)。

-verbose

詳細的輸出。

-lazyOutput

創建懶輸出。例如,如果輸出格式是基於FileOutputFormat,輸出文件僅在第一次調用output.collect(或Context.write)創建。

-numReduceTasks

指定減速器的數目。

-mapdebug

當map任務失敗的腳本調用。

-reducedebug

腳本調用時降低任務失敗。