Activities

An activity is where the actual work happens: sending an email, calling an external API, running a database query, processing a file, generating a response from an AI model. Anything that interacts with the outside world, performs I/O, or has side effects belongs in an activity, not in the workflow itself.

Activities are ordinary async functions. They have no constraints around determinism: they can call anything, use any library, read from disk, write to a database. The platform treats each activity call as an atomic unit. If an activity fails, it is retried automatically according to a configurable policy. Once it succeeds, its result is recorded in the execution history and the workflow moves on.

Because results are persisted, a successful activity is never re-executed on replay; only its recorded result is used. If an activity fails or times out mid-execution, however, the platform retries it, and by then its side effects may already have been partially applied.

This means activities must be safe to re-execute: a retry after a partial failure must leave the system in a consistent state. This property is often called idempotency, though the precise requirement is that side effects converge to the same observable state, not that outputs are byte-for-byte identical.

An LLM activity, for example, might return a different completion on retry, yet it is still safe to re-execute because its only side effect (recording a single completion) is applied exactly once, on the attempt that succeeds.
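A minimal sketch of this convergence property, using a hypothetical `record_completion` helper and a dict standing in for a database table with a uniqueness constraint on the request id:

```python
# Hypothetical sketch: key the side effect on a stable identifier derived
# from the input, so a retry converges to the state of the first write.
seen: dict[str, str] = {}  # stands in for a table with a unique request_id

def record_completion(request_id: str, completion: str) -> str:
    # A previous attempt may already have recorded a result for this
    # request; return it instead of writing a second row.
    if request_id in seen:
        return seen[request_id]
    seen[request_id] = completion
    return completion

# A retry that produces a different completion still converges:
first = record_completion("req-1", "hello")
retry = record_completion("req-1", "hi there")
assert first == retry == "hello"
```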

Defining an activity

import mistralai.workflows as workflows

@workflows.activity()
async def fetch_user_data(user_id: str) -> dict:
    response = await http_client.get(f"/users/{user_id}")
    return response.json()

Activities are called with await directly inside workflow code:

@workflows.workflow.define(name="user_report")
class UserReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, user_id: str) -> dict:
        user = await fetch_user_data(user_id)
        report = await generate_report(user)
        return report

Retry policy

Configure retries and timeouts on the decorator. Always set start_to_close_timeout; without it, an unresponsive activity blocks indefinitely:

from datetime import timedelta

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=5),
    retry_policy_max_attempts=3,
    retry_policy_backoff_coefficient=2.0,
)
async def call_external_api(params: ApiParams) -> ApiResponse:
    async with httpx.AsyncClient() as client:
        response = await client.post(params.url, json=params.data)
        return ApiResponse(data=response.json())
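
With a backoff coefficient of 2.0, each retry waits twice as long as the previous one. A sketch of the resulting delay schedule, assuming a hypothetical 1-second initial interval (the actual default is platform-defined):

```python
from datetime import timedelta

initial_interval = timedelta(seconds=1)  # assumed; the real default is platform-defined
backoff_coefficient = 2.0
max_attempts = 3

# Delay before retry n; the first attempt runs immediately, so with
# max_attempts=3 there are at most two delays.
delays = [initial_interval * backoff_coefficient ** n for n in range(max_attempts - 1)]
print([d.total_seconds() for d in delays])  # → [1.0, 2.0]
```
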

Long-running activities: heartbeats

For activities that run for more than a few minutes, use heartbeats to signal that the activity is still alive. A heartbeat is a periodic ping the activity sends to the platform; it can also carry progress data so a retry can resume from the last checkpoint instead of starting over.

If the platform stops receiving heartbeats within the configured heartbeat_timeout, it considers the worker dead and reschedules the activity on another worker.

from datetime import timedelta
import temporalio.activity

@workflows.activity(
    start_to_close_timeout=timedelta(minutes=30),
    heartbeat_timeout=timedelta(seconds=30),
)
async def process_large_file(path: str) -> int:
    processed = 0
    async for chunk in read_chunks(path):
        await do_work(chunk)
        processed += 1
        temporalio.activity.heartbeat(processed)  # ping + checkpoint
    return processed

See Building Workflows > Activities > Basics for full configuration options.

Limitations

Use async libraries for I/O. A blocking call stalls the worker's event loop, and with it every other activity running concurrently on that worker:

# ✅ Async HTTP
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)

# ❌ Blocking HTTP, stalls the worker
response = requests.post(url, json=data)
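
When only a blocking client library is available, it can be pushed onto a worker thread so the event loop stays free. A sketch using the standard library's `asyncio.to_thread` (`blocking_call` is a stand-in for a blocking HTTP request):

```python
import asyncio
import time

def blocking_call(x: int) -> int:
    time.sleep(0.01)  # stands in for a blocking HTTP request
    return x * 2

async def main() -> int:
    # Runs the blocking function in a thread, leaving the event loop
    # free to service other activities on this worker.
    return await asyncio.to_thread(blocking_call, 21)

print(asyncio.run(main()))  # → 42
```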

2MB input/output limit. Each activity call is limited to 2MB in and 2MB out. See Payload offloading for larger payloads.
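
The usual workaround is to store the large payload out of band and pass only a small reference across the activity boundary. A hypothetical sketch, with a dict standing in for an object store such as S3:

```python
import json

# Hypothetical payload offloading: the activity input/output is the tiny
# reference string, not the multi-megabyte payload itself.
blob_store: dict[str, bytes] = {}  # stands in for an object store

def put_payload(key: str, payload: bytes) -> str:
    blob_store[key] = payload
    return key  # only this reference crosses the activity boundary

def get_payload(key: str) -> bytes:
    return blob_store[key]

data = b"x" * (10 * 1024 * 1024)  # 10MB, well over the 2MB limit
ref = put_payload("report-123", data)
assert len(json.dumps(ref).encode()) < 2 * 1024 * 1024  # the reference is tiny
assert get_payload(ref) == data
```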