Child workflows

A workflow can execute other workflows as child workflows, enabling hierarchical orchestration patterns.

When to use a child workflow vs an activity

Both let you compose work. Choose based on whether you need a separate durable history:

| Use a child workflow when... | Use an activity when... |
| --- | --- |
| The unit of work is itself long-running, has its own retries, or needs its own event history you can replay independently | The unit of work is a single side effect (DB write, API call, LLM call) that should be retried as a unit |
| You want to inspect, signal, or query that sub-process from outside | You don't need external observability beyond the parent's history |
| You want to run it on a different worker / deployment | The parent worker can execute it directly |

Child workflows are heavier than activities: they get their own execution row, history, and retry policy. Reach for an activity first; reach for a child workflow when you need its independent durability.

Basic usage

Execute a child workflow and wait for its result:

import asyncio
import mistralai.workflows as workflows
from pydantic import BaseModel


@workflows.activity()
async def process_item(value: str) -> str:
    return f"processed:{value}"


class ChildInput(BaseModel):
    value: str


@workflows.workflow.define(name="child_workflow")
class ChildWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ChildInput) -> str:
        return await process_item(params.value)


class ParentInput(BaseModel):
    data: str


@workflows.workflow.define(name="parent_workflow")
class ParentWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: ParentInput) -> str:
        child_result = await workflows.execute_workflow(
            ChildWorkflow, params=ChildInput(value=params.data)
        )
        return f"Parent got: {child_result}"


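async def main():
    # Start the parent workflow and wait for its final result.
    result = await workflows.execute_workflow(
        ParentWorkflow,
        params=ParentInput(data="hello"),
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
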
Fire and forget

Start a child workflow without waiting for its result by passing wait=False:

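from datetime import timedelta

import mistralai.workflows as workflows

# Runs inside a workflow entrypoint; child_params is a ChildInput instance.
handle = await workflows.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
)
# The parent continues immediately; the child runs independently.
# Optionally await the result later: result = await handle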

By default, wait=False sets the parent close policy to ABANDON, so the child continues running even if the parent completes.

Parent close policy

Override the default close policy with the parent_close_policy parameter:

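from datetime import timedelta

import mistralai.workflows as workflows
from mistralai.workflows import ParentClosePolicy

# Override the ABANDON default that wait=False would otherwise apply.
handle = await workflows.execute_workflow(
    ChildWorkflow,
    params=child_params,
    execution_timeout=timedelta(hours=1),
    wait=False,
    parent_close_policy=ParentClosePolicy.TERMINATE,
)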
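
To make the difference between the two policies concrete, here is a toy in-memory model. It is not the SDK. It assumes that TERMINATE stops a still-running child when its parent closes, which the name suggests but this page does not spell out. The `ClosePolicy` enum, the `Execution` class, and the `close_parent` function are hypothetical names invented for this sketch.

```python
from dataclasses import dataclass, field
from enum import Enum


class ClosePolicy(Enum):
    """Hypothetical stand-in for the two policies named on this page."""
    ABANDON = "abandon"
    TERMINATE = "terminate"


@dataclass
class Execution:
    name: str
    status: str = "RUNNING"
    # Each child is stored with the policy chosen when it was started.
    children: list[tuple["Execution", ClosePolicy]] = field(default_factory=list)


def close_parent(parent: Execution) -> None:
    """Mark the parent completed and apply each child's close policy."""
    parent.status = "COMPLETED"
    for child, policy in parent.children:
        # Assumption: TERMINATE stops a child that is still running.
        if policy is ClosePolicy.TERMINATE and child.status == "RUNNING":
            child.status = "TERMINATED"
        # ABANDON: leave the child untouched so it keeps running.


parent = Execution("parent")
abandoned = Execution("report")
terminated = Execution("cleanup")
parent.children = [
    (abandoned, ClosePolicy.ABANDON),
    (terminated, ClosePolicy.TERMINATE),
]
close_parent(parent)
```

In this model, `abandoned` is still running after the parent closes, which matches the ABANDON behavior described above for wait=False.
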
Observing child workflow events

Every child workflow gets its own execution ID and event history. Each event carries workflow_context.parent_workflow_exec_id and workflow_context.root_workflow_exec_id, so you can subscribe to a parent and all of its descendants in a single stream by filtering on root_workflow_exec_id.

For a top-level workflow with no parent, parent_workflow_exec_id is null and root_workflow_exec_id is the workflow's own execution ID.

See Consuming Streaming Events > Consuming events from workflow trees for the subscription pattern.