有时我们希望运行多个互不依赖的操作,并行运行它们比逐步依次运行每个步骤要快得多。
在本节中,我们将使用 context.parallel() 在Durable functions中并发执行多个操作。
考虑这个按顺序检查三个服务的函数:
# /src/python/03-application-patterns/01-parallel/serial_handler.py
from aws_durable_execution_sdk_python import durable_execution, DurableContext
import time
def check_inventory():
time.sleep(1)
return {"service": "inventory", "status": "ok"}
def check_payment():
time.sleep(1)
return {"service": "payment", "status": "ok"}
def check_shipping():
time.sleep(1)
return {"service": "shipping", "status": "ok"}
@durable_execution
def lambda_handler(event, context: DurableContext):
inventory = context.step(lambda _: check_inventory(), name="check-inventory")
payment = context.step(lambda _: check_payment(), name="check-payment")
shipping = context.step(lambda _: check_shipping(), name="check-shipping")
return {"results": [inventory, payment, shipping]}
如果每次检查需要 1 秒,则总执行时间约为 3 秒。
现在与并行执行进行比较:
# /src/python/03-application-patterns/01-parallel/parallel_handler.py
from aws_durable_execution_sdk_python import durable_execution, DurableContext, BatchResult
@durable_execution
def lambda_handler(event, context: DurableContext):
def check_inventory(ctx: DurableContext):
return ctx.step(lambda _: {"service": "inventory", "status": "ok"}, name="inventory")
def check_payment(ctx: DurableContext):
return ctx.step(lambda _: {"service": "payment", "status": "ok"}, name="payment")
def check_shipping(ctx: DurableContext):
return ctx.step(lambda _: {"service": "shipping", "status": "ok"}, name="shipping")
result: BatchResult = context.parallel([check_inventory, check_payment, check_shipping])
return {"results": result.get_results()}
使用并行执行,所有三个函数并发运行。总执行时间约为 1 秒。
在 AWS Lambda 控制台中分别部署这两个函数并进行测试。
当我们调用 context.parallel() 时:
子上下文:每个函数接收自己的 DurableContext。这隔离了各分支之间的状态和检查点。在每个函数内,我们可以等待或对步骤进行检查点。
并发执行:所有函数同时执行。SDK 自动管理并发。
独立检查点:每个分支对自己的步骤进行检查点。如果一个分支失败,其他分支继续执行。
结果收集:结果被收集到 BatchResult 对象中,保留原始顺序。
result: BatchResult = context.parallel([func_a, func_b, func_c])
# Access results
result.get_results() # List of successful results, in order
result.success_count # Number of successes
result.failure_count # Number of failures
result.total_count # Total branches