Spark核心編程

Spark 核心是整個項目的基礎。它提供了分佈式任務調度,調度和基本的 I/O 功能。Spark 使用一種稱爲RDD(彈性分佈式數據集)一個專門的基礎數據結構,是整個機器分區數據的邏輯集合。RDDS可以用兩種方法來創建的;一個是在外部存儲系統引用的數據集,第二個是通過應用轉換(如map, filter, reducer, join)在現有RDDS。

RDD抽象通過語言集成API公開。這簡化了編程的複雜性,因爲應用程序的處理RDDS方式類似於操縱的本地集合數據。

Spark Shell

Spark提供了一個交互的shell − 一個強大的工具,以交互方式分析數據。 這是在 Scala或Python語言。Spark主要抽象稱爲彈性分佈式數據集(RDD)項目的分佈式採集。RDDS可以從Hadoop的輸入格式來創建(如HDFS文件)或通過轉化其他RDDS。

打開 Spark Shell

下面的命令用來打開Spark shell。

$ spark-shell

創建簡單RDD

讓我們從文本文件中創建一個簡單的 RDD。使用下面的命令來創建一個簡單的 RDD。

scala> val inputfile = sc.textFile(「input.txt」)

對上述命令的輸出爲:

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at :12

Spark RDD API引入了一些變革和一些動作來操縱RDD。

RDD 轉換

RDD轉換返回指向新的RDD,並允許創建RDDS之間的依賴關係。 在依賴關係鏈中的每個RDD(依賴關係的字串)具有這樣的功能,用於計算其數據並具有一個指針(依賴性)到其父RDD。

Spark是懶惰的,所以什麼都不會被執行,除非調用一些改造或行動將觸發作業創建和執行。看單詞計數示例,如下面的代碼片段。

因此,RDD轉型不是一組數據而是在程序中的一個步驟(可能是唯一的步驟)告訴Spark如何獲取數據以及如何使用它。

下面給出是RDD轉換的列表。

S.No

轉換&含義

1

map(func)

返回一個新的分佈式數據集,傳遞源的每個元素形成通過一個函數 func

2

filter(func)

返回由選擇在func返回true,源元素組成了一個新的數據集

3

flatMap(func)

類似映射,但每個輸入項目可以被映射到0以上輸出項目(所以func應返回seq而不是單一的項目)

4

mapPartitions(func)

類似映射,只不過是單獨的每個分區(塊)上運行RDD,因此 func 的類型必須是Iterator ⇒ Iterator 對類型T在RDD上運行時

5

mapPartitionsWithIndex(func)

類似映射分區,而且還提供func 來表示分區的索引的整數值,因此 func 必須是類型 (Int, Iterator) ⇒ Iterator 當類型T在RDD上運行時

6

sample(withReplacement, fraction, seed)

採樣數據的一小部分,有或沒有更換,利用給定的隨機數發生器的種子

7

union(otherDataset)

返回一個新的數據集,其中包含源數據和參數元素的結合

8

intersection(otherDataset)

返回包含在源數據和參數元素的新RDD 交集

9

distinct([numTasks])

返回一個新的數據集包含源數據集的不同元素

10

groupByKey([numTasks])

當調用(K,V)數據集,返回(K, Iterable) 對數據集

11

reduceByKey(func, [numTasks])

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

13

sortByKey([ascending], [numTasks])

14

join(otherDataset, [numTasks])

15

cogroup(otherDataset, [numTasks])

16

cartesian(otherDataset)

當上調用類型T和U的數據集,返回(T,U)對數據集(所有元素對)

17

pipe(command, [envVars])

RDD通過shell命令每個分區,例如:一個Perl或bash腳本。RDD元素被寫入到進程的標準輸入和線路輸出,標準輸出形式返回一個字符串RDD

18

coalesce(numPartitions)

減少RDD到numPartitions分區的數量。過濾大型數據集後,更高效地運行的操作

19

repartition(numPartitions)

打亂RDD數據隨機創造更多或更少的分區,並在它們之間平衡。這總是打亂的所有數據在網絡上

20

repartitionAndSortWithinPartitions(partitioner)

根據給定的分區重新分區RDD及在每個結果分區,排序鍵記錄。這是調用重新分配排序在每個分區內,因爲它可以推動分揀向下進入混洗機制效率更高。

動作

下表給出了操作,及其返回值的列表。

S.No

操作 & 含義

1

reduce(func)

合計數據集的元素,使用函數 func (其中有兩個參數和返回一行). 該函數應該是可交換和可結合,以便它可以正確地在並行計算。

2

collect()

返回數據集的所有作爲數組在驅動程序的元素。這是一個過濾器或其它操作之後返回數據的一個足夠小的子集,通常是有用的

3

count()

返回該數據集的元素數

4

first()

返回的數據集的第一個元素(類似於使用(1))

5

take(n)

返回與該數據集的前n個元素的陣列。

6

takeSample (withReplacement,num, [seed])

返回數組的數據集num個元素,有或沒有更換隨機抽樣,預指定的隨機數發生器的種子可選

7

takeOrdered(n, [ordering])

返回RDD使用或者按其自然順序或自定義比較的前第n個元素

8

saveAsTextFile(path)

寫入數據集是一個文本文件中的元素(或一組文本文件),在給定的目錄的本地文件系統,HDFS或任何其他的Hadoop支持的文件系統。Spark調用每個元素的 toString,將其轉換爲文件中的文本行

9

saveAsSequenceFile(path) (Java and Scala)

寫入數據集,爲Hadoop SequenceFile元素在給定的路徑寫入在本地文件系統,HDFS或任何其他Hadoop支持的文件系統。 這是適用於實現Hadoop可寫接口RDDS的鍵 - 值對。在Scala中,它也可以在屬於隱式轉換爲可寫(Spark包括轉換爲基本類型,如 Int, Double, String 等等)類型。

10

saveAsObjectFile(path) (Java and Scala)

寫入數據集的內容使用Java序列化爲一個簡單的格式,然後可以使用SparkContext.objectFile()加載。

11

countByKey()

僅適用於RDDS的類型 (K, V). 返回(K, Int)對與每個鍵的次數的一個HashMap。

12

foreach(func)

數據集的每個元素上運行函數func。這通常對於不良反應,例如更新累加器或與外部存儲系統進行交互進行。

 − 在 foreach()以外修改變量,其他累加器可能會導致不確定的行爲。請參閱瞭解閉包的更多細節。

RDD編程


讓我們來看看幾個RDD轉換和操作RDD編程實現,用一個例子的協助說明。

示例

考慮一個單詞計數的例子 − 它計算出現在文檔中的每個單詞。請看下面的文字爲輸入並保存在主目錄中的 input.txt 文件。

input.txt − 作爲輸入文件

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.

按照下面給出命令執行示例程序。

打開Spark-Shell

下面的命令用來打開spark shell. 通常情況下,spark 使用Scala構建。因此,Spark 程序需要在 Scala 環境中運行。

$ spark-shell 

如果Spark shell 成功打開,會發現下面的輸出。看看輸出「Spark 上下文可作爲sc」 的最後一行表示Spark容器會自動創建Spark 上下文對象名爲sc。啓動程序的第一步驟之前,SparkContext 對象應該被創建。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>

創建一個RDD

首先,我們必須使用 Spark-Scala API 讀取輸入文件,並創建一個RDD。

下面的命令被用於從給定位置讀出的文件。這裏,新的 RDD 使用輸入文件名創建。這是在 textFile(「」)方法的參數字符串是用於輸入文件名的絕對路徑。然而,如果僅給出文件名,那麼它輸入文件則在當前位置。

scala> val inputfile = sc.textFile("input.txt")

執行字數轉換

我們的目標是計算一個文件中的字數。分裂每一行成詞創建一個平面地圖(flatMap(line ⇒ line.split(「 」)).

接下來,讀每個詞作爲一個鍵和值 ‘1’ (<key, value> = <word,1>) 使用映射函數 (map(word ⇒ (word, 1)).

最後,加入類似的鍵值降低這些鍵 (reduceByKey(_+_)).

下面的命令用於執行字數統計邏輯。執行此操作後,不會有任何輸出,因爲這不是一個動作,這是一個轉換; 指向一個新的RDD或告訴spark,用給定的數據來做什麼)。

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

當前RDD

同時用RDD工作,如果想了解當前的RDD,那麼可使用下面的命令。 它會告訴你關於當前RDD及其依賴調試的描述。

scala> counts.toDebugString

緩存轉換

可以使用 persist() 或 cache() 方法標記一個RDD。在第一次計算的操作,這將被保存在存儲器中的節點上。使用下面的命令來存儲中間轉換在內存中。

scala> counts.cache()

應用動作

應用動作(操作),比如存儲所有的轉換結果到一個文本文件中。saveAsTextFile(「」)方法字符串參數是輸出文件夾的絕對路徑。試試下面的命令來保存輸出文本文件。在下面的例子中, ‘output’ 的文件夾爲當前位置。

scala> counts.saveAsTextFile("output")

檢查輸出

打開另一個終端進入主目錄(其中spark 在其他終端中執行)。下面的命令用於檢查輸出目錄。

[[email protected] ~]$ cd output/
[[email protected] output]$ ls -1

part-00000
part-00001
_SUCCESS

下面的命令是用來查看輸出的 Part-00001 文件。

[[email protected] output]$ cat part-00000

輸出

(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

下面的命令是用來查看輸出的 Part-00001 文件。

[[email protected] output]$ cat part-00001

輸出

(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

UN持久存儲


UN持續存在之前,如果想看到用於該應用程序的存儲空間,可使用下面的URL在瀏覽器中查看。

http://localhost:4040

這將會看到下面的屏幕,該屏幕顯示用於應用程序,這些都在 Spark shell 運行的存儲空間。

如果想特別的RDD存儲空間,然後使用下面的命令。

Scala> counts.unpersist() 

將看到如下輸出 −

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at :14

爲了驗證在瀏覽器中的存儲空間,使用下面的URL。

http://localhost:4040/

會看到下面的畫面。它用於應用程序,這是在Spark shell運行存儲空間。