Job Polling

现代应用程序经常需要与异步运行的外部系统进行协调。无论是等待数据处理作业完成、监控文件上传,还是检查第三方 API 操作的状态,轮询都是分布式系统中的基本模式。

常见轮询场景

  • Data Processing:轮询 ETL 作业、ML 模型训练或批处理操作
  • File Processing:监控文档转换、图像缩放或视频转码
  • Infrastructure Operations:等待数据库迁移、部署或扩展操作

以下是一个轮询工作流示例:

image-20260301102833338

传统轮询的挑战

没有Durable Functions,实现可靠的轮询通常需要:

  • Complex State Management - 跨调用追踪轮询次数、超时和重试逻辑
  • Resource Waste - 在等待期间保持 Lambda 函数运行
  • Error Handling - 管理失败、指数退避和熔断器
  • Scalability Issues - 在不压垮外部系统的情况下协调多个轮询操作

持久化函数轮询的优势

持久化函数通过以下方式解决这些挑战:

  • Automatic State Persistence - 轮询次数、时间戳和结果自动进行检查点保存
  • Cost-Effective Waiting - 轮询间等待期间不产生计算费用
  • Built-in Retry Logic - 内置指数退避和故障处理
  • Scalable Coordination - 高效管理数千个并发轮询操作

如何在 Durable Execution SDK 中编写轮询逻辑?

context.wait_for_condition() 在轮询时暂停执行,直到满足某个条件。当我们需要等待外部系统达到特定状态时,这非常有用。例如,如果我们启动了一个异步作业,需要等待其完成后再继续执行。

我们需要提供:

  • 一个 check function,接收当前状态和检查上下文,查询外部系统,并返回更新后的状态对象
  • 一个 condition,接收状态并在应停止轮询时返回 True
  • 一个 wait strategy,控制轮询调用之间的等待时长

SDK 负责处理轮询循环:调用我们的检查函数,评估条件,如果未满足,则按等待策略指定的时长暂停执行。每次轮询都会进行检查点保存,因此函数可以在超时和故障后恢复。

背景

我们正在构建一个文档处理系统。当用户上传文档时,我们的持久化函数将其提交给外部 OCR 服务进行文本提取。OCR 服务异步处理文档,我们需要轮询以获取完成状态。在本挑战中,我们将模拟一个偶尔会出现故障的 OCR 服务。

Requirements:

  • OCR 服务通常在 10 秒内完成,但偶尔会出现错误。
  • 我们希望将轮询限制在 3 次,以避免过多的 API 调用
  • 使用指数退避(2s、4s、8s)

lambda函数内容如下:

# /src/python/03-application-patterns/02-polling/handler.py
import random
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.waits import (
    WaitForConditionConfig,
    WaitForConditionDecision,
)

@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
    def check_status(state, check_context):
        if random.random() < 0.80:
            status = "complete"
        else:
            status = "incomplete"
        return {"job_id": state["job_id"], "status": status}

    def wait_strategy(state: dict, attempts: int) -> WaitForConditionDecision:
        if state.get("status") != "incomplete":
            return WaitForConditionDecision.stop_polling()
        
        if attempts >= 3:
            return WaitForConditionDecision.stop_polling()
        
        # Exponential backoff: 2s, 4s, 8s
        delay = 2 ** attempts
        return WaitForConditionDecision.continue_waiting(Duration.from_seconds(delay))

    result = context.wait_for_condition(
        check=check_status,
        config=WaitForConditionConfig(
            initial_state={"job_id": 1, "status": "incomplete"},
            wait_strategy=wait_strategy,
        ),
    )

    return result
  • WaitForConditionDecision.stop_polling() 告知 SDK 停止轮询

  • WaitForConditionDecision.continue_waiting(Duration.from_seconds(n)) 告知 SDK 在下次轮询前等待 n

  • 对于指数退避,思考 2 ** attempts 的增长方式:2、4、8…

  • attempts 参数告诉我们目前已进行了多少次轮询尝试

  • 使用 context.wait_for_condition() 暂停指定时长,然后轮询直到满足条件,由 SDK 管理轮询循环。