使用非同步程式設計 - AWS SDK for Java 2.x

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用非同步程式設計

這些 AWS SDK for Java 2.x 功能具有非封鎖 I/O 支援的非同步用戶端,可在幾個執行緒中實作高並行。不過,不保證非封鎖 I/O 總數。非同步用戶端可能會在某些情況下執行封鎖呼叫,例如憑證擷取、使用 AWS Signature 第 4 版 (SigV4) 簽署請求,或端點探索。

同步方法會封鎖您的執行緒執行,直到用戶端收到服務的回應。非同步方法會立即傳回,將控制權回歸給呼叫端執行緒,無需等待回應。

由於非同步方法會在有可用回應之前傳回,您需要一個方法在回應準備好時取得回應。 AWS SDK for Java 傳回 CompletableFuture 物件的 2.x 中非同步用戶端的方法,可讓您在回應準備就緒時存取回應。

使用非同步用戶端 APIs

非同步用戶端方法的簽章與其同步方法相同,但非同步方法會傳回 CompletableFuture 物件,其中包含未來非同步操作的結果。如果在 SDK 的非同步方法執行時擲出錯誤,錯誤會擲出為 CompletionException

您可以使用其中一種方法來取得結果,將whenComplete()方法鏈結到 SDK 方法呼叫CompletableFuture傳回的 。whenComplete() 方法CompletionException會根據非同步呼叫的完成方式,接收結果或 類型的可擲出物件。您提供 動作給 whenComplete()來處理或檢查結果,然後再傳回呼叫程式碼。

如果您想要傳回 SDK 方法所傳回物件以外的物件,請改用 handle()方法。handle() 方法接受與 相同的參數whenComplete(),但您可以處理結果並傳回物件。

若要等待非同步鏈結完成並擷取完成結果,您可以呼叫 join()方法。如果未在鏈中處理Throwable物件,則 join()方法會擲回未勾選的 CompletionException,以包裝原始例外狀況。您可以使用 存取原始例外狀況CompletionException#getCause()。您也可以呼叫 CompletableFuture#get()方法來取得完成結果。不過, get()方法可能會擲回已檢查的例外狀況。

下列範例顯示兩種不同的 ,說明如何使用 DynamoDB 非同步用戶端的 listTables()方法。傳遞給 的動作whenComplete()只會記錄成功的回應,而 handle()版本會擷取資料表名稱清單並傳回清單。在這兩種情況下,如果在非同步鏈中產生錯誤,則會重新產生錯誤,以便用戶端程式碼有機會處理它。

匯入

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;

Code

whenComplete() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { ListTablesResponse listTablesResponse = listTablesWhenComplete(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. if (listTablesResponse.hasTableNames()){ System.out.println("Table exist in this region: " + region.id()); } } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<ListTablesResponse> listTablesWhenComplete(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .whenComplete((listTablesResponse, throwable) -> { if (listTablesResponse != null) { // Consume the response. System.out.println("The SDK's listTables method completed successfully."); } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); }
handle() variation
public class DynamoDbAsyncListTables { public static void main(String[] args) { Region region = Region.US_EAST_1; DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); try { List<String> tableNames = listTablesHandle(dynamoDbAsyncClient).join(); // The join() method may throw a CompletionException. tableNames.forEach(System.out::println); } catch (RuntimeException e) { // Handle as needed. Here we simply print out the class names. System.out.println(e.getClass()); // Prints 'class java.util.concurrent.CompletionException'. System.out.println(e.getCause().getClass()); // Prints 'class software.amazon.awssdk.services.dynamodb.model.DynamoDbException'. } } public static CompletableFuture<List<String>> listTablesHandle(DynamoDbAsyncClient client) { return client.listTables(ListTablesRequest.builder().build()) .handle((listTablesResponse, throwable) -> { if (listTablesResponse != null) { return listTablesResponse.tableNames(); // Return the list of table names. } else { RuntimeException cause = (RuntimeException) throwable.getCause(); // If an error was thrown during the SDK's listTables method it is wrapped in a CompletionException. // The SDK throws only RuntimeExceptions, so this is a safe cast. System.out.println(cause.getMessage()); // Log error here, but rethrow so the calling code can handle as needed. throw cause; } }); } }

以非同步方法處理串流

對於具有串流內容的非同步方法,您必須提供 AsyncRequestBody 以遞增方式提供內容,或提供 AsyncResponseTransformer 以接收和處理回應。

下列範例會使用 Amazon S3 PutObject操作的非同步形式,以非同步方式將檔案上傳至 。

匯入

import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

Code

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }

下列範例使用 GetObject操作的非同步形式 Amazon S3 從 取得檔案。

匯入

import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;

Code

/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }

設定進階非同步選項

AWS SDK for Java 2.x 使用非同步事件驅動型網路應用程式架構 Netty 來處理 I/O 執行緒。 AWS SDK for Java 2.x 會建立 Netty ExecutorService後方的 ,以完成從 HTTP 用戶端請求傳回到 Netty 用戶端的未來。如果開發人員選擇停止或睡眠執行緒,則此抽象概念可降低應用程式中斷非同步程序的風險。根據預設,每個非同步用戶端都會根據處理器數量建立執行緒集區,並管理 內佇列中的任務ExecutorService

您可以在建置非同步用戶端ExecutorService時指定 的特定 JDK 實作。下列程式碼片段會建立ExecutorService具有固定執行緒數目的 。

Code

S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();

若要最佳化效能,您可以管理自己的執行緒集區執行器,並在設定用戶端時包含它。

ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();