Local activities
Local activities run directly in the workflow worker process and bypass the deployment's task queues. They suit fast operations (under 1 second), where queue scheduling overhead would otherwise account for a large share of the total latency.
When to use
Good for:
- Quick computations, validations, data transformations
- High-volume scenarios where latency matters
- Operations completing in under 1 second
Avoid for:
- Long-running operations (more than a few seconds)
- External API calls or I/O-heavy tasks
- Operations needing separate retry isolation or scaling
Basic usage
```python
import mistralai.workflows as workflows
from mistralai.workflows import run_activities_locally


@workflows.activity()
async def validate_email(email: str) -> bool:
    return "@" in email


@workflows.workflow.define(name="user-workflow")
class UserWorkflow:
    @workflows.workflow.entrypoint
    async def execute(self, email: str) -> bool:
        # Activities awaited inside this block run in the workflow worker process.
        with run_activities_locally():
            result = await validate_email(email)
        return result
```
Mixed execution
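A single workflow can combine local and remote activities: activities awaited inside a run_activities_locally() block run in the workflow worker process, while activities awaited outside it are scheduled as regular activities.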
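As a self-contained illustration of this pattern, the sketch below uses small stand-ins defined only for the example: `activity`, `run_locally`, `send_welcome_email`, and the `trace` list are hypothetical and are not part of `mistralai.workflows`. It shows a context manager scoping which calls are treated as local, while calls outside the block take the regular path.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from functools import wraps

# Hypothetical stand-ins for illustration only; not the mistralai.workflows API.
_local_mode = contextvars.ContextVar("local_mode", default=False)
trace: list[tuple[str, str]] = []  # (activity name, execution mode)


@contextmanager
def run_locally():
    """Mark activities called inside the block as local."""
    token = _local_mode.set(True)
    try:
        yield
    finally:
        _local_mode.reset(token)


def activity(fn):
    """Record whether each call is treated as local or remote, then run it."""
    @wraps(fn)
    async def wrapper(*args, **kwargs):
        mode = "local" if _local_mode.get() else "remote"
        trace.append((fn.__name__, mode))
        return await fn(*args, **kwargs)
    return wrapper


@activity
async def validate_email(email: str) -> bool:
    return "@" in email


@activity
async def send_welcome_email(email: str) -> str:
    # Placeholder for an I/O-heavy call that belongs in a regular activity.
    await asyncio.sleep(0)
    return f"sent:{email}"


async def user_workflow(email: str) -> str:
    # Fast validation is scoped to the local block.
    with run_locally():
        is_valid = await validate_email(email)
    if not is_valid:
        return "invalid"
    # Outside the block, the call takes the regular (queued) path.
    return await send_welcome_email(email)
```

After `asyncio.run(user_workflow("ada@example.com"))`, `trace` holds `("validate_email", "local")` followed by `("send_welcome_email", "remote")`. Keeping the local block as narrow as possible makes it easy to see which calls give up the isolation of a regular activity.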
Performance impact
Local activities skip the queue and worker hop, so they typically run much faster than regular activities. Actual latency depends on infrastructure and workflow load.
| Execution mode | Latency profile | Best for |
|---|---|---|
| Local activity | Sub-millisecond to a few milliseconds | Quick computations, lookups |
| Regular activity | Tens to hundreds of ms | API calls, complex logic |
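To make the "queue and worker hop" concrete, here is a rough, self-contained timing sketch. It assumes an in-process `asyncio.Queue` and a single worker task as a stand-in for the real task queue; all names (`compute`, `worker`, `call_direct`, `call_via_queue`, `measure`) are hypothetical. A real hop also involves network and persistence, so the numbers it prints are not representative of production latency. It only shows that the hand-off adds work that the direct path avoids.

```python
import asyncio
import time


async def compute(x: int) -> int:
    return x * 2


async def worker(queue: asyncio.Queue) -> None:
    """Stand-in for a separate activity worker pulling tasks off a queue."""
    while True:
        x, future = await queue.get()
        future.set_result(await compute(x))
        queue.task_done()


async def call_direct(x: int) -> int:
    # "Local" path: the call runs inline, with no hand-off.
    return await compute(x)


async def call_via_queue(queue: asyncio.Queue, x: int) -> int:
    # "Regular" path: enqueue the task, then wait for the worker's result.
    future = asyncio.get_running_loop().create_future()
    await queue.put((x, future))
    return await future


async def measure(n: int = 1000) -> dict[str, float]:
    """Time n direct calls and n queued calls; return elapsed seconds for each."""
    queue: asyncio.Queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(queue))
    try:
        start = time.perf_counter()
        direct = [await call_direct(i) for i in range(n)]
        direct_s = time.perf_counter() - start

        start = time.perf_counter()
        queued = [await call_via_queue(queue, i) for i in range(n)]
        queued_s = time.perf_counter() - start
    finally:
        worker_task.cancel()
        await asyncio.gather(worker_task, return_exceptions=True)
    assert direct == queued  # both paths compute the same results
    return {"direct_s": direct_s, "queued_s": queued_s}


if __name__ == "__main__":
    print(asyncio.run(measure()))
```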
Limitations
Worker crash risk: local activities run in the workflow worker process. If a local activity raises an unexpected error, it can crash the worker, affecting all workflows and activities running on it. Unlike regular activities, there is no process-level isolation. Reserve local activities for code you trust to be exception-safe.
Bypasses standard scheduling:
- No deployment-level isolation: local activities share resources with workflows.
- Worker process blocking: slow local activities block the workflow worker.
- No independent scaling: local activities cannot scale separately from the worker.
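The worker-blocking point can be illustrated with plain `asyncio`, assuming the worker runs activities on a single event loop (an assumption; the text above does not describe the worker's internals). In this sketch, `heartbeat` stands in for other workflow work on the same loop, and all names are hypothetical. A synchronous `time.sleep` inside an activity stalls the heartbeat for the whole duration, whereas `await asyncio.sleep` lets it keep ticking.

```python
import asyncio
import time


async def heartbeat(ticks: list[float], interval: float = 0.01) -> None:
    """Stand-in for other workflow work sharing the same event loop."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def blocking_activity(duration: float) -> None:
    time.sleep(duration)  # synchronous wait: blocks the whole event loop


async def cooperative_activity(duration: float) -> None:
    await asyncio.sleep(duration)  # yields control while waiting


async def max_gap(activity, duration: float = 0.2) -> float:
    """Return the longest pause between heartbeat ticks while `activity` runs."""
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.03)  # let the heartbeat start ticking
    await activity(duration)
    await asyncio.sleep(0.03)  # let it tick again afterwards
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print("blocking:   ", asyncio.run(max_gap(blocking_activity)))
    print("cooperative:", asyncio.run(max_gap(cooperative_activity)))
```

With the blocking version, the longest gap is at least the full activity duration, because nothing else on the loop runs until the activity returns.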
Activity settings (partial support):
Timeouts, retry policies, and dependency injection work the same for local activities. The following parameters do not apply because local activities already run in the workflow worker process: sticky_to_worker, rate_limit, heartbeat_timeout.
```python
from datetime import timedelta

import mistralai.workflows as workflows


@workflows.activity(
    start_to_close_timeout=timedelta(seconds=5),
    retry_policy_max_attempts=3,
)
async def timed_local_activity(data: str) -> str:
    # Timeouts and retry policies apply even when running locally.
    # Rate limiting is NOT supported for local activities.
    return data.upper()
```
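For intuition about how a timeout and a maximum attempt count interact, here is a minimal sketch in plain `asyncio`. It is not the library's implementation: `run_with_policy` and `make_flaky_upper` are hypothetical helpers, and the sketch assumes that the timeout bounds each attempt separately and that retries happen immediately with no backoff, neither of which is stated above.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_with_policy(
    make_attempt: Callable[[], Awaitable[str]],
    timeout_s: float,
    max_attempts: int,
) -> str:
    """Call make_attempt() up to max_attempts times, bounding each try by timeout_s."""
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return await asyncio.wait_for(make_attempt(), timeout=timeout_s)
        except Exception as exc:  # includes the timeout raised by wait_for
            last_error = exc
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_error


def make_flaky_upper(failures: int) -> Callable[[str], Awaitable[str]]:
    """Build an activity-like coroutine that fails `failures` times, then succeeds."""
    remaining = failures

    async def flaky_upper(data: str) -> str:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise ValueError("transient failure")
        return data.upper()

    return flaky_upper


async def main() -> str:
    flaky_upper = make_flaky_upper(failures=2)
    # Mirrors the settings above: 5-second timeout, at most 3 attempts.
    return await run_with_policy(
        lambda: flaky_upper("hello"), timeout_s=5.0, max_attempts=3
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the first two attempts fail and the third succeeds, so three attempts are just enough. With `max_attempts=2`, the same activity would end in a `RuntimeError` whose cause is the last `ValueError`.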