Activity basics
Activities are the units of work in a workflow. For the conceptual overview (atomicity, retries, replay), see Core Concepts > Activities. This page covers how to define, configure, and structure them.
Defining an activity
Basic activity structure:
import mistralai.workflows as workflows

@workflows.activity()
async def my_activity(input_data: str, count: int = 1) -> dict:
    """Activity implementation."""
    # Perform work here
    return {"result": input_data, "processed_count": count}

Core activity features
Activity naming
By default, an activity is registered under its Python function name. Use the name parameter to set a custom name; this identifier is used for registration and execution routing, and it is what appears in traces and the console.
@workflows.activity(
    name="custom_activity_name",  # Used for registration, execution, and observability
)
async def my_activity(input_data: str) -> str:
    ...

Learn more about observability and naming
Timeouts
start_to_close_timeout sets the maximum time an activity can run from the moment it starts executing to the moment it must return a result. If the activity exceeds this duration, it is terminated and treated as a failure (which might trigger a retry). Without a timeout, an unresponsive activity blocks indefinitely.
from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=10)
)
async def my_activity(input_data: str) -> str:
    ...
Retry policies
When an activity fails, it is retried automatically up to retry_policy_max_attempts times. The retry_policy_backoff_coefficient controls exponential backoff between retries: a coefficient of 2.0 means the delay doubles after each attempt (for example, 1s, 2s, 4s, 8s). After all retries are exhausted, the failure propagates to the workflow.
@workflows.activity(
    retry_policy_max_attempts=5,
    retry_policy_backoff_coefficient=2.0
)
async def my_activity(input_data: str) -> str:
    ...
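As a quick illustration of the backoff math (the 1-second initial interval here is an assumption made for the example, not a documented default):

# Hypothetical illustration of the exponential backoff schedule.
# Assumes a 1s initial retry interval purely for demonstration.
initial_interval = 1.0
backoff_coefficient = 2.0
retry_policy_max_attempts = 5

# Delay before the n-th retry (n = 1 is the first retry after the initial attempt):
delays = [initial_interval * backoff_coefficient ** (n - 1)
          for n in range(1, retry_policy_max_attempts)]
print(delays)  # [1.0, 2.0, 4.0, 8.0]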
Worker stickiness
To pin a sequence of activities to the same worker (so they can share an in-memory resource such as a loaded model), set sticky_to_worker=True and run them inside run_sticky_worker_session(). For the full pattern, see Sticky worker sessions.
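A minimal sketch of the idea. Only sticky_to_worker=True comes from the parameter above; using run_sticky_worker_session() as an async context manager is an assumption here, so refer to Sticky worker sessions for the exact invocation:

import mistralai.workflows as workflows

@workflows.activity(sticky_to_worker=True)
async def load_model(model_name: str) -> None:
    ...  # e.g. populate an in-memory model cache on this worker

@workflows.activity(sticky_to_worker=True)
async def run_inference(prompt: str) -> str:
    ...  # reuses the model loaded by load_model on the same worker

# Assumption: run_sticky_worker_session() is usable as an async context
# manager; see Sticky worker sessions for the actual pattern.
async def infer(prompt: str) -> str:
    async with workflows.run_sticky_worker_session():
        await load_model("my-model")
        return await run_inference(prompt)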
Heartbeat timeout
Detect unresponsive activities by requiring periodic heartbeat signals:
from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows import activity

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30)
)
async def long_running_task(items: list[str]) -> dict:
    results = []
    for i, item in enumerate(items):
        result = await process_item(item)  # process_item is your own per-item work
        results.append(result)
        # Report progress to prevent the heartbeat timeout from firing
        activity.heartbeat({"processed": i + 1, "total": len(items)})
    return {"results": results}

When heartbeat_timeout is set, activities must call activity.heartbeat() periodically. If no heartbeat is received within the timeout, the activity is considered failed and a retry is triggered. This pattern lets the platform detect unresponsive activities without waiting for the full start_to_close_timeout.
This subsection is the canonical reference for heartbeats; both Core Concepts > Activities and Core Concepts > Workers link here.
Heartbeats are not supported for local activities.
Granularity and failure handling
We recommend keeping activities as granular as possible. Break complex tasks into smaller, manageable activities, with each one encapsulating all the logic that might fail. This approach has several benefits:
- Isolation of failures: smaller activities make it easier to isolate and handle failures. If a failure occurs, only the affected activity needs to be retried.
- Easier debugging: granular activities make it easier to identify the exact point of failure and debug issues.
- Better retry mechanisms: when each activity is idempotent and can be retried independently, only the failed part of the process is retried, saving time and resources.
For example, if a process involves fetching data from an API, processing that data, and then storing it in a database, you might define three separate activities, as sketched below:
- Fetch data from API
- Process the data
- Store the data in the database
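A minimal sketch of that split, using only the decorator parameters shown above (the function names, parameters, and the retry policy on the fetch step are illustrative choices, not part of the SDK):

import mistralai.workflows as workflows

@workflows.activity(retry_policy_max_attempts=5)
async def fetch_data(api_url: str) -> dict:
    # Network I/O is the step most likely to fail, so it gets its own retry policy.
    ...

@workflows.activity()
async def process_data(raw: dict) -> dict:
    # Pure transformation: cheap to retry in isolation.
    ...

@workflows.activity()
async def store_data(processed: dict) -> None:
    # Database write, isolated so a storage failure never re-fetches the data.
    ...

Because each step is its own activity, a failure in store_data retries only the write, not the fetch or the processing.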
Nested activities
Retry boundary in nested activities: if an activity calls another activity, only the outer activity is treated as a retry boundary. State is checkpointed before the outer activity, not before the nested call. If the nested activity fails, the entire outer activity is retried from the beginning, including any work already done before the nested call. To give each step its own retry isolation, compose activities at the workflow level instead.
When you design activities, understand how state management works, especially when activities are nested within each other. If an activity is encapsulated within another activity, only the parent activity is considered for retries and state management: state is checkpointed before the parent activity starts, and the parent is retried as a whole if anything inside it fails.
import mistralai.workflows as workflows

@workflows.activity()
async def parent_activity(input_data: str) -> dict:
    """Parent activity that encapsulates a nested activity."""
    # Some logic here
    nested_result = await nested_activity(input_data)
    return {"result_data": nested_result["result_data"]}

@workflows.activity()
async def nested_activity(input_data: str) -> dict:
    """Nested activity."""
    # Logic for the nested activity
    return {"result_data": "processed_data"}

In this example, if nested_activity fails, the entire parent_activity is retried, not just the nested call. State is checkpointed before parent_activity starts, so any work done before nested_activity is repeated on retry.