现代应用程序经常需要与异步运行的外部系统进行协调。无论是等待数据处理作业完成、监控文件上传,还是检查第三方 API 操作的状态,轮询都是分布式系统中的基本模式。
以下是一个轮询工作流示例:

没有Durable Functions,实现可靠的轮询通常需要:
持久化函数通过以下方式解决这些挑战:
context.wait_for_condition() 在轮询时暂停执行,直到满足某个条件。当我们需要等待外部系统达到特定状态时,这非常有用。例如,如果我们启动了一个异步作业,需要等待其完成后再继续执行。
我们需要提供:
TrueSDK 负责处理轮询循环:调用我们的检查函数,评估条件,如果未满足,则按等待策略指定的时长暂停执行。每次轮询都会进行检查点保存,因此函数可以在超时和故障后恢复。
我们正在构建一个文档处理系统。当用户上传文档时,我们的持久化函数将其提交给外部 OCR 服务进行文本提取。OCR 服务异步处理文档,我们需要轮询以获取完成状态。在本挑战中,我们将模拟一个偶尔会出现故障的 OCR 服务。
Requirements:
lambda函数内容如下:
# /src/python/03-application-patterns/02-polling/handler.py
import random
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.waits import (
WaitForConditionConfig,
WaitForConditionDecision,
)
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
def check_status(state, check_context):
if random.random() < 0.80:
status = "complete"
else:
status = "incomplete"
return {"job_id": state["job_id"], "status": status}
def wait_strategy(state: dict, attempts: int) -> WaitForConditionDecision:
if state.get("status") != "incomplete":
return WaitForConditionDecision.stop_polling()
if attempts >= 3:
return WaitForConditionDecision.stop_polling()
# Exponential backoff: 2s, 4s, 8s
delay = 2 ** attempts
return WaitForConditionDecision.continue_waiting(Duration.from_seconds(delay))
result = context.wait_for_condition(
check=check_status,
config=WaitForConditionConfig(
initial_state={"job_id": 1, "status": "incomplete"},
wait_strategy=wait_strategy,
),
)
return result
WaitForConditionDecision.stop_polling() 告知 SDK 停止轮询
WaitForConditionDecision.continue_waiting(Duration.from_seconds(n)) 告知 SDK 在下次轮询前等待 n 秒
对于指数退避,思考 2 ** attempts 的增长方式:2、4、8…
attempts 参数告诉我们目前已进行了多少次轮询尝试
使用 context.wait_for_condition() 暂停指定时长,然后轮询直到满足条件,由 SDK 管理轮询循环。