# Streaming Events
Stream events in real time from your workflows and activities to power live UIs, progress indicators, and token-by-token LLM responses.
This page covers how to publish streaming events from workflow code. To consume them from a client, see Consuming Streaming Events.
## Quick start
### 1. Publish from an Activity
Inside an activity, use the `Task` context manager to publish events. A `Task` represents a named stream of state updates that external consumers can subscribe to. The `type` parameter is a topic name you choose (e.g., `"token-stream"`), and `state` is the initial payload. Each call to `update_state()` publishes the new state as an event to all subscribers in real time.
```python
import mistralai.workflows as workflows
from mistralai.workflows.core.task import Task


@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}
    # `llm` is a streaming LLM client (not shown here).
    async with Task(type="token-stream", state=initial_state) as task:
        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})
    return {"response": "".join(task.state["tokens"])}
```

**Terminal event on exit:** when the `async with` block exits, the task is automatically marked complete and a terminal event is published to all subscribers. There is no need to call a `close()` or `complete()` method explicitly.
### 2. Consume Events
See Consuming Streaming Events for details on subscribing to events.
## Publishing Patterns
### Token Streaming (LLM)
The most common pattern is to stream tokens as they're generated:
```python
@workflows.activity()
async def chat_activity(messages: list) -> dict:
    initial_state = {"tokens": []}
    async with Task(type="token-stream", state=initial_state) as task:
        async for chunk in llm.stream(messages):
            token = chunk.choices[0].delta.content
            if token:
                await task.update_state({"tokens": task.state["tokens"] + [token]})
    return {"response": "".join(task.state["tokens"])}
```
### Progress Updates

Report progress during long operations:
```python
import asyncio


@workflows.activity(name="Processing text with explicit control")
async def streaming_tokens_with_progress_activity(text: str) -> dict:
    """
    Example of explicit progress tracking with the Task API.

    This pattern gives you full control over state updates and progress tracking.
    """
    words = text.split()
    initial_state = {"processed_words": [], "progress_idx": 0, "progress_total": len(words)}
    async with Task(type="progress-stream", state=initial_state) as task:
        state = task.state
        for i, word in enumerate(words):
            await task.update_state({
                "processed_words": state["processed_words"] + [word],
                "progress_idx": i + 1,
            })
            state = task.state
            await asyncio.sleep(0.1)
    final_state = task.state
    return {"processed_text": " ".join(final_state["processed_words"]), "token_count": len(words)}
```
## Task Type Names

The `type` parameter is a topic name you define. Pick names that describe the content of the stream so consumers can subscribe by intent; a sketch follows the guidelines below.
| Recommended | Avoid | Why |
|---|---|---|
| `token` | `data` | `data` says nothing about what's on the stream |
| `progress` | `Stream1` | Numbered names break when streams are added or reordered |
| `search_result` | `myStream` | Mixed case is inconsistent with the rest of the SDK |
- Use lowercase with underscores
- Keep names short and descriptive
- Use consistent naming across your app
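For example, an activity can publish on two streams at once, one name per intent. This is a sketch built on the `Task` API shown above; it assumes multiple tasks can be open concurrently within one activity, and `llm` is again a stand-in client:

```python
@workflows.activity()
async def summarize_documents_activity(documents: list) -> dict:
    summaries: list[str] = []
    # "progress" for coarse status, "token" for fine-grained output:
    # consumers subscribe to each stream by intent.
    async with Task(type="progress", state={"done": 0, "total": len(documents)}) as progress:
        for i, doc in enumerate(documents):
            async with Task(type="token", state={"tokens": []}) as tokens:
                async for chunk in llm.stream([{"role": "user", "content": doc}]):
                    token = chunk.choices[0].delta.content
                    if token:
                        await tokens.update_state({"tokens": tokens.state["tokens"] + [token]})
            summaries.append("".join(tokens.state["tokens"]))
            await progress.update_state({"done": i + 1, "total": len(documents)})
    return {"summaries": summaries}
```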
## Payload Limits
Streaming event messages have a 1MB cap, separate from the 2MB activity input/output limit. For larger payloads:
```python
# Bad: Don't stream large payloads
# await task.update_state(large_document)

# Good: Store large data externally, stream a reference
# (`storage` here is a stand-in for your own blob store client)
url = await storage.upload(large_document)
await task.update_state({"url": url, "size": len(large_document)})
```
## Event Schema

Each published event is wrapped with context:
```
{
  "stream": "token",                          # Your stream/topic name
  "broker_sequence": 42,                      # Monotonic sequence number
  "timestamp": "2026-04-15T10:30:00.000Z",    # ISO timestamp
  "data": {                                   # Typed event response (discriminated union)
    "event_type": "CUSTOM_TASK_IN_PROGRESS",
    "event_id": "evt_abc123",
    "event_timestamp": "2026-04-15T10:30:00.000Z",
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "root_workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null,
    "workflow_run_id": "run_456",
    "attributes": {"progress_idx": 4, "progress_total": 10}
  },
  "workflow_context": {
    "workflow_name": "my-workflow",
    "workflow_exec_id": "abc123",
    "parent_workflow_exec_id": null
  }
}
```
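As an illustration of the envelope, here is a sketch that routes one already-decoded event dict by its stream name (how you receive events is covered in Consuming Streaming Events). It assumes `attributes` mirrors the task state you published via `update_state()`:

```python
def handle_event(event: dict) -> None:
    # `event` is one decoded envelope, shaped as in the schema above.
    if event["stream"] == "progress-stream":
        attrs = event["data"]["attributes"]
        pct = 100 * attrs["progress_idx"] / attrs["progress_total"]
        print(f"seq {event['broker_sequence']}: {pct:.0f}% done")
    elif event["stream"] == "token-stream":
        # For the token example above, the latest state holds all tokens so far.
        print("".join(event["data"]["attributes"]["tokens"]), end="\r")
```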
## Sequence Guarantee

`broker_sequence` is a hard guarantee you can build on:
- Ordered: events are always delivered in ascending sequence order — you will never receive a lower sequence after a higher one.
- Unique: a sequence number is never reused, even after events expire.
- Resume-safe: reconnecting with `start_seq=N` picks up exactly from sequence N. No event is skipped, and no event from before N is re-delivered.
- Durable: the guarantee holds across service restarts and reconnects. If the stream was interrupted mid-execution, historical events are replayed in the correct order before live events resume.
Use `event.broker_sequence + 1` as the `start_seq` on reconnect — see Consuming Streaming Events for the full resume pattern.
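To make that concrete, here is a sketch of a reconnect loop. The `subscribe` callable is a hypothetical stand-in for the real client API (documented in Consuming Streaming Events); the point is the sequence arithmetic:

```python
async def consume_with_resume(subscribe, handle_event) -> None:
    # `subscribe(start_seq=...)` is hypothetical; assume it yields
    # envelopes shaped like the Event Schema example above.
    next_seq = 0
    while True:
        try:
            async for event in subscribe(start_seq=next_seq):
                handle_event(event)
                next_seq = event["broker_sequence"] + 1  # resume point
            return  # stream ended cleanly (terminal event delivered)
        except ConnectionError:
            continue  # transient drop: resume from next_seq, no gaps or replays
```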
## What's next
→ Consume events from a client: subscribe to the stream you just published, with the SDK or the raw HTTP endpoint.