Consuming Streaming Events

As a workflow runs, it emits events — execution started, activity completed, tokens generated. Consuming those events means your code listens to that stream and reacts to each one in real time. There are two ways to do this:

  • SDK client (recommended): typed Pydantic models, automatic reconnection helpers, supports filtering and resume-from-sequence. Use this for application code.
  • Raw HTTP endpoints: two SSE routes — /v1/workflows/executions/{execution_id}/stream (subscribe to a single execution) and /v1/workflows/events/stream (filter across executions). Use these from non-Python clients or when you need to consume the raw SSE stream directly. See API Routes below.
Using the Workflows API Client

Basic Consumption

import asyncio
import os

from mistralai.client import Mistral

client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])

params = {}  # your workflow's input parameters

async def main():
    # Start a workflow
    execution = client.workflows.execute_workflow(
        workflow_identifier="my-workflow",
        input=params,
    )

    # Stream all events for this execution
    async for event in client.workflows.events.get_stream_events(
        workflow_exec_id=execution.execution_id,
    ):
        print(f"[{event.stream}] {event.data}")

asyncio.run(main())
Detecting Completion

Use SDK event types to detect when a workflow finishes:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=execution_id,
):
    if event.data is None:
        continue

    event_type = event.data.event_type

    if event_type == "WORKFLOW_EXECUTION_COMPLETED":
        print("Done!")
        break
    elif event_type in ("WORKFLOW_EXECUTION_FAILED", "WORKFLOW_EXECUTION_CANCELED"):
        print(f"Ended: {event_type}")
        break
    else:
        record_event(event)
Resume from Sequence

If your connection drops, resume from where you left off using start_seq. broker_sequence is guaranteed to be monotonically increasing and never reused — see Sequence Guarantee for what you can rely on. Passing start_seq=N resumes delivery from sequence N inclusive, so advance past the last received sequence to avoid re-delivering the same event.

last_seq = 0
while True:
    try:
        async for event in client.workflows.events.get_stream_events(
            workflow_exec_id=execution_id,
            start_seq=last_seq,
        ):
            last_seq = event.broker_sequence + 1  # advance past this event
            process(event)
        break  # Completed normally
    except ConnectionError:
        await asyncio.sleep(1)
        # Loop will resume from last_seq
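The loop above keeps the cursor in memory, so a process restart would replay the stream from the beginning. If that matters for your consumer, persist `last_seq` between runs. A minimal sketch using a local JSON file (the file path and helper names are illustrative, not part of the SDK):

```python
import json
from pathlib import Path

CURSOR_FILE = Path("last_seq.json")  # illustrative location for the cursor

def load_cursor() -> int:
    """Return the sequence to resume from (0 on first run)."""
    if CURSOR_FILE.exists():
        return json.loads(CURSOR_FILE.read_text())["last_seq"]
    return 0

def save_cursor(last_seq: int) -> None:
    """Persist the cursor so a restarted process resumes where it left off."""
    CURSOR_FILE.write_text(json.dumps({"last_seq": last_seq}))
```

In the consume loop, call `save_cursor(event.broker_sequence + 1)` after each processed event and pass `load_cursor()` as `start_seq` on startup.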
API Routes

The platform exposes two SSE endpoints for consuming events. Pick based on whether you already know the execution you want to subscribe to.

Per-Execution Stream

GET /v1/workflows/executions/{execution_id}/stream

Subscribe to a single execution by ID. Supports resume from a previous SSE position.

Parameter | Description
event_source | Optional filter by event source (e.g. workflow, activity). See Event types for available values.
last_event_id | Resume after a specific sequence number (maps to start_seq). Can also be sent as the Last-Event-ID HTTP header, the standard SSE resume mechanism.
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/executions/${EXEC_ID}/stream"
Filtered Events Stream

GET /v1/workflows/events/stream

Subscribe across executions with rich filtering — useful for dashboards, observability tools, or consuming a parent workflow plus all of its descendants in one stream.

Parameter | Description | Example
scope | workflow, activity, or * | ?scope=activity
stream | Filter by stream name | ?stream=token
workflow_name | Filter by workflow name | ?workflow_name=my-workflow
workflow_exec_id | Filter by execution ID | ?workflow_exec_id=exec789
root_workflow_exec_id | Filter by root workflow exec ID | ?root_workflow_exec_id=root123
parent_workflow_exec_id | Filter by parent workflow exec ID | ?parent_workflow_exec_id=parent456
activity_name | Filter by activity name | ?activity_name=chat_activity
activity_id | Filter by activity execution ID | ?activity_id=activity123
start_seq | Resume from sequence | ?start_seq=42
metadata_filters | Filter by metadata (JSON object) | ?metadata_filters={"key":"v"}
workflow_event_types | Filter by workflow event types | ?workflow_event_types=WORKFLOW_EXECUTION_COMPLETED
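When building these URLs programmatically, note that metadata_filters is a JSON object and so must be JSON-serialized before URL-encoding. A sketch using only the standard library (the parameter values are the examples from the table above):

```python
import json
from urllib.parse import urlencode

base = "https://api.mistral.ai/v1/workflows/events/stream"
params = {
    "workflow_exec_id": "exec789",
    "stream": "token",
    # metadata_filters carries a JSON object inside the query string
    "metadata_filters": json.dumps({"key": "v"}),
}
url = f"{base}?{urlencode(params)}"
print(url)
```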
# Token-stream events for a specific execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&stream=token"

# Only activity events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=activity"

# Only workflow-level events for an execution
curl -N -H "Authorization: Bearer $MISTRAL_API_KEY" \
  "https://api.mistral.ai/v1/workflows/events/stream?workflow_exec_id=${EXEC_ID}&scope=workflow"

Response (Server-Sent Events):

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":1,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"CUSTOM_TASK_IN_PROGRESS",...},"broker_sequence":2,"workflow_context":{...}}

data: {"stream":"token","data":{"event_type":"WORKFLOW_EXECUTION_COMPLETED",...},"broker_sequence":3,"workflow_context":{...}}
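If you consume the raw SSE route without the SDK, each event arrives as a `data:` line holding one JSON object, as shown above. A minimal parser sketch (the HTTP transport is omitted; feed it decoded lines from the response body):

```python
import json

def parse_sse_lines(lines):
    """Yield one decoded event dict per `data:` line, skipping blank keep-alive lines."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):])

# Sample frames shaped like the response above
frames = [
    'data: {"stream": "token", "broker_sequence": 1, "data": {"event_type": "CUSTOM_TASK_IN_PROGRESS"}}',
    "",
    'data: {"stream": "token", "broker_sequence": 2, "data": {"event_type": "WORKFLOW_EXECUTION_COMPLETED"}}',
]
events = list(parse_sse_lines(frames))
```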
Consuming events from workflow trees

When a workflow spawns child workflows, every event carries workflow_context with workflow_exec_id, parent_workflow_exec_id, and root_workflow_exec_id. The parent workflow uses execute_workflow to spawn a child:

@workflows.workflow.define(name="parent-workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params):
        # Child workflow events will have parent_workflow_exec_id set to this execution
        result = await workflows.workflow.execute_workflow(
            ChildWorkflow,
            params,
        )
        return result

To receive events from a parent workflow and all of its descendants in a single stream, subscribe using root_workflow_exec_id:

# Get events from parent AND all children
async for event in client.workflows.events.get_stream_events(
    root_workflow_exec_id=parent_execution_id,
):
    if event.workflow_context.parent_workflow_exec_id:
        print(f"[child] {event.data}")
    else:
        print(f"[parent] {event.data}")
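The same workflow_context fields let you reassemble the tree on the consumer side. A sketch that groups events per execution and records parent links (stub objects stand in for SDK event models):

```python
from collections import defaultdict
from types import SimpleNamespace

def group_by_execution(events):
    """Map each workflow_exec_id to its events; record child -> parent links."""
    by_exec = defaultdict(list)
    parents = {}
    for event in events:
        ctx = event.workflow_context
        by_exec[ctx.workflow_exec_id].append(event)
        if ctx.parent_workflow_exec_id:
            parents[ctx.workflow_exec_id] = ctx.parent_workflow_exec_id
    return by_exec, parents

# Stub events mimicking the workflow_context shape described above
events = [
    SimpleNamespace(workflow_context=SimpleNamespace(
        workflow_exec_id="root", parent_workflow_exec_id=None)),
    SimpleNamespace(workflow_context=SimpleNamespace(
        workflow_exec_id="child", parent_workflow_exec_id="root")),
]
by_exec, parents = group_by_execution(events)
```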
Event types

The SDK uses typed event responses discriminated by event_type. Each event is a Pydantic model (e.g. WorkflowExecutionCompletedResponse, CustomTaskInProgressResponse) with fields like event_id, event_timestamp, workflow_exec_id, workflow_name, and type-specific attributes.

For the canonical list of workflow and activity event types, see Core Concepts > Events > Event types. The streaming API additionally surfaces CUSTOM_TASK_* events emitted from inside activities (started, in-progress, completed, failed, timed out, canceled). Use these to publish progress updates from long-running activities; see Streaming for how to publish them.

For convenience, here are the event types you'll encounter most often when streaming:

Event Type | Description
WORKFLOW_EXECUTION_STARTED | Workflow started
WORKFLOW_EXECUTION_COMPLETED | Workflow finished successfully
WORKFLOW_EXECUTION_FAILED | Workflow failed
WORKFLOW_EXECUTION_CANCELED | Workflow was canceled
WORKFLOW_EXECUTION_CONTINUED_AS_NEW | Workflow continued as new
WORKFLOW_TASK_TIMED_OUT | Workflow task timed out
WORKFLOW_TASK_FAILED | Workflow task failed
CUSTOM_TASK_STARTED | Custom task started
CUSTOM_TASK_IN_PROGRESS | Custom task progress update
CUSTOM_TASK_COMPLETED | Custom task completed
CUSTOM_TASK_FAILED | Custom task failed
CUSTOM_TASK_TIMED_OUT | Custom task timed out
CUSTOM_TASK_CANCELED | Custom task canceled
ACTIVITY_TASK_STARTED | Activity started
ACTIVITY_TASK_COMPLETED | Activity finished
ACTIVITY_TASK_RETRYING | Activity is retrying
ACTIVITY_TASK_FAILED | Activity failed
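Because events are discriminated by event_type, a handler table keeps consumption code flat instead of a long if/elif chain. A sketch (the handler names and the `log` list are illustrative; real handlers would update your application state):

```python
log = []  # stands in for real side effects

def on_completed(event):
    log.append(("completed", event))

def on_progress(event):
    log.append(("progress", event))

# Map event types to handlers; extend with the types you care about
HANDLERS = {
    "WORKFLOW_EXECUTION_COMPLETED": on_completed,
    "CUSTOM_TASK_IN_PROGRESS": on_progress,
}

def dispatch(event):
    """Route an event dict to its handler; unknown event types are ignored."""
    handler = HANDLERS.get(event["event_type"])
    if handler is not None:
        handler(event)

dispatch({"event_type": "CUSTOM_TASK_IN_PROGRESS"})
```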
Example Event

{
  "stream": "token",
  "broker_sequence": 1,
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "event_type": "WORKFLOW_EXECUTION_STARTED",
    "event_id": "evt_abc123",
    "event_timestamp": 1736938200000000000,
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
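Note the two timestamp formats: the top-level timestamp is an ISO-8601 string, while event_timestamp appears, given its magnitude, to be nanoseconds since the Unix epoch. Assuming that unit, it converts with the standard library like so:

```python
from datetime import datetime, timezone

# Value from the example event above; assumed to be nanoseconds since the epoch
event_timestamp = 1736938200000000000
dt = datetime.fromtimestamp(event_timestamp // 1_000_000_000, tz=timezone.utc)
print(dt.isoformat())
```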
Best practices

Always Handle Disconnection

Streaming connections can drop at any time. Use start_seq to resume from the last received event, and implement exponential backoff to avoid hammering the server. The is_terminal_status() check below is your own helper — check the event type against terminal statuses like WORKFLOW_EXECUTION_COMPLETED, WORKFLOW_EXECUTION_FAILED, or WORKFLOW_EXECUTION_CANCELED.
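One possible definition of that helper, matching the terminal statuses listed above:

```python
TERMINAL_EVENT_TYPES = {
    "WORKFLOW_EXECUTION_COMPLETED",
    "WORKFLOW_EXECUTION_FAILED",
    "WORKFLOW_EXECUTION_CANCELED",
}

def is_terminal_status(event) -> bool:
    """True when the event marks the end of the workflow execution."""
    return event.data is not None and event.data.event_type in TERMINAL_EVENT_TYPES
```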

async def resilient_consume(client, exec_id):
    max_retries = 10
    last_seq = 0

    for attempt in range(max_retries):
        try:
            async for event in client.workflows.events.get_stream_events(
                workflow_exec_id=exec_id,
                start_seq=last_seq,
            ):
                last_seq = event.broker_sequence + 1  # advance past this event
                yield event

                if is_terminal_status(event):
                    return
        except ConnectionError:
            await asyncio.sleep(min(2 ** attempt, 30))  # exponential backoff, capped
    raise RuntimeError("stream did not reach a terminal event after retries")
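For reference, the capped backoff expression `min(2 ** attempt, 30)` yields this delay schedule in seconds:

```python
delays = [min(2 ** attempt, 30) for attempt in range(10)]
print(delays)  # → [1, 2, 4, 8, 16, 30, 30, 30, 30, 30]
```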
Filter Early

Filter at the subscription level, not in your code:

# Good: Filter at source
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, stream="token"
):
    process(event)

# Bad: Filter in code (wastes bandwidth)
async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id
):
    if event.stream == "token":
        process(event)
Use Appropriate Scope

If you only care about activity events:

async for event in client.workflows.events.get_stream_events(
    workflow_exec_id=exec_id, scope="activity"
):
    if event.data is not None:
        # Only activity events, no workflow-level events
        process(event.data)