```
pip install aws-durable-execution-sdk-python
```

Please see CONTRIBUTING.md. It contains the testing guide, sample commands, and instructions for how to contribute to this package.
tl;dr: use hatch and it will manage virtual environments and dependencies for you, so you don't have to do it manually.
The entry point that consumers of the SDK interact with is the `DurableContext`.
- Core Methods: `set_logger`, `step`, `invoke`, `map`, `parallel`, `run_in_child_context`, `wait`, `create_callback`, `wait_for_callback`, `wait_for_condition`
- Thread Safety: Uses `OrderedCounter` for generating sequential step IDs
- State Management: Delegates to `ExecutionState` for checkpointing
- Map/Parallel: Both inherit from the `ConcurrentExecutor` abstract base class
- Thread Pool: Uses `ThreadPoolExecutor` for concurrent execution
- State Tracking: `ExecutableWithState` manages individual task lifecycle
- Completion Logic: `ExecutionCounters` tracks success/failure criteria
- Suspension: `TimerScheduler` handles timed suspensions and resumptions
- Modular Configs: Separate config classes for each operation type
- Completion Control: `CompletionConfig` defines success/failure criteria
- Serialization: `SerDes` interface for custom serialization
- Separation of Concerns: Each operation has a dedicated handler function
- Checkpointing: All operations integrate with execution state checkpointing
- Error Handling: Consistent error handling and retry logic across operations
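
As a rough usage sketch of this API (the Lambda handler wiring, the import path, and the exact shape of the callables passed to `step` and `map` are assumptions; the context method signatures follow the class diagrams below):

```python
# Illustrative sketch only: handler wiring and callable shapes are assumptions,
# not documented SDK API. Context method signatures mirror the diagrams below.

def create_order(step_ctx, event):       # hypothetical business logic for a step
    step_ctx.logger.info("creating order")
    return {"order_id": "o-123", "items": event["items"]}

def reserve_inventory(item_ctx, item):   # hypothetical per-item worker for map
    return {"item": item, "reserved": True}

def order_workflow(event, context):      # `context` assumed to be a DurableContext
    # step: a checkpointed unit of work; replays return the recorded result.
    order = context.step(lambda ctx: create_order(ctx, event), name="create_order")

    # wait: a durable timer; the execution suspends rather than sleeping in-process.
    context.wait(seconds=300, name="cool_down")

    # map: fan out over a sequence; results come back as a BatchResult.
    batch = context.map(order["items"], reserve_inventory, name="reserve_items")
    batch.throw_if_error()
    return batch.get_results()
```
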
```mermaid
classDiagram
class DurableContext {
-ExecutionState state
-Any lambda_context
-str _parent_id
-OrderedCounter _step_counter
-LogInfo _log_info
-Logger logger
+set_logger(LoggerInterface new_logger)
+step(Callable func, str name, StepConfig config) T
+invoke(str function_name, P payload, str name, InvokeConfig config) R
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+run_in_child_context(Callable func, str name, ChildConfig config) T
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
+wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any
+wait_for_condition(Callable check, WaitForConditionConfig config, str name) T
}
class DurableContextProtocol {
<<interface>>
+step(Callable func, str name, StepConfig config) T
+run_in_child_context(Callable func, str name, ChildConfig config) T
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
}
class OrderedCounter {
-OrderedLock _lock
-int _counter
+increment() int
+decrement() int
+get_current() int
}
class ExecutionState {
+str durable_execution_arn
+get_checkpoint_result(str operation_id) CheckpointedResult
+create_checkpoint(OperationUpdate operation_update)
}
class Logger {
+LoggerInterface logger
+LogInfo info
+with_log_info(LogInfo info) Logger
+from_log_info(LoggerInterface logger, LogInfo info) Logger
}
DurableContext ..|> DurableContextProtocol : implements
DurableContext --> ExecutionState : uses
DurableContext --> OrderedCounter : contains
DurableContext --> Logger : contains
```
The `DurableContext` calls operation handlers, which contain the execution logic for each operation.
```mermaid
classDiagram
class DurableContext {
+step(Callable func, str name, StepConfig config) T
+invoke(str function_name, P payload, str name, InvokeConfig config) R
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+run_in_child_context(Callable func, str name, ChildConfig config) T
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
+wait_for_callback(Callable submitter, str name, WaitForCallbackConfig config) Any
+wait_for_condition(Callable check, WaitForConditionConfig config, str name) T
}
class step_handler {
<<function>>
+step_handler(Callable func, ExecutionState state, OperationIdentifier op_id, StepConfig config, Logger logger) T
}
class invoke_handler {
<<function>>
+invoke_handler(str function_name, P payload, ExecutionState state, OperationIdentifier op_id, InvokeConfig config) R
}
class map_handler {
<<function>>
+map_handler(Sequence items, Callable func, MapConfig config, ExecutionState state, Callable run_in_child_context) BatchResult
}
class parallel_handler {
<<function>>
+parallel_handler(Sequence callables, ParallelConfig config, ExecutionState state, Callable run_in_child_context) BatchResult
}
class child_handler {
<<function>>
+child_handler(Callable func, ExecutionState state, OperationIdentifier op_id, ChildConfig config) T
}
class wait_handler {
<<function>>
+wait_handler(int seconds, ExecutionState state, OperationIdentifier op_id)
}
class create_callback_handler {
<<function>>
+create_callback_handler(ExecutionState state, OperationIdentifier op_id, CallbackConfig config) str
}
class wait_for_callback_handler {
<<function>>
+wait_for_callback_handler(DurableContext context, Callable submitter, str name, WaitForCallbackConfig config) Any
}
class wait_for_condition_handler {
<<function>>
+wait_for_condition_handler(Callable check, WaitForConditionConfig config, ExecutionState state, OperationIdentifier op_id, Logger logger) T
}
DurableContext --> step_handler : calls
DurableContext --> invoke_handler : calls
DurableContext --> map_handler : calls
DurableContext --> parallel_handler : calls
DurableContext --> child_handler : calls
DurableContext --> wait_handler : calls
DurableContext --> create_callback_handler : calls
DurableContext --> wait_for_callback_handler : calls
DurableContext --> wait_for_condition_handler : calls
```
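
A rough sketch of the delegation pattern these diagrams imply: each context method derives a deterministic operation identifier and forwards to its module-level handler (the `OperationIdentifier` construction and exact argument order below are assumptions, not verbatim SDK code):

```python
# Sketch of the delegation pattern implied above. OperationIdentifier construction
# and argument order are assumptions, not verbatim SDK code.

class DurableContext:
    def step(self, func, name=None, config=None):
        # Sequential IDs keep replays aligned with previously checkpointed results.
        op_id = OperationIdentifier(self._parent_id, self._step_counter.increment(), name)
        return step_handler(func, self.state, op_id, config or StepConfig(), self.logger)
```
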
```mermaid
classDiagram
class StepConfig {
+Callable retry_strategy
+StepSemantics step_semantics
+SerDes serdes
}
class InvokeConfig~P,R~ {
+int timeout_seconds
+SerDes~P~ serdes_payload
+SerDes~R~ serdes_result
}
class MapConfig {
+int max_concurrency
+ItemBatcher item_batcher
+CompletionConfig completion_config
+SerDes serdes
}
class ParallelConfig {
+int max_concurrency
+CompletionConfig completion_config
+SerDes serdes
}
class ChildConfig~T~ {
+SerDes serdes
+OperationSubType sub_type
+Callable~T,str~ summary_generator
}
class CallbackConfig {
+int timeout_seconds
+int heartbeat_timeout_seconds
+SerDes serdes
}
class WaitForCallbackConfig {
+Callable retry_strategy
}
class WaitForConditionConfig~T~ {
+Callable wait_strategy
+T initial_state
+SerDes serdes
}
class CompletionConfig {
+int min_successful
+int tolerated_failure_count
+float tolerated_failure_percentage
+first_successful()$ CompletionConfig
+all_completed()$ CompletionConfig
+all_successful()$ CompletionConfig
}
class ItemBatcher~T~ {
+int max_items_per_batch
+float max_item_bytes_per_batch
+T batch_input
}
WaitForCallbackConfig --|> CallbackConfig : extends
MapConfig --> CompletionConfig : contains
MapConfig --> ItemBatcher : contains
ParallelConfig --> CompletionConfig : contains
```
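
For example, a map over many items might be configured roughly as follows (field names come from the diagram above; the construction style and values are illustrative assumptions):

```python
# Illustrative: field names follow the config diagram above; construction style
# and values are assumptions. CompletionConfig also exposes factories such as
# CompletionConfig.all_successful() per the diagram.

completion = CompletionConfig(
    min_successful=8,           # finish as soon as 8 branches succeed...
    tolerated_failure_count=2,  # ...unless more than 2 branches fail
)

map_config = MapConfig(
    max_concurrency=4,          # at most 4 branches run at once
    completion_config=completion,
    serdes=JsonSerDes(),        # serialize branch results as JSON
)

batch = context.map(items, process_item, name="process_items", config=map_config)
```
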
```mermaid
classDiagram
class DurableContextProtocol {
<<interface>>
+step(Callable func, str name, StepConfig config) T
+run_in_child_context(Callable func, str name, ChildConfig config) T
+map(Sequence inputs, Callable func, str name, MapConfig config) BatchResult
+parallel(Sequence functions, str name, ParallelConfig config) BatchResult
+wait(int seconds, str name)
+create_callback(str name, CallbackConfig config) Callback
}
class LoggerInterface {
<<interface>>
+debug(object msg, *args, Mapping extra)
+info(object msg, *args, Mapping extra)
+warning(object msg, *args, Mapping extra)
+error(object msg, *args, Mapping extra)
+exception(object msg, *args, Mapping extra)
}
class CallbackProtocol~C_co~ {
<<interface>>
+str callback_id
+result() C_co
}
class BatchResultProtocol~T~ {
<<interface>>
+get_results() list~T~
}
class StepContext {
+LoggerInterface logger
}
class WaitForConditionCheckContext {
+LoggerInterface logger
}
class OperationContext {
+LoggerInterface logger
}
StepContext --|> OperationContext : extends
WaitForConditionCheckContext --|> OperationContext : extends
```
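
Since `LoggerInterface` only requires the standard level methods, plugging in a custom logger might look roughly like the following sketch (the adapter is hypothetical; method names mirror the `LoggerInterface` diagram above):

```python
# Hypothetical adapter that forwards LoggerInterface calls to the stdlib logger.
# Method names and parameters mirror the LoggerInterface diagram above.
import logging

class StdlibLoggerAdapter:
    def __init__(self, name: str = "durable"):
        self._logger = logging.getLogger(name)

    def debug(self, msg, *args, extra=None):
        self._logger.debug(msg, *args, extra=extra)

    def info(self, msg, *args, extra=None):
        self._logger.info(msg, *args, extra=extra)

    def warning(self, msg, *args, extra=None):
        self._logger.warning(msg, *args, extra=extra)

    def error(self, msg, *args, extra=None):
        self._logger.error(msg, *args, extra=extra)

    def exception(self, msg, *args, extra=None):
        self._logger.exception(msg, *args, extra=extra)

# context.set_logger(StdlibLoggerAdapter())  # wire it into the DurableContext
```
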
```mermaid
classDiagram
class SerDes~T~ {
<<abstract>>
+serialize(T value, SerDesContext context) str
+deserialize(str data, SerDesContext context) T
}
class JsonSerDes~T~ {
+serialize(T value, SerDesContext context) str
+deserialize(str data, SerDesContext context) T
}
class SerDesContext {
+str operation_id
+str durable_execution_arn
}
class serialize {
<<function>>
+serialize(SerDes serdes, T value, str operation_id, str durable_execution_arn) str
}
class deserialize {
<<function>>
+deserialize(SerDes serdes, str data, str operation_id, str durable_execution_arn) T
}
JsonSerDes ..|> SerDes : implements
serialize --> SerDes : uses
deserialize --> SerDes : uses
SerDes --> SerDesContext : uses
```
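
A custom `SerDes` might look like the following sketch. The `serialize`/`deserialize` signatures come from the diagram above; the payload type and everything else is illustrative, and the SDK import path is omitted:

```python
# Illustrative custom SerDes for a dataclass payload; assumes the serialize /
# deserialize signatures shown in the diagram above. SerDes and SerDesContext
# come from the SDK (import path omitted here).
import json
from dataclasses import dataclass, asdict

@dataclass
class Order:
    order_id: str
    total_cents: int

class OrderSerDes(SerDes):
    def serialize(self, value: Order, context: SerDesContext) -> str:
        # context carries operation_id / durable_execution_arn if needed
        return json.dumps(asdict(value))

    def deserialize(self, data: str, context: SerDesContext) -> Order:
        return Order(**json.loads(data))
```
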
```mermaid
classDiagram
class ConcurrentExecutor~CallableType,ResultType~ {
<<abstract>>
+list~Executable~ executables
+int max_concurrency
+CompletionConfig completion_config
+ExecutionCounters counters
+list~ExecutableWithState~ executables_with_state
+Event _completion_event
+SuspendExecution _suspend_exception
+execute(ExecutionState state, Callable run_in_child_context) BatchResult~ResultType~
+execute_item(DurableContext child_context, Executable executable)* ResultType
+should_execution_suspend() SuspendResult
-_on_task_complete(ExecutableWithState exe_state, Future future, TimerScheduler scheduler)
-_create_result() BatchResult~ResultType~
}
class MapExecutor~T,R~ {
+Sequence~T~ items
+execute_item(DurableContext child_context, Executable executable) R
+from_items(Sequence items, Callable func, MapConfig config)$ MapExecutor
}
class ParallelExecutor {
+execute_item(DurableContext child_context, Executable executable) R
+from_callables(Sequence callables, ParallelConfig config)$ ParallelExecutor
}
class Executable~CallableType~ {
+int index
+CallableType func
}
class ExecutableWithState~CallableType,ResultType~ {
+Executable~CallableType~ executable
-BranchStatus _status
-Future _future
-float _suspend_until
-ResultType _result
-Exception _error
+run(Future future)
+suspend()
+suspend_with_timeout(float timestamp)
+complete(ResultType result)
+fail(Exception error)
+reset_to_pending()
+can_resume() bool
+is_running() bool
}
class ExecutionCounters {
+int total_tasks
+int min_successful
+int success_count
+int failure_count
-Lock _lock
+complete_task()
+fail_task()
+should_complete() bool
+is_all_completed() bool
+is_min_successful_reached() bool
+is_failure_tolerance_exceeded() bool
}
class TimerScheduler {
+Callable resubmit_callback
-list _pending_resumes
-Lock _lock
-Event _shutdown
-Thread _timer_thread
+schedule_resume(ExecutableWithState exe_state, float resume_time)
+shutdown()
-_timer_loop()
}
class BatchResult~R~ {
+list~BatchItem~R~~ all
+CompletionReason completion_reason
+succeeded() list~BatchItem~R~~
+failed() list~BatchItem~R~~
+get_results() list~R~
+throw_if_error()
}
class BatchItem~R~ {
+int index
+BatchItemStatus status
+R result
+ErrorObject error
}
MapExecutor --|> ConcurrentExecutor : extends
ParallelExecutor --|> ConcurrentExecutor : extends
ConcurrentExecutor --> ExecutableWithState : manages
ConcurrentExecutor --> ExecutionCounters : uses
ConcurrentExecutor --> TimerScheduler : uses
ConcurrentExecutor --> BatchResult : creates
ExecutableWithState --> Executable : contains
BatchResult --> BatchItem : contains
```
```mermaid
sequenceDiagram
participant DC as DurableContext
participant MH as map_handler
participant ME as MapExecutor
participant CE as ConcurrentExecutor
participant TP as ThreadPoolExecutor
participant TS as TimerScheduler
participant EC as ExecutionCounters
DC->>MH: map(inputs, func, config)
MH->>ME: MapExecutor.from_items()
ME->>CE: execute(state, run_in_child_context)
CE->>TP: ThreadPoolExecutor(max_workers)
CE->>TS: TimerScheduler(resubmitter)
CE->>EC: ExecutionCounters(total, min_successful)
loop For each executable
CE->>TP: submit_task(executable_with_state)
TP->>CE: execute_item_in_child_context()
CE->>DC: run_in_child_context(child_func)
DC->>ME: execute_item(child_context, executable)
end
par Task Completion Handling
TP->>CE: on_task_complete(future)
CE->>EC: complete_task() / fail_task()
CE->>CE: should_execution_suspend()
alt Should Complete
CE->>CE: _completion_event.set()
else Should Suspend
CE->>TS: schedule_resume(exe_state, timestamp)
end
end
CE->>CE: _completion_event.wait()
CE->>CE: _create_result()
CE->>DC: BatchResult
```
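
From the caller's perspective, the executor machinery above surfaces as a `BatchResult`; inspecting it might look like this (method and field names come from the `BatchResult`/`BatchItem` diagram above, while `context`, `items`, and `process_item` are illustrative):

```python
# Illustrative handling of a BatchResult returned by context.map / context.parallel.
# Method and field names follow the BatchResult / BatchItem diagram above.

batch = context.map(items, process_item, name="process_items")

for item in batch.failed():                # BatchItem entries that failed
    print(f"branch {item.index} failed: {item.error}")

print(f"completed because: {batch.completion_reason}")

results = [item.result for item in batch.succeeded()]  # or batch.get_results()
```
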
```mermaid
classDiagram
class OrderedLock {
-Lock _lock
-deque~Event~ _waiters
-bool _is_broken
-Exception _exception
+acquire() bool
+release()
+reset()
+is_broken() bool
+__enter__() OrderedLock
+__exit__(exc_type, exc_val, exc_tb)
}
class OrderedCounter {
-OrderedLock _lock
-int _counter
+increment() int
+decrement() int
+get_current() int
}
class Event {
<<threading.Event>>
+set()
+wait()
}
class Lock {
<<threading.Lock>>
+acquire()
+release()
}
OrderedCounter --> OrderedLock : uses
OrderedLock --> Lock : contains
OrderedLock --> Event : manages queue of
```
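
The diagram suggests a FIFO-fair lock built from a plain `Lock` plus a queue of `Event`s, so that waiters acquire in arrival order. A minimal sketch of that idea (not the SDK's actual implementation) is:

```python
# Minimal sketch of a FIFO-fair lock, as suggested by the diagram above;
# this is not the SDK's implementation, only the underlying idea.
import threading
from collections import deque

class FifoLock:
    def __init__(self):
        self._lock = threading.Lock()  # guards the waiter queue and state
        self._waiters = deque()        # one threading.Event per waiting thread
        self._held = False

    def acquire(self):
        with self._lock:
            if not self._held and not self._waiters:
                self._held = True
                return True
            ticket = threading.Event()
            self._waiters.append(ticket)
        ticket.wait()  # blocks until release() hands the lock to this waiter
        return True

    def release(self):
        with self._lock:
            if self._waiters:
                # Wake the oldest waiter; the lock stays held on its behalf.
                self._waiters.popleft().set()
            else:
                self._held = False
```
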
The SDK invokes the AWS Lambda checkpoint API to persist execution state. Checkpoints are batched for efficiency and can be either synchronous (blocking) or asynchronous (non-blocking). Critical checkpoints are blocking, meaning that execution will not proceed until the checkpoint call has successfully completed.
Checkpoints are categorized by their action (START, SUCCEED, FAIL) and whether they are critical to execution correctness:
| Operation Type | Action | Is Sync? | Rationale |
|---|---|---|---|
| Step (AtMostOncePerRetry) | START | Yes | Prevents duplicate execution - must wait for confirmation |
| Step (AtLeastOncePerRetry) | START | No | Performance optimization - idempotent operations can retry |
| Step | SUCCEED/FAIL | Yes | Ensures result persisted before returning to caller |
| Callback | START | Yes | Must wait for API to generate callback ID |
| Callback | SUCCEED/FAIL | Yes | Ensures callback result persisted |
| Invoke | START | Yes | Ensures chained invoke recorded before proceeding |
| Invoke | SUCCEED/FAIL | Yes | Ensures invoke result persisted |
| Context (Child) | START | No | Fire-and-forget for performance - parent tracks completion |
| Context (Child) | SUCCEED/FAIL | Yes | Ensures child result available to parent |
| Wait | START | No | Observability only - no blocking needed |
| Wait | SUCCEED | Yes | Ensures wait completion recorded |
| Wait for Condition | START | No | Observability only - condition check is idempotent |
| Wait for Condition | SUCCEED/FAIL | Yes | Ensures condition result persisted |
| Empty Checkpoint | N/A | Yes (default) | Refreshes checkpoint token and operations list |
Synchronous Checkpoints (`is_sync=True`, default):
- Block the caller until the checkpoint is processed by the background thread
- Ensure the checkpoint is persisted before continuing execution
- Safe default for correctness
- Used for critical operations where confirmation is required
Asynchronous Checkpoints (`is_sync=False`, opt-in):
- Return immediately without waiting for the checkpoint to complete
- Performance optimization for specific use cases
- Used for observability checkpoints and fire-and-forget operations
- Only safe when the operation is idempotent or non-critical
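
Conceptually, callers choose between the two modes via the `is_sync` flag on `create_checkpoint`. The sketch below is illustrative only; the `OperationUpdate` values are placeholders, not real constructors:

```python
# Conceptual only: the OperationUpdate values are placeholders, but the is_sync
# flag mirrors the behavior described above.

# Critical checkpoint: block until the background thread has persisted it.
state.create_checkpoint(step_succeeded_update, is_sync=True)

# Observability checkpoint (e.g. Wait START): enqueue and continue immediately.
state.create_checkpoint(wait_started_update, is_sync=False)
```
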
The SDK uses a background thread to batch multiple checkpoint operations into a single API call for efficiency. This reduces API overhead and improves throughput.
```mermaid
sequenceDiagram
participant MT as Main Thread
participant Q as Checkpoint Queue
participant BT as Background Thread
participant API as Durable Functions API
Note over MT,API: Synchronous Checkpoint Flow
MT->>Q: Enqueue operation + completion event
MT->>MT: Block on completion event
BT->>Q: Collect batch (up to 1 second or 750KB)
BT->>API: POST /checkpoint (batched operations)
API-->>BT: New checkpoint token + operations
BT->>BT: Update execution state
BT->>MT: Signal completion event
MT->>MT: Resume execution
Note over MT,API: Asynchronous Checkpoint Flow
MT->>Q: Enqueue operation (no event)
MT->>MT: Continue immediately
BT->>Q: Collect batch (up to 1 second or 750KB)
BT->>API: POST /checkpoint (batched operations)
API-->>BT: New checkpoint token + operations
BT->>BT: Update execution state
```
Checkpoint batching is controlled by `CheckpointBatcherConfig`:

```python
@dataclass(frozen=True)
class CheckpointBatcherConfig:
    max_batch_size_bytes: int = 750 * 1024  # 750 KB
    max_batch_time_seconds: float = 1.0  # 1 second
    max_batch_operations: int | float = float("inf")  # No limit
```

The background thread collects operations until one of these limits is reached:
- Batch size exceeds 750KB
- 1 second has elapsed since the first operation
- Maximum operation count is reached (unlimited by default)
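
If different limits are needed, the batcher could presumably be configured along these lines (the values are illustrative, and how the config is wired into the SDK's checkpointing machinery is not shown here and is an assumption):

```python
# Illustrative: tightens the batching limits shown above. How this config is
# passed into the SDK's checkpointing machinery is an assumption not shown here.
custom_batching = CheckpointBatcherConfig(
    max_batch_size_bytes=256 * 1024,  # flush once a batch reaches 256 KB
    max_batch_time_seconds=0.5,       # or after 500 ms, whichever comes first
    max_batch_operations=50,          # or after 50 queued operations
)
```
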
The checkpointing system handles concurrent operations (map/parallel) by tracking parent-child relationships:
- When a CONTEXT operation completes (SUCCEED/FAIL), all descendant operations are marked as orphaned
- Orphaned operations are rejected if they attempt to checkpoint
- This prevents child operations from checkpointing after their parent has already completed
- Uses a single lock (`_parent_done_lock`) to coordinate completion and checkpoint validation
When a checkpoint fails in the background thread:
- Error Signaling: The background thread creates a `BackgroundThreadError` wrapping the original exception
- Event Notification: All completion events (both in the current batch and queued operations) are signaled with this error
- Immediate Propagation: Synchronous callers waiting on `create_checkpoint(is_sync=True)` immediately receive the `BackgroundThreadError`
- Future Prevention: A failure event (`_checkpointing_failed`) is set to prevent any future checkpoint attempts
- Clean Termination: The background thread exits cleanly after signaling all waiting operations
For synchronous operations (default `is_sync=True`):
- The main thread receives `BackgroundThreadError` immediately when calling `create_checkpoint()`
- This prevents further execution with corrupted state

For asynchronous operations (`is_sync=False`):
- The error is detected on the next synchronous checkpoint attempt
- The `_checkpointing_failed` event causes immediate failure before queuing
This ensures no code continues executing after a checkpoint failure, maintaining execution state integrity.
This project is licensed under the Apache-2.0 License.