Activities
An activity is where the actual work happens: sending an email, calling an external API, running a database query, processing a file, generating a response from an AI model. Anything that interacts with the outside world, performs I/O, or has side effects belongs in an activity, not in the workflow itself.
Activities are ordinary async functions. They have no constraints around determinism: they can call anything, use any library, read from disk, write to a database. The platform treats each activity call as an atomic unit. If an activity fails, it is retried automatically according to a configurable policy. Once it succeeds, its result is recorded in the execution history and the workflow moves on.
Because results are persisted, a successful activity is never re-executed on replay; only its recorded result is used. However, if an activity fails or times out mid-execution, the platform retries it, potentially after side effects were already partially applied.
This means activities must be safe to re-execute: a retry after a partial failure must leave the system in a consistent state. This is often called idempotency, but the precise requirement is that side effects converge to the same observable state, not that outputs are byte-for-byte identical.
An LLM activity, for example, might return a different completion on retry, yet the activity is still safe to re-execute because its side effect (recording one completion) completes once per successful attempt.
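One common way to make a side effect converge is to attach a key derived from the logical operation, so that a repeated call becomes a no-op. The following is a minimal sketch under that assumption: `FakeEmailService`, `send_welcome_email`, and the key format are hypothetical names for illustration, not part of the platform API, and the in-memory dictionary stands in for a real external service that deduplicates on a caller-supplied key.

```python
import asyncio


class FakeEmailService:
    """Hypothetical in-memory stand-in for a service that deduplicates by key."""

    def __init__(self) -> None:
        self.sent: dict[str, str] = {}

    async def send(self, idempotency_key: str, body: str) -> None:
        # A repeated key is a no-op, so a retried call cannot send twice.
        self.sent.setdefault(idempotency_key, body)


async def send_welcome_email(service: FakeEmailService, user_id: str) -> None:
    # Derive the key from the logical operation, not from the attempt,
    # so every retry of the same activity call reuses the same key.
    key = f"welcome-email:{user_id}"
    await service.send(key, f"Welcome, {user_id}!")


async def main() -> int:
    service = FakeEmailService()
    # Simulate two attempts of the same activity call: the first one timed
    # out after the email had already been sent, the second is the retry.
    await send_welcome_email(service, "u-42")
    await send_welcome_email(service, "u-42")
    return len(service.sent)


emails_sent = asyncio.run(main())
print(emails_sent)  # 1
```

Both attempts leave the service in the same observable state (one email recorded), which is the property the retry mechanism relies on.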
Defining an activity
import mistralai.workflows as workflows

@workflows.activity()
async def fetch_user_data(user_id: str) -> dict:
    # http_client is assumed to be an async HTTP client configured elsewhere
    response = await http_client.get(f"/users/{user_id}")
    return response.json()

Activities are called with await directly inside workflow code:
@workflows.workflow.define(name="user_report")
class UserReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, user_id: str) -> dict:
        user = await fetch_user_data(user_id)
        report = await generate_report(user)
        return report

Retry policy
Configure retries and timeouts on the decorator. Always set start_to_close_timeout; without it, an unresponsive activity blocks indefinitely:
from datetime import timedelta

import httpx

# ApiParams and ApiResponse are application-defined models
@workflows.activity(
    start_to_close_timeout=timedelta(minutes=5),
    retry_policy_max_attempts=3,
    retry_policy_backoff_coefficient=2.0,
)
async def call_external_api(params: ApiParams) -> ApiResponse:
    async with httpx.AsyncClient() as client:
        response = await client.post(params.url, json=params.data)
        return ApiResponse(data=response.json())

Long-running activities: heartbeats
For activities that run for more than a few minutes, use heartbeats to signal that the activity is still alive. A heartbeat is a periodic ping the activity sends to the platform; it can also carry progress data so a retry can resume from the last checkpoint instead of starting over.
If the platform receives no heartbeat within the configured heartbeat_timeout, it considers the worker dead and reschedules the activity on another worker.
from datetime import timedelta

import temporalio.activity

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30),
)
async def process_large_file(path: str) -> int:
    processed = 0
    async for chunk in read_chunks(path):
        await do_work(chunk)
        processed += 1
        temporalio.activity.heartbeat(processed)  # ping + checkpoint
    return processed

See Building Workflows > Activities > Basics for full configuration options.
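The heartbeat example above records a checkpoint but always starts from zero. Resuming could look roughly like the sketch below, which assumes the retried attempt can read the details of the last recorded heartbeat (in the Temporal Python SDK these are exposed as `temporalio.activity.info().heartbeat_details`; whether this platform surfaces them the same way is an assumption). The sketch does not call any SDK: `process_chunks`, `crashing_heartbeat`, and the in-memory `checkpoints` list are hypothetical stand-ins that simulate a crash and a retry in plain Python.

```python
from typing import Callable, Optional, Sequence


def process_chunks(
    chunks: Sequence[str],
    last_checkpoint: Optional[int],
    heartbeat: Callable[[int], None],
) -> int:
    """Process chunks, skipping those a previous attempt already reported."""
    # The checkpoint is the count of chunks completed so far, matching the
    # value the heartbeat carries in the example above.
    processed = last_checkpoint or 0
    for chunk in chunks[processed:]:
        _ = chunk.upper()  # placeholder for the real work
        processed += 1
        heartbeat(processed)  # ping + checkpoint
    return processed


checkpoints: list[int] = []


def crashing_heartbeat(count: int) -> None:
    # Record the checkpoint, then simulate the worker dying after chunk 2.
    checkpoints.append(count)
    if count == 2:
        raise RuntimeError("worker died")


chunks = ["a", "b", "c", "d", "e"]

# First attempt: crashes after two of five chunks.
try:
    process_chunks(chunks, None, crashing_heartbeat)
except RuntimeError:
    pass

# Retry: resume from the last recorded checkpoint instead of from zero.
resumed_from = checkpoints[-1]
total = process_chunks(chunks, resumed_from, checkpoints.append)
print(resumed_from, total)  # 2 5
```

Resuming this way only helps if the work for already-reported chunks does not need to be repeated, so the checkpoint should be sent after a chunk is fully processed, as it is here.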
Limitations
Use async libraries for I/O. Blocking calls stall the worker for all concurrent activities:
# ✅ Async HTTP
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)

# ❌ Blocking HTTP, stalls the worker
response = requests.post(url, json=data)

2MB input/output limit. Each activity call is limited to 2MB in and 2MB out. See Payload offloading for larger payloads.
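A rough pre-flight check for the size limit can be sketched as follows. It assumes payloads are serialized as UTF-8 JSON and that 2MB means 2 * 1024 * 1024 bytes; the platform's actual encoding and exact threshold may differ. `payload_size` and `fits_in_payload` are hypothetical helpers, not platform functions.

```python
import json
from typing import Any

# Assumed limit: the text above says 2MB; the exact byte count is a guess.
MAX_PAYLOAD_BYTES = 2 * 1024 * 1024


def payload_size(value: Any) -> int:
    """Return the size in bytes of the value serialized as UTF-8 JSON."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_payload(value: Any, limit: int = MAX_PAYLOAD_BYTES) -> bool:
    """Return True if the JSON-serialized value is within the limit."""
    return payload_size(value) <= limit


small = {"user_id": "u-42", "tags": ["a", "b"]}
large = {"blob": "x" * (3 * 1024 * 1024)}

print(fits_in_payload(small))  # True
print(fits_in_payload(large))  # False
```

A check like this can run before returning from an activity, so that an oversized result is routed to external storage and only a reference is passed back.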