Callbacks

有时我们的Durable functions需要等待外部事件;例如人工审批、第三方 API 响应或异步流程的完成。Callbacks 让我们的函数暂停执行并等待外部系统响应,而无需在等待期间消耗计算资源。

Callbacks 解决的问题

考虑以下场景:

  • 采购订单在处理前需要经理审批
  • 支付处理器通过 webhook 发送结果
  • 文档在继续处理前需要人工审核
  • 外部 API 异步处理请求

没有 callbacks,我们需要反复轮询,浪费计算时间和费用。有了 callbacks,我们的函数会暂停,不产生任何费用,直到外部系统响应。

Callbacks 的工作原理

Callback 流程包含两个部分:

  1. Durable functions创建一个 callback 并获取唯一的 callback_id
  2. Durable functionscallback_id 发送给外部系统
  3. Durable functions等待 callback,执行在此处暂停
  4. 外部进程(独立的 Lambda、webhook 处理器等)接收结果并使用 AWS SDK 通知 Lambda
  5. Durable functions携带结果恢复执行

image-20260301104300489

创建和等待 Callbacks

使用 callbacks 有两种方式:

1. 使用 create_callback()callback.result()

这种方式让我们对 callback 生命周期有明确的控制。有三个关键步骤:

  1. 创建 callback - 获取配置了超时时间的唯一 callback ID
  2. 发送 callback ID - 将其传递给我们的外部系统(电子邮件、API、队列等)
  3. 等待响应 - 调用 callback.result() 暂停持久执行,直到发送 callback 响应
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from mock_services import send_approval_request

@durable_execution
def handler(event, context: DurableContext):
    # 1. Create the callback with a timeout
    callback = context.create_callback(
        name="approval-callback",
        config=CallbackConfig(timeout=Duration.from_hours(24)),
    )
    
    # 2. Send callback ID to external system
    send_approval_request(
        approver_email=event["approver_email"],
        callback_id=callback.callback_id,
        request_details=event["details"],
    )
    
    # 3. Wait for result - execution suspends here
    approval_result = callback.result()
    
    # Parse JSON string result
    if isinstance(approval_result, str):
        approval_result = json.loads(approval_result)
    
    # Execution resumes when callback is notified
    if approval_result and approval_result.get("approved"):
        return {"status": "approved"}
    else:
        return {"status": "rejected", "reason": approval_result.get("reason")}

2. 使用 wait_for_callback()

使用 wait_for_callback() 通过 ID 等待 callback:

from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from mock_services import send_approval_request

@durable_execution
def handler(event, context: DurableContext):
    # Create callback first
    callback = context.create_callback(name="approval-callback")
    
    # Send callback ID to external system
    send_approval_request(
        approver_email=event["approver_email"],
        callback_id=callback.callback_id,
        request_details=event["details"],
    )
    
    # Wait for the callback by ID
    approval_result = context.wait_for_callback(
        callback.callback_id,
        config=CallbackConfig(timeout=Duration.from_hours(24)),
    )
    
    # Parse JSON string result
    if isinstance(approval_result, str):
        approval_result = json.loads(approval_result)

    if approval_result and approval_result.get("approved"):
        return {"status": "approved"}
    else:
        return {"status": "rejected"}

callback.result() 如何暂停执行

result = callback.result()

这是执行完全暂停的关键行。当我们的函数到达这一行时:

  • Lambda 调用结束
  • 等待期间不产生计算费用
  • 工作流状态保存在检查点日志中
  • 当外部系统使用 AWS SDK 发送 callback 响应时,AWS Lambda 将自动恢复执行

响应 Callbacks

外部系统可以使用 AWS SDK 或 CLI 向 callback 发送成功、失败或心跳响应。

发送成功响应

当外部操作成功时,使用 AWS CLI:

aws lambda send-durable-execution-callback-success \
    --callback-id "YOUR_CALLBACK_ID" \
    --result '{"approved": true, "approver": "user@example.com"}'

或使用 AWS SDK(Python):

import boto3
import json

lambda_client = boto3.client('lambda')

result_data = json.dumps({
    "approved": True,
    "approver": "user@example.com",
}).encode('utf-8')

lambda_client.send_durable_execution_callback_success(
    CallbackId=callback_id,
    Result=result_data
)

发送失败响应

当外部操作失败时:

aws lambda send-durable-execution-callback-failure \
    --callback-id "YOUR_CALLBACK_ID" \
    --error '{"ErrorType": "ApprovalDenied", "ErrorMessage": "Request was denied"}'

发送心跳

对于长时间运行的操作,发送心跳以防止超时:

aws lambda send-durable-execution-callback-heartbeat \
    --callback-id "YOUR_CALLBACK_ID"

理解心跳

对于长时间运行的外部操作,心跳信号表明外部系统仍在积极处理请求。没有心跳,Lambda 会认为外部系统已失败或无响应,并使 callback 超时。这可以防止工作流无限期地等待永远不会到来的响应。

将心跳视为"保活"信号——外部系统定期发送心跳,表示"我还在处理这个,请不要放弃我。”

配置超时

Callbacks 支持两种类型的超时:

from aws_durable_execution_sdk_python.config import CallbackConfig, Duration

config = CallbackConfig(
    # Maximum total wait time
    timeout=Duration.from_hours(48),
    
    # Maximum time between heartbeats (optional)
    heartbeat_timeout=Duration.from_hours(12),
)
  • timeout:等待响应的最长时间。如果超过,callback.result() 返回 None
  • heartbeat_timeout:心跳信号之间的最长时间。使用此选项检测外部系统何时停止响应。如果外部系统在此期间内未发送心跳,即使总体超时尚未到达,callback 也会超时。

示例场景: 我们设置 timeout=48 hoursheartbeat_timeout=12 hours。外部系统必须至少每 12 小时发送一次心跳以保持 callback 活跃。如果 12 小时内没有心跳,callback 将超时——即使 48 小时总超时中只过去了 24 小时。

测试Callback

更新lambda代码如下,并进行deploy:

import json
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration

def send_review_request(callback_id: str):
    """Sends review request to reviewer (implementation not shown)."""
    print(f"Sending review request with callback ID: {callback_id}")

@durable_execution
def lambda_handler(event, context: DurableContext):
    # Create a callback with a 72-hour timeout
    callback = context.create_callback(
        name="document-review",
        config=CallbackConfig(timeout=Duration.from_hours(72)),
    )
    
    # Send the callback ID
    send_review_request(callback.callback_id)
    
    # Wait for the review result
    result = callback.result()
    
    # Response handling (already implemented)
    if result is None:
        return {"status": "timeout"}
    
    # Parse JSON string result
    if isinstance(result, str):
        result = json.loads(result)
    
    if result.get("approved"):
        return {"status": "approved", "reviewer": result.get("reviewer")}
    else:
        return {"status": "rejected", "reason": result.get("reason")}

创建一个新的测试事件,进行测试:

image-20260301111336060

Durable Executions页面,这个执行状态变成running:

image-20260301111555716

打开这个executions,发现它的SubType是Callback,并且状态是Started;选中它,在Actions里面,可以发送三种类似的动作:

image-20260301111628995

这里选择发送Success,payload如下:

image-20260301111746716

发送完成后,这个executions执行成功:

image-20260301111818126

在最后一步包含我们发送的数据:

image-20260301111829599