Apache Spark:數據框,數據集和RDD之間的差異
- Spark
- java
1.概述
Apache Spark是一個快速的分佈式數據處理系統。它執行內存中的數據處理,並使用內存中的緩存和優化的執行,從而實現了快速的性能。它為流行的編程語言(例如Scala,Python,Java和R)提供了高級API。
在本快速教程中,我們將介紹Spark的三個基本概念:數據幀,數據集和RDD。
2.數據框
從Spark 1.3開始,Spark SQL引入了表格形式的數據抽象,稱為DataFrame。從那時起,它已成為Spark中最重要的功能之一。當我們要處理結構化和半結構化的分佈式數據時,此API很有用。
在第3節中,我們將討論彈性分佈式數據集(RDD)。 DataFrame以比RDD更有效的方式存儲數據,這是因為DataFrame使用RDD的不變,內存,彈性,分佈式和並行功能,但它們也將架構應用於數據。 DataFrames還可以將SQL代碼轉換為優化的低級RDD操作。
我們可以通過三種方式創建DataFrames:
- 轉換現有的RDD
- 運行SQL查詢
- 加載外部數據
Spark團隊SparkSession
,它統一了所有不同的上下文,從而確保開發人員無需擔心創建不同的上下文:
SparkSession session = SparkSession.builder()
.appName("TouristDataFrameExample")
.master("local[*]")
.getOrCreate();
DataFrameReader dataFrameReader = session.read();
我們將分析Tourist.csv
文件:
Dataset<Row> data = dataFrameReader.option("header", "true")
.csv("data/Tourist.csv");
由於Spark 2.0 DataFrame成為Row
類型Dataset
,因此我們可以將DataFrame用作**Dataset<Row>** .
我們可以選擇感興趣的特定列。我們還可以對給定的列進行過濾和分組:
data.select(col("country"), col("year"), col("value"))
.show();
data.filter(col("country").equalTo("Mexico"))
.show();
data.groupBy(col("country"))
.count()
.show();
3. Datasets
數據集是一組強類型的結構化數據。它們提供了熟悉的面向對象編程風格以及類型安全性的好處,因為數據集可以在編譯時檢查語法並捕獲錯誤。
Dataset
是DataFrame的擴展,因此我們可以將DataFrame視為數據集的無類型視圖。
Spark團隊Dataset
API,正如他們提到的那樣:“ Spark Datasets的目標是提供一個API,使用戶可以輕鬆地表達對象域上的轉換,同時還提供Spark SQL執行的性能和魯棒性優勢。引擎”。
首先,我們需要創建一個類型TouristData
的類:
public class TouristData {
private String region;
private String country;
private String year;
private String series;
private Double value;
private String footnotes;
private String source;
// ... getters and setters
}
要將每個記錄映射到指定的類型,我們將需要使用編碼器。編碼器在Java對象和Spark的內部二進制格式之間進行轉換:
// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"),
col("country"), col("year"), col("series"), col("value").cast("double"),
col("footnotes"), col("source"));
Dataset<TouristData> typedDataset = responseWithSelectedColumns
.as(Encoders.bean(TouristData.class));
與DataFrame一樣,我們可以按特定的列進行過濾和分組:
typedDataset.filter((FilterFunction) record -> record.getCountry()
.equals("Norway"))
.show();
typedDataset.groupBy(typedDataset.col("country"))
.count()
.show();
我們還可以進行操作,例如按列匹配特定範圍進行過濾或計算特定列的總和,以獲取其總值:
typedDataset.filter((FilterFunction) record -> record.getYear() != null
&& (Long.valueOf(record.getYear()) > 2010
&& Long.valueOf(record.getYear()) < 2017)).show();
typedDataset.filter((FilterFunction) record -> record.getValue() != null
&& record.getSeries()
.contains("expenditure"))
.groupBy("country")
.agg(sum("value"))
.show();
4. RDD
彈性分佈式數據集或RDD是Spark的主要編程抽象。它表示元素的集合,這些元素是不可變的,有彈性的和分佈式的。
一個RDD封裝了一個大型數據集,Spark將自動在整個集群中分佈RDD中包含的數據,並使我們對它們執行的操作並行化。
我們只能通過穩定存儲中的數據操作或其他RDD上的操作來創建RDD。
當我們處理大量數據並且數據分佈在群集計算機上時,容錯能力至關重要。由於Spark內置的故障恢復機制,因此RDD具有彈性。 Spark依賴於以下事實:RDD會記住它們的創建方式,以便我們可以輕鬆地追溯沿襲來恢復分區。
我們可以對RDD執行兩種類型的操作: Transformations和Actions 。
4.1 轉變
我們可以將轉換應用於RDD以操縱其數據。執行完此操作後,我們將獲得全新的RDD,因為RDD是不可變的對象。
我們將檢查如何實現Map和Filter,這是兩種最常見的轉換。
首先,我們需要創建一個JavaSparkContext
Tourist.csv
文件中將數據作為RDD加載:
SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");
接下來,讓我們應用map函數從每個記錄中獲取國家的名稱,並將名稱轉換為大寫。我們可以將此新生成的數據集保存為磁盤上的文本文件:
JavaRDD<String> upperCaseCountries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1].toUpperCase();
}).distinct();
upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");
如果只想選擇一個特定國家/地區,則可以對原始遊客RDD應用過濾功能:
JavaRDD<String> touristsInMexico = tourists
.filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));
touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");
4.2 動作
在對數據進行一些計算之後,動作將返回最終值或將結果保存到磁盤。
Spark中經常使用的兩個動作是Count和Reduce。
讓我們在CSV文件中計算國家總數:
// Spark Context initialization and data load
JavaRDD<String> countries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1];
}).distinct();
Long numberOfCountries = countries.count();
現在,我們將按國家/地區計算總支出。我們需要過濾描述中包含支出的記錄。
如果不使用的JavaRDD
,我們將使用JavaPairRDD
。一對RDD是可以存儲鍵值對的一種RDD 。接下來讓我們檢查一下:
JavaRDD<String> touristsExpenditure = tourists
.filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));
JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
.mapToPair(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});
List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
.reduceByKey((x, y) -> x + y)
.collect();
5.結論
綜上所述,當我們需要特定於域的API,需要聚合,求和或SQL查詢等高級表達式時,應使用DataFrames或Datasets。或者,當我們想要在編譯時進行類型安全時。
另一方面,當數據是非結構化的並且不需要實現特定的架構時,或者在需要低級的轉換和操作時,我們應該使用RDD。