Child workflows
A workflow can execute other workflows as child workflows, enabling hierarchical orchestration patterns.
When to use a child workflow vs an activity
Both let you compose work — pick based on whether you need a separate durable history:
| Use a child workflow when... | Use an activity when... |
|---|---|
| The unit of work is itself long-running, has its own retries, or needs its own event history you can replay independently | The unit of work is a single side effect (DB write, API call, LLM call) that should be retried as a unit |
| You want to inspect, signal, or query that sub-process from outside | You don't need external observability beyond the parent's history |
| You want to run it on a different worker / deployment | The parent worker can execute it directly |
Child workflows are heavier than activities: they get their own execution row, history, and retry policy. Reach for an activity first; reach for a child workflow when you need its independent durability.
Basic usage
Execute a child workflow and wait for its result:
import asyncio
import mistralai.workflows as workflows
from pydantic import BaseModel
@workflows.activity()
async def process_item(value: str) -> str:
return f"processed:{value}"
class ChildInput(BaseModel):
value: str
@workflows.workflow.define(name="child_workflow")
class ChildWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: ChildInput) -> str:
return await process_item(params.value)
class ParentInput(BaseModel):
data: str
@workflows.workflow.define(name="parent_workflow")
class ParentWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: ParentInput) -> str:
child_result = await workflows.execute_workflow(
ChildWorkflow, params=ChildInput(value=params.data)
)
return f"Parent got: {child_result}"
async def main():
result = await workflows.execute_workflow(
ParentWorkflow,
params=ParentInput(data="hello"),
)
print(result)Fire and forget
Start a child workflow without waiting for its result by passing wait=False:
handle = await workflow.execute_workflow(
ChildWorkflow,
params=child_params,
execution_timeout=timedelta(hours=1),
wait=False,
)
# Parent continues immediately — child runs independently
# Optionally await later: result = await handleBy default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes.
Parent close policy
Override the default close policy with the parent_close_policy parameter:
from mistralai.workflows import ParentClosePolicy
handle = await workflow.execute_workflow(
ChildWorkflow,
params=child_params,
execution_timeout=timedelta(hours=1),
wait=False,
parent_close_policy=ParentClosePolicy.TERMINATE,
)Observing child workflow events
Every child workflow gets its own execution ID and event history. Each event carries workflow_context.parent_workflow_exec_id and workflow_context.root_workflow_exec_id, so you can subscribe to a parent and all of its descendants in a single stream by filtering on root_workflow_exec_id.
For a top-level workflow with no parent, parent_workflow_exec_id is null and root_workflow_exec_id is the workflow's own execution ID.
See Consuming Streaming Events > Consuming events from workflow trees for the subscription pattern.