Dependency Injection

Dependency injection provides a clean way to pass shared resources (database connections, API clients, configuration) into activities without constructing them manually inside each call.

Quick start

Set an activity parameter's default to Depends(provider), and the worker passes the provider's result to that parameter on every call:

import mistralai.workflows as workflows
from mistralai.workflows import Depends

# Database stands in for your own connection class.
async def get_db() -> Database:
    return await Database.connect("postgres://...")

@workflows.activity()
async def insert_user(name: str, db: Database = Depends(get_db)) -> int:
    return await db.insert(name)

The provider (get_db) is called once when the worker starts. Its result is reused across every activity call.
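
The "called once, reused on every call" behaviour can be pictured with a toy resolver. This is only a sketch under stated assumptions: the Depends and Worker classes below are hypothetical stand-ins written for this illustration, not the mistralai.workflows implementation, and the dict-based "database" is invented for the demo.

```python
import asyncio
import inspect


class Depends:
    """Hypothetical marker; a stand-in, NOT mistralai.workflows.Depends."""

    def __init__(self, provider):
        self.provider = provider


class Worker:
    """Toy worker that resolves each provider once and caches the result."""

    def __init__(self):
        self._instances = {}

    async def start(self, *activities):
        # Resolve every Depends(...) default exactly once, at startup.
        for fn in activities:
            for param in inspect.signature(fn).parameters.values():
                marker = param.default
                if isinstance(marker, Depends) and marker.provider not in self._instances:
                    value = marker.provider()
                    if inspect.isawaitable(value):
                        value = await value
                    self._instances[marker.provider] = value

    async def call(self, fn, *args, **kwargs):
        # Fill in the cached instance for parameters the caller did not supply.
        sig = inspect.signature(fn)
        bound = sig.bind_partial(*args, **kwargs)
        for name, param in sig.parameters.items():
            if name not in bound.arguments and isinstance(param.default, Depends):
                bound.arguments[name] = self._instances[param.default.provider]
        return await fn(*bound.args, **bound.kwargs)


async def demo():
    provider_calls = 0

    async def get_db() -> dict:
        nonlocal provider_calls
        provider_calls += 1
        return {"rows": []}  # invented in-memory "database"

    async def insert_user(name: str, db: dict = Depends(get_db)) -> int:
        db["rows"].append(name)
        return len(db["rows"])

    worker = Worker()
    await worker.start(insert_user)
    first = await worker.call(insert_user, "Alice")
    second = await worker.call(insert_user, "Bob")
    return first, second, provider_calls
```

Running asyncio.run(demo()) returns (1, 2, 1): both calls write to the same object, and the provider ran a single time.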

Tip: Dependencies declared with Depends() are initialized once at worker startup, and the same instance is shared across all activity executions. Connection pools, API clients, and parsed config objects are therefore created only once. Make sure the returned value is safe to share across concurrent activities.
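
To see why sharing safety matters, consider a shared object whose state is read and written across an await. The sketch below assumes activities run as coroutines on a single event loop (an assumption; the actual execution model is not described here). SharedCounter and run_many are hypothetical names used only for this illustration.

```python
import asyncio


class SharedCounter:
    """Hypothetical shared dependency whose state is mutated across an await."""

    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def unsafe_increment(self):
        current = self.value
        await asyncio.sleep(0)  # yields control; other tasks may run here
        self.value = current + 1  # may overwrite another task's update

    async def safe_increment(self):
        # The lock makes the read-modify-write sequence atomic between tasks.
        async with self._lock:
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1


async def run_many(method_name: str, n: int = 50) -> int:
    """Run n concurrent increments against one shared instance."""
    counter = SharedCounter()
    await asyncio.gather(*(getattr(counter, method_name)() for _ in range(n)))
    return counter.value
```

With 50 concurrent tasks, run_many("safe_increment") ends at 50, while run_many("unsafe_increment") ends below 50 because updates are lost. If activities run in threads instead, a threading.Lock would play the same role.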

How it works

The system supports multiple provider styles. Pick the one that matches your resource's lifecycle:

  1. Synchronous functions: simple configuration values or in-memory objects
  2. Asynchronous functions: I/O-bound dependencies like database connections
  3. Context managers (sync or async): resources that require cleanup
  4. Generators (sync or async): streaming data or complex resource lifecycles

When an activity declares a dependency, the system invokes the provider, manages its lifecycle (including any cleanup), and injects the resulting value into the parameter whose default is Depends(provider).
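
One way to picture "invoke the provider and handle the lifecycle" is a small helper that inspects what the provider returns. This is a minimal sketch, not the library's code: resolve is a hypothetical function, and it covers only the first three styles (sync functions, async functions, and sync or async context managers). Generator providers are left out because how their yielded values are consumed is not specified here.

```python
import asyncio
import contextlib
import inspect


async def resolve(provider, stack: contextlib.AsyncExitStack):
    """Call a provider and register any cleanup on the stack (hypothetical helper)."""
    value = provider()
    if inspect.isawaitable(value):      # async function provider
        value = await value
    if hasattr(value, "__aenter__"):    # async context manager provider
        return await stack.enter_async_context(value)
    if hasattr(value, "__enter__"):     # sync context manager provider
        return stack.enter_context(value)
    return value                        # plain sync function provider


async def demo():
    events = []

    def get_config():
        return {"timeout": 30}

    async def get_token():
        await asyncio.sleep(0)
        return "token"

    @contextlib.contextmanager
    def get_session():
        events.append("open")
        try:
            yield "session"
        finally:
            events.append("close")

    async with contextlib.AsyncExitStack() as stack:
        values = [await resolve(p, stack) for p in (get_config, get_token, get_session)]
        during = list(events)  # cleanup has not run yet at this point
    return values, during, events
```

In demo(), the session is open while the stack is active (during == ["open"]) and its cleanup runs when the stack exits (events == ["open", "close"]).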

Defining Dependencies

Synchronous Function Provider

For simple configuration values or objects that don't require async initialization:

def get_config() -> dict:
    """Provides application configuration"""
    return {
        "timeout": 30,
        "retries": 3,
        "api_url": "https://api.example.com"
    }

Asynchronous Function Provider

For dependencies that require async initialization, like database connections:

async def get_db_connection() -> DatabaseConnection:
    """Creates and returns a database connection"""
    conn = await DatabaseConnection.create("postgres://user:pass@localhost/db")
    return conn

Context Manager Provider

For resources that need proper cleanup, like sessions that require logout:

from contextlib import contextmanager

@contextmanager
def get_logged_in_session():
    """Provides a session with automatic login/logout"""
    session = Session()
    session.login()
    try:
        yield session
    finally:
        session.logout()
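
The try/finally in the provider above is what guarantees logout even when the code using the session fails. The following plain-Python sketch shows that ordering outside any worker. The Session class here is a hypothetical stand-in that only records calls, and the provider takes the session as an argument so the log can be inspected.

```python
from contextlib import contextmanager


class Session:
    """Hypothetical stand-in that records login/logout calls."""

    def __init__(self):
        self.log = []

    def login(self):
        self.log.append("login")

    def logout(self):
        self.log.append("logout")


@contextmanager
def get_logged_in_session(session: Session):
    session.login()
    try:
        yield session
    finally:
        session.logout()  # runs on normal exit and on exceptions


def use_session_and_fail(session: Session) -> list:
    """Simulate a consumer that raises while holding the session."""
    try:
        with get_logged_in_session(session) as s:
            s.log.append("work")
            raise RuntimeError("activity failed")
    except RuntimeError:
        pass
    return session.log
```

use_session_and_fail(Session()) returns ["login", "work", "logout"], so the cleanup still ran after the error.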

Async Context Manager Provider

For async resources that need cleanup, like database connections:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection_with_cleanup():
    """Provides a database connection with proper cleanup"""
    conn = await DatabaseConnection.create("postgres://...")
    try:
        yield conn
    finally:
        await conn.close()

Generator Provider

For streaming data or complex resource management:

def get_data_stream():
    """Provides a stream of data"""
    with open("data.txt") as f:
        for line in f:
            yield line.strip()

Async Generator Provider

For async streaming data sources:

import aiofiles

async def get_async_data_stream():
    """Provides an async stream of data"""
    async with aiofiles.open("data.txt") as f:
        async for line in f:
            yield line.strip()
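
An async generator produces values lazily: calling the function does not run its body, and each line is produced only when a consumer iterates. The sketch below illustrates this with an in-memory list in place of aiofiles so it runs without extra packages. The lines parameter and the collect helper are hypothetical, and the sketch says nothing about how the worker itself consumes a generator provider.

```python
import asyncio
from typing import AsyncIterator, Iterable, List


async def get_async_data_stream(lines: Iterable[str]) -> AsyncIterator[str]:
    """Yield stripped lines one at a time (in-memory stand-in for a file)."""
    for line in lines:
        await asyncio.sleep(0)  # simulate an I/O wait between lines
        yield line.strip()


async def collect(limit: int) -> List[str]:
    """Consume at most `limit` items, then close the stream."""
    stream = get_async_data_stream(["a\n", " b \n", "c\n"])
    out = []
    async for item in stream:
        out.append(item)
        if len(out) == limit:
            break
    await stream.aclose()  # finalize the generator after stopping early
    return out
```

asyncio.run(collect(2)) returns ["a", "b"]; the third line is never read because iteration stopped early.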

Using Dependencies in Activities

Activities declare their dependencies with the Depends() marker, and the system supplies them when the activity executes. Callers pass only the regular arguments:

import asyncio
import mistralai.workflows as workflows
from mistralai.workflows import Depends
from contextlib import contextmanager
from pydantic import BaseModel


def get_config() -> dict:
    return {"timeout": 30, "retries": 3, "api_url": "https://api.example.com"}


class FakeDB:
    def __init__(self):
        self.records = []

    def insert(self, name: str):
        self.records.append(name)
        return len(self.records)


async def get_db() -> FakeDB:
    return FakeDB()


@contextmanager
def get_session():
    session = {"logged_in": True, "events": []}
    try:
        yield session
    finally:
        session["logged_in"] = False


@workflows.activity()
async def create_user(
    name: str,
    db: FakeDB = Depends(get_db),
    config: dict = Depends(get_config),
    session: dict = Depends(get_session),
) -> dict:
    record_id = db.insert(name)
    session["events"].append("user_created")
    return {
        "status": "success",
        "record_id": record_id,
        "timeout_used": config["timeout"],
    }


class Input(BaseModel):
    name: str


@workflows.workflow.define(name="di_workflow")
class DIWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, params: Input) -> dict:
        return await create_user(params.name)


async def main():
    result = await workflows.execute_workflow(
        DIWorkflow,
        params=Input(name="Alice"),
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Common use cases

The same Depends(provider) pattern works for any shared resource. Two recurring examples:

Database connections: provide an async session that is closed during cleanup:

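from collections.abc import AsyncIterator

# Assumes SQLAlchemy 2.0; create_db_engine and Record are placeholders
# for your own helper and model.
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

async def get_db_session() -> AsyncIterator[AsyncSession]:
    engine = await create_db_engine("postgres://...")
    SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False)
    # Leaving the async with block closes the session, so no extra close() is needed.
    async with SessionLocal() as session:
        yield session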

@workflows.activity()
async def write_record(payload: dict, session: AsyncSession = Depends(get_db_session)) -> int:
    record = Record(**payload)
    session.add(record)
    await session.commit()
    return record.id

API clients: initialize the client once and reuse its persistent connection:

import os
import mistralai.workflows as workflows
from mistralai.workflows import Depends

async def get_payment_client() -> PaymentServiceClient:
    client = PaymentServiceClient(api_key=os.environ["PAYMENT_API_KEY"])
    await client.initialize()
    return client

@workflows.activity()
async def charge(user_id: str, amount: int, client: PaymentServiceClient = Depends(get_payment_client)) -> str:
    return await client.charge(user_id=user_id, amount_cents=amount)