使用 Amazon Athena 與 Spring Boot 查詢 S3 數據
1. 概述
我們經常在 Amazon S3 中儲存大量數據,但分析這些數據可能具有挑戰性。傳統方法要求我們移動資料或建立資料倉儲等複雜系統。
Amazon Athena提供了一個更簡單的解決方案,讓我們可以直接使用 SQL 查詢 S3 資料。
在本教程中,我們將探索如何使用 Amazon Athena 透過 Spring Boot 分析 S3 儲存桶中的資料。我們將逐步完成必要的配置,以程式方式執行 Athena 查詢,並處理結果。
2.了解亞馬遜Athena
Amazon Athena 是一種無伺服器查詢服務,可讓我們對 S3 儲存桶中儲存的資料執行即席查詢,而無需設定任何基礎架構。
使用 Athena 的主要好處之一是我們只需為執行查詢時消耗的資料量付費,這使其成為臨時和偶爾資料分析的經濟高效的解決方案。
Athena 也使用讀取模式將傳輸中的 S3 資料轉換為類似表的結構。具體來說,這意味著我們在不更改來源且不執行任何提取、轉換和載入 (ETL) 操作的情況下查詢資料。我們在 Athena 中定義的表格並不像傳統資料庫那樣包含實際資料。相反,它們儲存有關如何轉換來源資料以進行查詢的說明。
我們的 S3 儲存桶中的資料可以源自於各種 AWS 服務,例如CloudTrail 日誌、 VPC 流日誌和ALB 存取日誌,甚至是我們以 JSON、XML、Parquet 等格式儲存在 S3 中的自訂資料。
3. 設定項目
在使用 Amazon Athena 之前,我們需要包含它的依賴項並正確配置我們的應用程式。
3.1.依賴關係
首先,我們將Amazon Athena 相依性新增至專案的pom.xml
檔案:
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>athena</artifactId>
<version>2.26.0</version>
</dependency>
</dependencies>
此依賴項為我們提供了AthenaClient
和其他相關類,我們將使用它們與 Athena 服務進行互動。
3.2.定義 Athena 配置屬性
現在,要與 Athena 服務互動並執行查詢,我們需要配置用於身份驗證的 AWS 憑證、用於執行 SQL 查詢的 Athena 資料庫名稱以及查詢結果位置(Athena 儲存結果的 S3 儲存桶)我們的查詢。
我們將這些屬性儲存在專案的application.yaml
檔案中,並使用@ConfigurationProperties
將值對應到 POJO,我們的服務層在與 Athena 互動時引用該 POJO:
@Getter
@Setter
@Validated
@ConfigurationProperties(prefix = "com.baeldung.aws")
class AwsConfigurationProperties {
@NotBlank
private String accessKey;
@NotBlank
private String secretKey;
@Valid
private Athena athena = new Athena();
@Getter
@Setter
public class Athena {
@Nullable
private String database = "default";
@NotBlank
private String s3OutputLocation;
}
}
s3OutputLocation
欄位表示 Athena 儲存查詢結果的 S3 儲存桶位置。這是必要的,因為 Athena 是無伺服器的,本身不儲存任何資料。相反,它執行查詢並將結果寫入指定的 S3 位置,然後我們的應用程式可以從中讀取結果。
我們還添加了驗證註釋,以確保正確配置所有必需的屬性。如果任何定義的驗證失敗,都會導致 Spring ApplicationContext
無法啟動。這使我們能夠遵循快速失敗模式。
以下是application.yaml
檔案的片段,它定義了將自動對應到我們的AwsConfigurationProperties
類別的所需屬性:
com:
baeldung:
aws:
access-key: ${AWS_ACCESS_KEY}
secret-key: ${AWS_SECRET_KEY}
athena:
database: ${AMAZON_ATHENA_DATABASE}
s3-output-location: ${AMAZON_ATHENA_S3_OUTPUT_LOCATION}
因此,此設定允許我們外部化 Athena 屬性並在我們的應用程式中輕鬆存取它們。
4.在Spring Boot中配置Athena
現在我們已經定義了屬性,讓我們引用它們來配置與 Athena 互動所需的 bean。
4.1.創建AthenaClient
Bean
AthenaClient
是與 Athena 服務互動的主要入口點。我們將創建一個 bean 來設定它:
@Bean
public AthenaClient athenaClient() {
String accessKey = awsConfigurationProperties.getAccessKey();
String secretKey = awsConfigurationProperties.getSecretKey();
AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(accessKey, secretKey);
return AthenaClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
.build();
}
在這裡,我們使用配置的 AWS 憑證建立AthenaClient
的實例。此客戶端用於啟動查詢執行並從 S3 儲存桶檢索結果。
4.2.定義QueryExecutionContext
Bean
接下來,我們需要告訴 Athena 在執行 SQL 查詢時使用哪個資料庫:
@Bean
public QueryExecutionContext queryExecutionContext() {
String database = awsConfigurationProperties.getAthena().getDatabase();
return QueryExecutionContext.builder()
.database(database)
.build();
}
我們建立一個QueryExecutionContext
bean 並指定用於查詢的資料庫。資料庫名稱是從我們的配置屬性中檢索的,如果未明確指定,則預設為default
資料庫。
4.3.設定ResultConfiguration
Bean
最後,我們需要配置 Athena 儲存 SQL 查詢結果的位置:
@Bean
public ResultConfiguration resultConfiguration() {
String outputLocation = awsConfigurationProperties.getAthena().getS3OutputLocation();
return ResultConfiguration.builder()
.outputLocation(outputLocation)
.build();
}
需要注意的是,我們用來儲存查詢結果的 S3 儲存桶應該與包含來源資料的儲存桶不同。
這種分離可以防止查詢結果被解釋為額外的來源數據,這會導致意外的查詢結果。此外,Athena 應該對來源儲存桶具有唯讀存取權限以維護資料完整性,並且僅在我們配置用於儲存結果的儲存桶上授予寫入權限。
5. 執行 Athena 查詢
完成必要的配置後,讓我們看看如何使用 Athena 執行查詢。我們將建立一個QueryService
類,自動組裝我們建立的所有 bean,並公開一個封裝查詢執行邏輯的公共execute()
方法。
5.1.開始執行查詢
首先,我們將使用AthenaClient
實例開始查詢執行:
public <T> List<T> execute(String sqlQuery, Class<T> targetClass) {
String queryExecutionId;
try {
queryExecutionId = athenaClient.startQueryExecution(query ->
query.queryString(sqlQuery)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration)
).queryExecutionId();
} catch (InvalidRequestException exception) {
log.error("Invalid SQL syntax detected in query {}", sqlQuery, exception);
throw new QueryExecutionFailureException();
}
// ...rest of the implementation in the upcoming sections
}
開始執行查詢時,我們提供 SQL 查詢字串、 QueryExecutionContext
和ResultConfiguration
。 startQueryExecution()
方法傳回一個唯一的queryExecutionId
,我們將使用它來追蹤查詢的狀態並檢索結果。
targetClass
參數指定我們將查詢結果對應到的 Java 類別。
如果提供的 SQL 查詢包含語法錯誤,我們也會處理 Athena SDK 引發的InvalidRequestException
。我們捕獲此異常,記錄錯誤訊息以及無效查詢,並拋出自定義QueryExecutionFailureException
。
5.2.等待查詢完成
開始執行查詢後,我們需要等待它完成,然後再嘗試檢索結果:
private static final long WAIT_PERIOD = 30;
private void waitForQueryToComplete(String queryExecutionId) {
QueryExecutionState queryState;
do {
GetQueryExecutionResponse response = athenaClient.getQueryExecution(request ->
request.queryExecutionId(queryExecutionId));
queryState = response.queryExecution().status().state();
switch (queryState) {
case FAILED:
case CANCELLED:
String error = response.queryExecution().status().athenaError().errorMessage();
log.error("Query execution failed: {}", error);
throw new QueryExecutionFailureException();
case QUEUED:
case RUNNING:
TimeUnit.MILLISECONDS.sleep(WAIT_PERIOD);
break;
case SUCCEEDED:
queryState = QueryExecutionState.SUCCEEDED;
return;
}
} while (queryState != QueryExecutionState.SUCCEEDED);
}
我們建立一個私有waitForQueryToComplete()
方法,並使用getQueryExecution()
方法定期輪詢查詢的狀態,直到達到SUCCEEDED
狀態。
如果查詢失敗或被取消,我們會記錄錯誤訊息並拋出自定義的QueryExecutionFailureException
。如果它正在排隊或正在運行,我們會等待一小段時間,然後再次檢查。
我們使用從開始查詢執行時收到的queryExecutionId
從我們的execute()
方法呼叫waitForQueryToComplete()
方法。
5.3.處理查詢結果
查詢執行成功完成後,我們可以檢索結果:
GetQueryResultsResponse queryResult = athenaClient.getQueryResults(request ->
request.queryExecutionId(queryExecutionId));
getQueryResults()
方法傳回包含結果集的GetQueryResultsResponse
物件。我們可以處理這些結果並將它們轉換為由我們的execute()
方法的targetClass
參數指定的類別的實例:
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JsonOrgModule());
private <T> List<T> transformQueryResult(GetQueryResultsResponse queryResultsResponse, Class<T> targetClass) {
List<T> response = new ArrayList<T>();
List<Row> rows = queryResultsResponse.resultSet().rows();
List<String> headers = rows.get(0).data().stream().map(Datum::varCharValue).toList();
rows.stream()
.skip(1)
.forEach(row -> {
JSONObject element = new JSONObject();
List<Datum> data = row.data();
for (int i = 0; i < headers.size(); i++) {
String key = headers.get(i);
String value = data.get(i).varCharValue();
element.put(key, value);
}
T obj = OBJECT_MAPPER.convertValue(element, targetClass);
response.put(obj);
});
return response;
}
在這裡,我們從結果集的第一行中提取標題,然後處理每個後續行,將其轉換為JSONObject
,其中鍵是列名稱,值是相應的單元格值。然後,我們使用ObjectMapper
將每個JSONObject
轉換為指定目標類別的實例,表示域模型。這些域模型物件將會加入到傳回的清單中。
值得注意的是,我們的transformQueryResult()
實作是通用的,適用於所有類型的讀取查詢,無論表或域模型如何。
5.4 使用execute()
方法執行SQL查詢
隨著我們的execute()
方法的完全實現,我們現在可以輕鬆地對我們的S3資料運行SQL查詢並將結果作為域模型物件檢索:
String query = "SELECT * FROM users WHERE age < 25;";
User user = queryService.execute(query, User.class);
record User(Integer id, String name, Integer age, String city) {};
在這裡,我們定義一個 SQL 查詢來選擇所有 25 歲以下的使用者。我們將此查詢和User
類別傳遞給我們的execute()
方法。 User
類別是一個簡單的記錄,表示我們期望檢索的資料的結構。
execute()
方法負責啟動查詢執行、等待其完成、檢索結果並將它們轉換為User
物件清單。這種抽象化使我們能夠專注於查詢和領域模型,而不必擔心與 Athena 的底層互動。
5.5.使用 Athena 的參數化語句
需要注意的是,在使用使用者輸入來建構 SQL 查詢時,我們應該警惕 SQL 注入攻擊的風險。 Athena 支援參數化語句,這使我們能夠將 SQL 查詢與參數值分開,從而提供一種更安全的方式來透過使用者輸入執行查詢。雖然我們在這裡使用原始 SQL 查詢進行演示,但強烈建議在使用使用者提供的輸入建立查詢時使用參數化語句。
要使用參數化查詢,我們可以修改我們的execute()
方法以接受可選的參數清單:
public <T> List<T> execute(String sqlQuery, List<String> parameters, Class<T> targetClass) {
// ... same as above
queryExecutionId = athenaClient.startQueryExecution(query ->
query.queryString(sqlQuery)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration)
.executionParameters(parameters)
).queryExecutionId();
// ... same as above
}
我們為execute()
方法新增了一個新的parameters
參數,它是將在參數化查詢中使用的字串值清單。當開始查詢執行時,我們使用executionParameters()
方法傳遞這些parameters
。
讓我們看看如何使用更新後的execute()
方法:
public List<User> getUsersByName(String name) {
String query = "SELECT * FROM users WHERE name = ?";
return queryService.execute(query, List.of(name), User.class);
}
此範例定義帶有佔位符“?”的 SQL 查詢對於name
參數。我們將名稱值作為包含單一元素的清單以及查詢和目標類別傳遞給execute()
方法。
6. 自動化資料庫和表格創建
要使用 Athena 查詢 S3 數據,我們需要先定義一個資料庫和一個表,它們將對應到 S3 儲存桶中儲存的資料。雖然我們可以使用 AWS 管理主控台手動建立這些內容,但在應用程式啟動過程中自動執行此程序會更方便。
我們將把用於設定必要的資料庫和表格的 SQL 腳本放在新的athena-init
目錄中,我們將在src/main/resources
目錄中建立該目錄。
為了執行這些 SQL 腳本,我們將建立一個實作ApplicationRunner
介面的AthenaInitializer
類別:
@Component
@RequiredArgsConstructor
class AthenaInitializer implements ApplicationRunner {
private final QueryService queryService;
private final ResourcePatternResolver resourcePatternResolver;
@Override
public void run(ApplicationArguments args) {
Resource[] initScripts = resourcePatternResolver.getResources("classpath:athena-init/*.sql");
for (Resource script : initScripts) {
String sqlScript = FileUtils.readFileToString(script.getFile(), StandardCharsets.UTF_8);
queryService.execute(sqlScript, Void.class);
}
}
}
透過 Lombok 使用建構函式註入,我們注入先前建立的ResourcePatternResolver
和QueryService
的實例。
我們使用ResourcePatternResolver
來定位athena-init
目錄中的所有 SQL 腳本。然後,我們迭代這些腳本,使用 Apache Commons IO 讀取它們的內容,並使用我們的QueryService
執行它們。
我們首先建立一個create-database.sql
腳本來建立自訂資料庫:
CREATE DATABASE IF NOT EXISTS baeldung;
我們建立一個名為baeldung
的自訂資料庫(如果它尚不存在)。此處使用的資料庫名稱可以在application.yaml
檔案中配置,正如我們在本教程前面所看到的。
同樣,要在baeldung
資料庫中建立名為users
表,我們將建立另一個名為create-users-table.sql
腳本,其中包含以下內容:
CREATE EXTERNAL TABLE IF NOT EXISTS users (
id INT,
name STRING,
age INT,
city STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://baeldung-athena-tutorial-bucket/';
此腳本建立一個名為users
外部表,其中的欄位與我們將儲存在 S3 中的 JSON 資料中的欄位相對應。我們指定JsonSerDe
作為行格式,並提供儲存 JSON 檔案的 S3 位置。
值得注意的是,要使用 Athena 正確查詢 S3 中儲存的數據,請務必確保每個 JSON 記錄完全位於單行文字中,鍵和值之間沒有空格或換行符:
{"id":1,"name":"Homelander","age":41,"city":"New York"}
{"id":2,"name":"Black Noir","age":58,"city":"Los Angeles"}
{"id":3,"name":"Billy Butcher","age":46,"city":"London"}
7.IAM權限
最後,為了讓我們的應用程式正常運行,我們需要為應用程式中配置的 IAM 使用者配置一些權限。
我們的策略應該配置 Athena 和 S3 存取:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowAthenaQueryExecution",
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults"
],
"Resource": "arn:aws:athena:region:account-id:workgroup/primary"
},
{
"Sid": "AllowS3ReadAccessToSourceBucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::baeldung-athena-tutorial-bucket",
"arn:aws:s3:::baeldung-athena-tutorial-bucket/*"
]
},
{
"Sid": "AllowS3AccessForAthenaQueryResults",
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::baeldung-athena-tutorial-results-bucket",
"arn:aws:s3:::baeldung-athena-tutorial-results-bucket/*"
]
},
{
"Sid": "AllowGlueCatalogAccessForAthena",
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:GetDatabase",
"glue:CreateTable",
"glue:GetTable"
],
"Resource": [
"arn:aws:glue:region:account-id:catalog",
"arn:aws:glue:region:account-id:database/baeldung",
"arn:aws:glue:region:account-id:table/baeldung/users"
]
}
]
}
IAM 策略由四個關鍵語句組成,用於建立 Spring Boot 應用程式所需的權限。 AllowAthenaQueryExecution
語句提供與 Athena 本身互動所需的權限,包括啟動查詢、檢查其狀態和檢索結果。
然後, AllowS3ReadAccessToSourceBucket
語句允許對包含我們要查詢的來源資料的 S3 儲存桶進行讀取存取。 AllowS3AccessForAthenaQueryResults
語句將聚焦在 Athena 儲存查詢結果的 S3 儲存桶。它授予 Athena 將結果寫入配置的 S3 儲存桶以及我們的應用程式檢索結果的權限
最後,為了允許與 Athena 用作其元資料儲存的 AWS Glue 進行交互,我們定義了AllowGlueCatalogAccessForAthena
語句。它允許我們建立和檢索資料庫和表定義,這對於 Athena 理解 S3 資料的結構和執行 SQL 查詢至關重要。
我們的 IAM 政策符合最小權限原則,僅授予我們的應用程式正常運作所需的必要權限。
八、結論
在本文中,我們探索了使用 Amazon Athena 與 Spring Boot 直接從 S3 儲存桶查詢數據,而無需設定任何複雜的基礎設施。
我們討論了啟動查詢執行、等待其完成以及一般處理查詢結果。此外,我們還使用應用程式啟動期間執行的 SQL 腳本自動建立資料庫和表。
與往常一樣,本文中使用的所有程式碼範例都可以在 GitHub 上找到。