Apache Spark:數據框,數據集和RDD之間的差異

1.概述

Apache Spark是一個快速的分佈式數據處理系統。它執行內存中的數據處理,並使用內存中的緩存和優化的執行,從而實現了快速的性能。它為流行的編程語言(例如Scala,Python,Java和R)提供了高級API。

在本快速教程中,我們將介紹Spark的三個基本概念:數據幀,數據集和RDD。

2.數據框

從Spark 1.3開始,Spark SQL引入了表格形式的數據抽象,稱為DataFrame。從那時起,它已成為Spark中最重要的功能之一。當我們要處理結構化和半結構化的分佈式數據時,此API很有用。

在第3節中,我們將討論彈性分佈式數據集(RDD)。 DataFrame以比RDD更有效的方式存儲數據,這是因為DataFrame使用RDD的不變,內存,彈性,分佈式和並行功能,但它們也將架構應用於數據。 DataFrames還可以將SQL代碼轉換為優化的低級RDD操作。

我們可以通過三種方式創建DataFrames:

  • 轉換現有的RDD
  • 運行SQL查詢
  • 加載外部數據

Spark團隊SparkSession ,它統一了所有不同的上下文,從而確保開發人員無需擔心創建不同的上下文:

SparkSession session = SparkSession.builder()

 .appName("TouristDataFrameExample")

 .master("local[*]")

 .getOrCreate();



 DataFrameReader dataFrameReader = session.read();

我們將分析Tourist.csv文件:

Dataset<Row> data = dataFrameReader.option("header", "true")

 .csv("data/Tourist.csv");

由於Spark 2.0 DataFrame成為Row類型Dataset ,因此我們可以將DataFrame用作**Dataset<Row>** .

我們可以選擇感興趣的特定列。我們還可以對給定的列進行過濾和分組:

data.select(col("country"), col("year"), col("value"))

 .show();



 data.filter(col("country").equalTo("Mexico"))

 .show();



 data.groupBy(col("country"))

 .count()

 .show();

3. Datasets

數據集是一組強類型的結構化數據。它們提供了熟悉的面向對象編程風格以及類型安全性的好處,因為數據集可以在編譯時檢查語法並捕獲錯誤。

Dataset是DataFrame的擴展,因此我們可以將DataFrame視為數據集的無類型視圖。

Spark團隊Dataset API,正如他們提到的那樣:“ Spark Datasets的目標是提供一個API,使用戶可以輕鬆地表達對象域上的轉換,同時還提供Spark SQL執行的性能和魯棒性優勢。引擎”。

首先,我們需要創建一個類型TouristData的類:

public class TouristData {

 private String region;

 private String country;

 private String year;

 private String series;

 private Double value;

 private String footnotes;

 private String source;

 // ... getters and setters

 }

要將每個記錄映射到指定的類型,我們將需要使用編碼器。編碼器在Java對象和Spark的內部二進制格式之間進行轉換

// SparkSession initialization and data load

 Dataset<Row> responseWithSelectedColumns = data.select(col("region"),

 col("country"), col("year"), col("series"), col("value").cast("double"),

 col("footnotes"), col("source"));



 Dataset<TouristData> typedDataset = responseWithSelectedColumns

 .as(Encoders.bean(TouristData.class));

與DataFrame一樣,我們可以按特定的列進行過濾和分組:

typedDataset.filter((FilterFunction) record -> record.getCountry()

 .equals("Norway"))

 .show();



 typedDataset.groupBy(typedDataset.col("country"))

 .count()

 .show();

我們還可以進行操作,例如按列匹配特定範圍進行過濾或計算特定列的總和,以獲取其總值:

typedDataset.filter((FilterFunction) record -> record.getYear() != null

 && (Long.valueOf(record.getYear()) > 2010

 && Long.valueOf(record.getYear()) < 2017)).show();



 typedDataset.filter((FilterFunction) record -> record.getValue() != null

 && record.getSeries()

 .contains("expenditure"))

 .groupBy("country")

 .agg(sum("value"))

 .show();

4. RDD

彈性分佈式數據集或RDD是Spark的主要編程抽象。它表示元素的集合,這些元素是不可變的,有彈性的和分佈式的

一個RDD封裝了一個大型數據集,Spark將自動在整個集群中分佈RDD中包含的數據,並使我們對它們執行的操作並行化

我們只能通過穩定存儲中的數據操作或其他RDD上的操作來創建RDD。

當我們處理大量數據並且數據分佈在群集計算機上時,容錯能力至關重要。由於Spark內置的故障恢復機制,因此RDD具有彈性。 Spark依賴於以下事實:RDD會記住它們的創建方式,以便我們可以輕鬆地追溯沿襲來恢復分區

我們可以對RDD執行兩種類型的操作: Transformations和Actions

4.1 轉變

我們可以將轉換應用於RDD以操縱其數據。執行完此操作後,我們將獲得全新的RDD,因為RDD是不可變的對象

我們將檢查如何實現Map和Filter,這是兩種最常見的轉換。

首先,我們需要創建一個JavaSparkContext Tourist.csv文件中將數據作為RDD加載:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")

 .setMaster("local[*]");

 JavaSparkContext sc = new JavaSparkContext(conf);



 JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

接下來,讓我們應用map函數從每個記錄中獲取國家的名稱,並將名稱轉換為大寫。我們可以將此新生成的數據集保存為磁盤上的文本文件:

JavaRDD<String> upperCaseCountries = tourists.map(line -> {

 String[] columns = line.split(COMMA_DELIMITER);

 return columns[1].toUpperCase();

 }).distinct();



 upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

如果只想選擇一個特定國家/地區,則可以對原始遊客RDD應用過濾功能:

JavaRDD<String> touristsInMexico = tourists

 .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));



 touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2 動作

在對數據進行一些計算之後,動作將返回最終值或將結果保存到磁盤。

Spark中經常使用的兩個動作是Count和Reduce。

讓我們在CSV文件中計算國家總數:

// Spark Context initialization and data load

 JavaRDD<String> countries = tourists.map(line -> {

 String[] columns = line.split(COMMA_DELIMITER);

 return columns[1];

 }).distinct();



 Long numberOfCountries = countries.count();

現在,我們將按國家/地區計算總支出。我們需要過濾描述中包含支出的記錄。

如果不使用的JavaRDD ,我們將使用JavaPairRDD一對RDD是可以存儲鍵值對的一種RDD 。接下來讓我們檢查一下:

JavaRDD<String> touristsExpenditure = tourists

 .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));



 JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure

 .mapToPair(line -> {

 String[] columns = line.split(COMMA_DELIMITER);

 return new Tuple2<>(columns[1], Double.valueOf(columns[6]));

 });



 List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd

 .reduceByKey((x, y) -> x + y)

 .collect();

5.結論

綜上所述,當我們需要特定於域的API,需要聚合,求和或SQL查詢等高級表達式時,應使用DataFrames或Datasets。或者,當我們想要在編譯時進行類型安全時。

另一方面,當數據是非結構化的並且不需要實現特定的架構時,或者在需要低級的轉換和操作時,我們應該使用RDD。