有时我们的Durable functions需要等待外部事件;例如人工审批、第三方 API 响应或异步流程的完成。Callbacks 让我们的函数暂停执行并等待外部系统响应,而无需在等待期间消耗计算资源。
考虑以下场景:
没有 callbacks,我们需要反复轮询,浪费计算时间和费用。有了 callbacks,我们的函数会暂停,不产生任何费用,直到外部系统响应。
Callback 流程包含两个部分:
callback_idcallback_id 发送给外部系统
使用 callbacks 有两种方式:
create_callback() 和 callback.result()这种方式让我们对 callback 生命周期有明确的控制。有三个关键步骤:
callback.result() 暂停持久执行,直到发送 callback 响应from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from mock_services import send_approval_request
@durable_execution
def handler(event, context: DurableContext):
# 1. Create the callback with a timeout
callback = context.create_callback(
name="approval-callback",
config=CallbackConfig(timeout=Duration.from_hours(24)),
)
# 2. Send callback ID to external system
send_approval_request(
approver_email=event["approver_email"],
callback_id=callback.callback_id,
request_details=event["details"],
)
# 3. Wait for result - execution suspends here
approval_result = callback.result()
# Parse JSON string result
if isinstance(approval_result, str):
approval_result = json.loads(approval_result)
# Execution resumes when callback is notified
if approval_result and approval_result.get("approved"):
return {"status": "approved"}
else:
return {"status": "rejected", "reason": approval_result.get("reason")}
wait_for_callback()使用 wait_for_callback() 通过 ID 等待 callback:
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from mock_services import send_approval_request
@durable_execution
def handler(event, context: DurableContext):
# Create callback first
callback = context.create_callback(name="approval-callback")
# Send callback ID to external system
send_approval_request(
approver_email=event["approver_email"],
callback_id=callback.callback_id,
request_details=event["details"],
)
# Wait for the callback by ID
approval_result = context.wait_for_callback(
callback.callback_id,
config=CallbackConfig(timeout=Duration.from_hours(24)),
)
# Parse JSON string result
if isinstance(approval_result, str):
approval_result = json.loads(approval_result)
if approval_result and approval_result.get("approved"):
return {"status": "approved"}
else:
return {"status": "rejected"}
callback.result() 如何暂停执行
result = callback.result()
这是执行完全暂停的关键行。当我们的函数到达这一行时:
外部系统可以使用 AWS SDK 或 CLI 向 callback 发送成功、失败或心跳响应。
当外部操作成功时,使用 AWS CLI:
aws lambda send-durable-execution-callback-success \
--callback-id "YOUR_CALLBACK_ID" \
--result '{"approved": true, "approver": "user@example.com"}'
或使用 AWS SDK(Python):
import boto3
import json
lambda_client = boto3.client('lambda')
result_data = json.dumps({
"approved": True,
"approver": "user@example.com",
}).encode('utf-8')
lambda_client.send_durable_execution_callback_success(
CallbackId=callback_id,
Result=result_data
)
当外部操作失败时:
aws lambda send-durable-execution-callback-failure \
--callback-id "YOUR_CALLBACK_ID" \
--error '{"ErrorType": "ApprovalDenied", "ErrorMessage": "Request was denied"}'
对于长时间运行的操作,发送心跳以防止超时:
aws lambda send-durable-execution-callback-heartbeat \
--callback-id "YOUR_CALLBACK_ID"
对于长时间运行的外部操作,心跳信号表明外部系统仍在积极处理请求。没有心跳,Lambda 会认为外部系统已失败或无响应,并使 callback 超时。这可以防止工作流无限期地等待永远不会到来的响应。
将心跳视为"保活"信号——外部系统定期发送心跳,表示"我还在处理这个,请不要放弃我。”
Callbacks 支持两种类型的超时:
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
config = CallbackConfig(
# Maximum total wait time
timeout=Duration.from_hours(48),
# Maximum time between heartbeats (optional)
heartbeat_timeout=Duration.from_hours(12),
)
callback.result() 返回 None。示例场景: 我们设置 timeout=48 hours 和 heartbeat_timeout=12 hours。外部系统必须至少每 12 小时发送一次心跳以保持 callback 活跃。如果 12 小时内没有心跳,callback 将超时——即使 48 小时总超时中只过去了 24 小时。
更新lambda代码如下,并进行deploy:
import json
from aws_durable_execution_sdk_python import DurableContext, durable_execution
from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
def send_review_request(callback_id: str):
"""Sends review request to reviewer (implementation not shown)."""
print(f"Sending review request with callback ID: {callback_id}")
@durable_execution
def lambda_handler(event, context: DurableContext):
# Create a callback with a 72-hour timeout
callback = context.create_callback(
name="document-review",
config=CallbackConfig(timeout=Duration.from_hours(72)),
)
# Send the callback ID
send_review_request(callback.callback_id)
# Wait for the review result
result = callback.result()
# Response handling (already implemented)
if result is None:
return {"status": "timeout"}
# Parse JSON string result
if isinstance(result, str):
result = json.loads(result)
if result.get("approved"):
return {"status": "approved", "reviewer": result.get("reviewer")}
else:
return {"status": "rejected", "reason": result.get("reason")}
创建一个新的测试事件,进行测试:

在Durable Executions页面,这个执行状态变成running:

打开这个executions,发现它的SubType是Callback,并且状态是Started;选中它,在Actions里面,可以发送三种类似的动作:

这里选择发送Success,payload如下:

发送完成后,这个executions执行成功:

在最后一步包含我们发送的数据:
