Consuming Streaming Events
As a workflow runs, it emits events — execution started, activity completed, tokens generated. Consuming those events means your code listens to that stream and reacts to each one in real time. There are two ways to do this:
- SDK client (recommended): typed Pydantic models, automatic reconnection helpers, supports filtering and resume-from-sequence. Use this for application code.
- Raw HTTP endpoints: two SSE routes, /v1/workflows/executions/{execution_id}/stream (subscribe to a single execution) and /v1/workflows/events/stream (filter across executions). Use these from non-Python clients or when you need to consume the raw SSE stream directly. See API Routes below.
Using the Workflows API Client
Basic Consumption
import os
from mistralai.client import Mistral
client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
# Start a workflow (params is your workflow's input payload)
execution = client.workflows.execute_workflow(
    workflow_identifier="my-workflow",
    input=params,
)
# Stream all events for this execution.
# `async for` must run inside an async function (or an async REPL).
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution.execution_id,
):
    print(f"[{event.stream}] {event.data}")
Detecting Completion
Use SDK event types to detect when a workflow finishes:
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
):
    if event.data is None:
        continue
    event_type = event.data.event_type
    if event_type == "WORKFLOW_EXECUTION_COMPLETED":
        print("Done!")
        break
    elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
        print(f"Ended: {event_type}")
        break
    else:
        record_event(event)
Resume from Sequence
If your connection drops, resume from where you left off using start_seq. broker_sequence is guaranteed to be monotonically increasing and never reused — see Sequence Guarantee for what you can rely on. Passing start_seq=N resumes delivery from sequence N inclusive, so advance past the last received sequence to avoid re-delivering the same event.
import asyncio
last_seq = 0
while True:
    try:
        async for event in client.workflows.events.get_stream_events(
            workflow_exec_id=execution_id,
            start_seq=last_seq,
        ):
            last_seq = event.broker_sequence + 1  # advance past this event
            process(event)
        break  # Completed normally
    except ConnectionError:
        await asyncio.sleep(1)
        # Loop will resume from last_seq
API Routes
The platform exposes two SSE endpoints for consuming events. Pick based on whether you already know the execution you want to subscribe to.
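Both routes return Server-Sent Events, so a client that does not use the SDK has to decode the stream itself. The following is a minimal sketch, not the SDK's implementation. It assumes each event arrives as one or more `data:` lines holding a JSON object and ends with a blank line, as the SSE format specifies. The name `iter_sse_events` is hypothetical.

```python
import json
from typing import Iterable, Iterator, List


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON payload per SSE event.

    An event ends at a blank line; multiple `data:` lines inside one
    event are joined with newlines before decoding.
    """
    buffer: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single space after the colon is not part of the value.
            buffer.append(value[1:] if value.startswith(" ") else value)
        elif line == "" and buffer:
            yield json.loads("\n".join(buffer))
            buffer = []
        # Comment lines (starting with ":") and other fields are ignored.
    if buffer:  # stream ended without a trailing blank line
        yield json.loads("\n".join(buffer))


# Hand-written sample lines shaped like the payloads in this document.
sample = [
    'data: {"stream": "token", "broker_sequence": 1}',
    "",
    'data: {"stream": "token", "broker_sequence": 2}',
    "",
]
events = list(iter_sse_events(sample))
```

Feed it the decoded text lines of the HTTP response body; any HTTP client that can iterate over response lines should work.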
Per-Execution Stream
GET /v1/workflows/executions/{execution_id}/stream
Subscribe to a single execution by ID. Supports resume from a previous SSE position.
| Parameter | Description |
|---|---|
| event_source | Optional filter by event source (e.g. workflow, activity). See Event types for available values. |
| last_event_id | Resume after a specific sequence number (maps to start_seq). Can also be sent as a Last-Event-ID HTTP header, the standard SSE resume mechanism. |
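As a rough illustration of the header-based resume described in the table, the sketch below builds (but does not send) a request for the per-execution route using only the standard library. The base URL is taken from the curl examples in this document; the helper name `build_stream_request` and the sample IDs are hypothetical.

```python
import urllib.request
from typing import Optional
from urllib.parse import quote

API_BASE = "https://api.mistral.ai"  # base URL as used in the curl examples


def build_stream_request(
    execution_id: str,
    api_key: str,
    last_event_id: Optional[int] = None,
) -> urllib.request.Request:
    """Build a GET request for the per-execution SSE route."""
    url = f"{API_BASE}/v1/workflows/executions/{quote(execution_id, safe='')}/stream"
    headers = {"Authorization": f"Bearer {api_key}"}
    if last_event_id is not None:
        # Standard SSE resume header; the table says it maps to start_seq.
        headers["Last-Event-ID"] = str(last_event_id)
    return urllib.request.Request(url, headers=headers)


request = build_stream_request("exec789", "test-key", last_event_id=42)
```

Passing the returned object to `urllib.request.urlopen` would open the stream. Note that `urllib` normalizes header names internally, so the header is stored as `Last-event-id`; HTTP header names are case-insensitive.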
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/executions/${EXEC_ID}/stream"
Filtered Events Stream
GET /v1/workflows/events/stream
Subscribe across executions with rich filtering. This is useful for dashboards, observability tools, or consuming a parent workflow plus all of its descendants in one stream.
| Parameter | Description | Example |
|---|---|---|
| scope | workflow, activity, or * | ?scope=activity |
| stream | Filter by stream name | ?stream=token |
| workflow_name | Filter by workflow name | ?workflow_name=my-workflow |
| workflow_exec_id | Filter by execution ID | ?workflow_exec_id=exec789 |
| root_workflow_exec_id | Filter by root workflow exec ID | ?root_workflow_exec_id=root123 |
| parent_workflow_exec_id | Filter by parent workflow exec ID | ?parent_workflow_exec_id=parent456 |
| activity_name | Filter by activity name | ?activity_name=chat_activity |
| activity_id | Filter by activity execution ID | ?activity_id=activity123 |
| start_seq | Resume from sequence | ?start_seq=42 |
| metadata_filters | Filter by metadata (JSON object) | ?metadata_filters={"key":"v"} |
| workflow_event_types | Filter by workflow event types | ?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED |
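When several of these filters are combined, it helps to build the query string programmatically so values are URL-encoded correctly. The sketch below is one possible way to do that with the standard library. The helper name `build_events_stream_url` is hypothetical, and serializing `metadata_filters` as compact JSON before encoding is an assumption based on the example in the table.

```python
import json
from urllib.parse import parse_qs, urlencode, urlsplit

EVENTS_STREAM_URL = "https://api.mistral.ai/v1/workflows/events/stream"


def build_events_stream_url(**filters) -> str:
    """Build the filtered-stream URL from keyword filters, skipping None values."""
    params = {}
    for name, value in filters.items():
        if value is None:
            continue
        if name == "metadata_filters":
            # Assumption: the JSON object is sent as a compact JSON string.
            value = json.dumps(value, separators=(",", ":"))
        params[name] = value
    if not params:
        return EVENTS_STREAM_URL
    return f"{EVENTS_STREAM_URL}?{urlencode(params)}"


url = build_events_stream_url(
    workflow_exec_id="exec789",
    stream="token",
    start_seq=42,
    metadata_filters={"key": "v"},
    activity_name=None,  # omitted from the query
)
query = parse_qs(urlsplit(url).query)  # decoded again, for inspection
```

Parameter names are passed through unchanged, so the helper does not validate them against the table above.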
# Token-stream events for a specific execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&stream=token"
# Only activity events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=activity"
# Only workflow-level events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
"https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=workflow"
Response (Server-Sent Events):
data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}
data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}
data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}
Consuming events from workflow trees
When a workflow spawns child workflows, every event carries workflow_context with workflow_exec_id, parent_workflow_exec_id, and root_workflow_exec_id. The parent workflow uses execute_workflow to spawn a child:
@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params):
        # Child workflow events will have parent_workflow_exec_id set to this execution
        result = await workflows.workflow.execute_workflow(
            ChildWorkflow,
            params,
        )
        return result
To receive events from a parent workflow and all of its descendants in a single stream, subscribe using root_workflow_exec_id:
# Get events from parent AND all children
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id=parent_execution_id,
):
    if event.workflow_context.parent_workflow_exec_id:
        print(f"[child] {event.data}")
    else:
        print(f"[parent] {event.data}")
Event types
The SDK uses typed event responses discriminated by event_type. Each event is a Pydantic model
(e.g. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) with fields like
event_id, event_timestamp, workflow_exec_id, workflow_name, and type-specific attributes.
For the canonical list of workflow and activity event types, see Core Concepts > Events > Event types. The streaming API additionally surfaces CUSTOM_TASK_* events emitted from inside activities (started, in-progress, completed, failed, timed out, canceled). Use these to publish progress updates from long-running activities; see Streaming for how to publish them.
For convenience, here are the event types you'll encounter most often when streaming:
| Event Type | Description |
|---|---|
| WORKFLOW_EXECUTION_STARTED | Workflow started |
| WORKFLOW_EXECUTION_COMPLETED | Workflow finished successfully |
| WORKFLOW_EXECUTION_FAILED | Workflow failed |
| WORKFLOW_EXECUTION_CANCELED | Workflow was canceled |
| WORKFLOW_EXECUTION_CONTINUED_AS_NEW | Workflow continued as new |
| WORKFLOW_TASK_TIMED_OUT | Workflow task timed out |
| WORKFLOW_TASK_FAILED | Workflow task failed |
| CUSTOM_TASK_STARTED | Custom task started |
| CUSTOM_TASK_IN_PROGRESS | Custom task progress update |
| CUSTOM_TASK_COMPLETED | Custom task completed |
| CUSTOM_TASK_FAILED | Custom task failed |
| CUSTOM_TASK_TIMED_OUT | Custom task timed out |
| CUSTOM_TASK_CANCELED | Custom task canceled |
| ACTIVITY_TASK_STARTED | Activity started |
| ACTIVITY_TASK_COMPLETED | Activity finished |
| ACTIVITY_TASK_RETRYING | Activity is retrying |
| ACTIVITY_TASK_FAILED | Activity failed |
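Consumers often need to decide, from the event type alone, whether a stream is finished or which family an event belongs to. The sketch below shows one way to do that with plain strings. It treats only the three workflow execution outcomes that this document names as terminal (completed, failed, canceled); whether other types should end your loop is a decision for your application. Both helper names are hypothetical.

```python
TERMINAL_EVENT_TYPES = frozenset({
    "WORKFLOW_EXECUTION_COMPLETED",
    "WORKFLOW_EXECUTION_FAILED",
    "WORKFLOW_EXECUTION_CANCELED",
})

# Prefixes as they appear in the table above.
_FAMILY_PREFIXES = (
    ("WORKFLOW_", "workflow"),
    ("CUSTOM_TASK_", "custom_task"),
    ("ACTIVITY_TASK_", "activity"),
)


def is_terminal_event_type(event_type: str) -> bool:
    """Return True for the event types treated as terminal in this sketch."""
    return event_type in TERMINAL_EVENT_TYPES


def event_family(event_type: str) -> str:
    """Map an event type to a coarse family based on its prefix."""
    for prefix, family in _FAMILY_PREFIXES:
        if event_type.startswith(prefix):
            return family
    return "unknown"


checks = {
    name: (event_family(name), is_terminal_event_type(name))
    for name in (
        "WORKFLOW_EXECUTION_COMPLETED",
        "CUSTOM_TASK_IN_PROGRESS",
        "ACTIVITY_TASK_RETRYING",
    )
}
```

With the SDK, the string would come from `event.data.event_type`, after checking that `event.data` is not None as in the completion example.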
Example Event
{
  "stream": "token",
  "broker_sequence": 1,
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "event_type": "WORKFLOW_EXECUTION_STARTED",
    "event_id": "evt_abc123",
    "event_timestamp": 1736938200000000000,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
Best practices
Always Handle Disconnection
Streaming connections can drop at any time. Use start_seq to resume from the last received event, and implement exponential backoff to avoid hammering the server. The is_terminal_status() check below is your own helper — check the event type against terminal statuses like WORKFLOW_EXECUTION_COMPLETED, WORKFLOW_EXECUTION_FAILED, or WORKFLOW_EXECUTION_CANCELED.
import asyncio
async def resilient_consume(client, exec_id):
    max_retries = 10
    last_seq = 0
    for attempt in range(max_retries):
        try:
            async for event in client.workflows.events.get_stream_events(
                workflow_exec_id=exec_id,
                start_seq=last_seq,
            ):
                last_seq = event.broker_sequence + 1  # advance past this event
                yield event
                if is_terminal_status(event):
                    return
        except ConnectionError:
            # Exponential backoff, capped at 30 seconds
            await asyncio.sleep(min(2 ** attempt, 30))
Filter Early
Filter at the subscription level, not in your code:
# Good: Filter at source
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, stream="token"
):
    process(event)
# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id
):
    if event.stream == "token":
        process(event)
Use Appropriate Scope
If you only care about activity events:
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, scope="activity"
):
    if event.data is not None:
        # Only activity events, no workflow-level events
        process(event.data)