Streaming Events

Stream events in real time from your workflows and activities to power live UIs, progress indicators, and token-by-token LLM responses.

This page covers how to publish streaming events from workflow code. To consume them from a client, see Consuming Streaming Events.

Quick start

1. Publish from an Activity

Inside an activity, use the Task context manager to publish events. A Task represents a named stream of state updates that external consumers can subscribe to. The type parameter is a topic name you choose (e.g., "token-stream"), and state is the initial payload. Each call to update_state() publishes the new state as an event to all subscribers in real time.

import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    # Open a named stream; subscribers to "token-stream" receive every update.
    async with Task(type="token-stream", state=initial_state) as task:
        # `llm` is a placeholder for your streaming LLM client; it is not defined here.
        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                # Publish the accumulated token list as the new state.
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}

Tip: when the async with block exits, the task is automatically marked complete and a terminal event is published to all subscribers. There is no need to call a close() or complete() method explicitly.
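
To make the lifecycle concrete, here is a minimal sketch of how an async context manager can publish a terminal event on exit. FakeTask is a hypothetical in-memory stand-in written for this illustration; it is not part of the SDK, and the event names "update" and "terminal" as well as the replace-the-whole-state behaviour are assumptions.

```python
import asyncio


class FakeTask:
    """Hypothetical in-memory stand-in for Task; NOT the SDK implementation."""

    def __init__(self, type: str, state: dict):
        self.type = type
        self.state = state
        self.events = []  # (event name, state snapshot) pairs a subscriber would see

    async def __aenter__(self) -> "FakeTask":
        return self

    async def update_state(self, new_state: dict) -> None:
        # Assumption: this stand-in replaces the whole state on every update.
        self.state = new_state
        self.events.append(("update", dict(new_state)))

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # The terminal event is recorded on exit, without an explicit close() call.
        self.events.append(("terminal", dict(self.state)))
        return False  # do not swallow exceptions raised inside the block


async def demo() -> list:
    async with FakeTask(type="token-stream", state={"tokens": []}) as task:
        for token in ["Hel", "lo"]:
            await task.update_state({"tokens": task.state["tokens"] + [token]})
    return [name for name, _ in task.events]


event_names = asyncio.run(demo())
print(event_names)
```

A stand-in like this can also be handy in unit tests, where you may want to assert on the sequence of published states without running a workflow.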

2. Consume Events

See Consuming Streaming Events for details on subscribing to events.

Publishing Patterns

Token Streaming (LLM)

The most common pattern is to stream tokens as they are generated:

@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}

    async with Task(type="token-stream", state=initial_state) as task:

        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})

    return {"response": "".join(task.state["tokens"])}

Progress Updates

Report progress during long operations:

import asyncio

@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
    """
    Example of explicit progress tracking with Task API.

    This pattern gives you full control over state updates and progress tracking.
    """
    words = text.split()
    initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}

    async with Task(type="progress-stream", state=initial_state) as task:
        state = task.state
        for i, word in enumerate(words):
            await task.update_state({
                "processed_words": state["processed_words"] + [word],
                "progress_idx": i + 1,
            })
            state = task.state
            await asyncio.sleep(0.1)

    final_state = task.state
    return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}

Task Type Names

The type parameter is a topic name you define. Pick names that describe the content of the stream so consumers can subscribe by intent.

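| Recommended   | Avoid    | Why                                                      |
|---------------|----------|----------------------------------------------------------|
| token         | data     | data says nothing about what's on the stream             |
| progress      | Stream1  | Numbered names break when streams are added or reordered |
| search_result | myStream | Mixed case is inconsistent with the rest of the SDK      |
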
  • Use lowercase with underscores
  • Keep names short and descriptive
  • Use consistent naming across your app
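
The naming rules above could be enforced with a small check at the point where task types are defined. This is a sketch only: is_valid_task_type is a hypothetical helper, the regular expression is one possible reading of "lowercase with underscores", and the 32-character limit is an arbitrary assumption standing in for "short".

```python
import re

# Assumption: a name starts with a lowercase letter and uses only lowercase
# letters, digits, and single underscores between segments.
_TASK_TYPE_PATTERN = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")


def is_valid_task_type(name: str, max_length: int = 32) -> bool:
    """Return True if `name` follows the lowercase_with_underscores convention."""
    return len(name) <= max_length and _TASK_TYPE_PATTERN.fullmatch(name) is not None


for candidate in ["token", "search_result", "Stream1", "myStream"]:
    print(candidate, is_valid_task_type(candidate))
```

A check like this catches casing and punctuation problems only; whether a name is descriptive still needs human review.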

Payload Limits

Streaming event messages are capped at 1 MB each. This cap is separate from the 2 MB activity input/output limit. For larger payloads, store the data externally and stream a reference to it:

# Bad: Don't stream large payloads
# await task.update_state(large_document)

# Good: Store large data externally, stream a reference
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})
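
To decide which path to take, you may want to estimate the payload size before publishing. The sketch below assumes the state is serialized as JSON and reads "1MB" as 1,000,000 bytes; both are assumptions, the helper names are hypothetical, and the event envelope adds some overhead on top of the state itself, so leave headroom.

```python
import json

# Assumption: "1MB" is interpreted as 10**6 bytes; the real cap may be 2**20.
MAX_EVENT_BYTES = 1_000_000


def payload_size(state: dict) -> int:
    """Approximate the serialized size of a state dict in bytes (JSON, UTF-8)."""
    return len(json.dumps(state).encode("utf-8"))


def fits_in_event(state: dict, limit: int = MAX_EVENT_BYTES) -> bool:
    """Return True if the serialized state is within the assumed event cap."""
    return payload_size(state) <= limit


small_state = {"url": "https://example.com/doc", "size": 2_000_000}
large_state = {"document": "x" * 2_000_000}

print(fits_in_event(small_state))
print(fits_in_event(large_state))
```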

Event Schema

Each published event is wrapped with context:

{
    "stream": "token",                          # Your stream/topic name
    "broker_sequence": 42,                      # Monotonic sequence number
    "timestamp": "2026-04-15T10:30:00.000Z",    # ISO timestamp
    "data": {                                   # Typed event response (discriminated union)
        "event_type": "CUSTOM_TASK_IN_PROGRESS",
        "event_id": "evt_abc123",
        "event_timestamp": "2026-04-15T10:30:00.000Z",
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "root_workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null,
        "workflow_run_id": "run_456",
        "attributes": {"progress_idx": 4, "progress_total": 10}
    },
    "workflow_context": {
        "workflow_name": "my-workflow",
        "workflow_exec_id": "abc123",
        "parent_workflow_exec_id": null
    }
}
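
As an illustration of reading this envelope, the sketch below parses a trimmed copy of the event above and derives a completion fraction from data.attributes. The progress_fraction helper is hypothetical, and the stream name "progress" is used only for the example; the progress_idx and progress_total keys mirror the example payload and are defined by your own task state, not by the schema.

```python
import json
from typing import Optional

# Trimmed copy of the envelope shown above, as valid JSON.
raw_event = """
{
  "stream": "progress",
  "broker_sequence": 42,
  "timestamp": "2026-04-15T10:30:00.000Z",
  "data": {
    "event_type": "CUSTOM_TASK_IN_PROGRESS",
    "attributes": {"progress_idx": 4, "progress_total": 10}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
"""


def progress_fraction(event: dict) -> Optional[float]:
    """Return progress as a value in [0, 1], or None if no total is available."""
    attrs = event.get("data", {}).get("attributes", {})
    total = attrs.get("progress_total")
    if not total:
        return None
    return attrs.get("progress_idx", 0) / total


event = json.loads(raw_event)
fraction = progress_fraction(event)
print(event["broker_sequence"], fraction)
```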

Sequence Guarantee

broker_sequence comes with hard guarantees you can build on:

  • Ordered: events are always delivered in ascending sequence order. You will never receive a lower sequence after a higher one.
  • Unique: a sequence number is never reused, even after events expire.
  • Resume-safe: reconnecting with start_seq=N picks up exactly from sequence N. No event is skipped, no event is re-delivered from before N.
  • Durable: the guarantee holds across service restarts and reconnects. If the stream was interrupted mid-execution, historical events are replayed in the correct order before live events resume.

Use event.broker_sequence + 1 as the start_seq on reconnect. See Consuming Streaming Events for the full resume pattern.
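
The bookkeeping behind that rule can be sketched as a small cursor object. ResumeCursor is a hypothetical helper, not an SDK class, and starting from sequence 0 is an assumption. The sketch deliberately does not assume sequence numbers are contiguous, since the guarantees above only promise ordering and uniqueness.

```python
class ResumeCursor:
    """Hypothetical helper that tracks the start_seq to use on reconnect."""

    def __init__(self, start_seq: int = 0):
        # Assumption: 0 means "from the beginning of the stream".
        self.next_seq = start_seq

    def accept(self, event: dict) -> bool:
        """Record an event; return False if it is older than the cursor."""
        seq = event["broker_sequence"]
        if seq < self.next_seq:
            # Should not happen under the ordering guarantee; kept as a defensive check.
            return False
        self.next_seq = seq + 1  # reconnect with start_seq = last seen + 1
        return True


cursor = ResumeCursor()
for event in [{"broker_sequence": 41}, {"broker_sequence": 42}]:
    cursor.accept(event)

print(cursor.next_seq)
```

After a disconnect, cursor.next_seq is the value you would pass as start_seq when subscribing again.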

What's next

Consume events from a client: subscribe to the stream you just published, with the SDK or the raw HTTP endpoint.