Dependency Injection
Dependency injection provides a clean way to pass shared resources (database connections, API clients, configuration) into activities without constructing them manually inside each call.
Quick start
Mark an activity parameter with Depends(provider) and the worker will pass the provider's result on every call:
import mistralai.workflows as workflows
from mistralai.workflows import Depends
async def get_db() -> Database:
return await Database.connect("postgres://...")
@workflows.activity()
async def insert_user(name: str, db: Database = Depends(get_db)) -> int:
return await db.insert(name)The provider (get_db) is called once when the worker starts. Its result is reused across every activity call.
Provider lifetime: dependencies declared with Depends() are initialized once at worker startup and shared across all activity executions. The same instance is reused for every call, so connection pools, API clients, and parsed config objects are created once. Make sure the value returned is safe to share across concurrent activities.
How it works
The system supports multiple provider styles. Pick the one that matches your resource's lifecycle:
- Synchronous functions: simple configuration values or in-memory objects
- Asynchronous functions: I/O-bound dependencies like database connections
- Context managers (sync or async): resources that require cleanup
- Generators (sync or async): streaming data or complex resource lifecycles
When you declare a dependency in your activity, the system invokes the provider, handles the lifecycle, and passes the value as the parameter default.
Defining Dependencies
Synchronous Function Provider
For simple configuration values or objects that don't require async initialization:
def get_config() -> dict:
"""Provides application configuration"""
return {
"timeout": 30,
"retries": 3,
"api_url": "https://api.example.com"
}Asynchronous Function Provider
For dependencies that require async initialization, like database connections:
async def get_db_connection() -> DatabaseConnection:
"""Creates and returns a database connection"""
conn = await DatabaseConnection.create("postgres://user:pass@localhost/db")
return connContext Manager Provider
For resources that need proper cleanup, like sessions that require logout:
from contextlib import contextmanager
@contextmanager
def get_logged_in_session():
"""Provides a session with automatic login/logout"""
session = Session()
session.login()
try:
yield session
finally:
session.logout()Async Context Manager Provider
For async resources that need cleanup, like database connections:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_db_connection_with_cleanup():
"""Provides a database connection with proper cleanup"""
conn = await DatabaseConnection.create("postgres://...")
try:
yield conn
finally:
await conn.close()Generator Provider
For streaming data or complex resource management:
def get_data_stream():
"""Provides a stream of data"""
with open("data.txt") as f:
for line in f:
yield line.strip()Async Generator Provider
For async streaming data sources:
import aiofiles
async def get_async_data_stream():
"""Provides an async stream of data"""
async with aiofiles.open("data.txt") as f:
async for line in f:
yield line.strip()Using Dependencies in Activities
Activities can declare their dependencies using the Depends() marker. The system will automatically provide these dependencies when the activity executes:
import asyncio
import mistralai.workflows as workflows
from mistralai.workflows import Depends
from contextlib import contextmanager
from pydantic import BaseModel
def get_config() -> dict:
return {"timeout": 30, "retries": 3, "api_url": "https://api.example.com"}
class FakeDB:
def __init__(self):
self.records = []
def insert(self, name: str):
self.records.append(name)
return len(self.records)
async def get_db() -> FakeDB:
return FakeDB()
@contextmanager
def get_session():
session = {"logged_in": True, "events": []}
try:
yield session
finally:
session["logged_in"] = False
@workflows.activity()
async def create_user(
name: str,
db: FakeDB = Depends(get_db),
config: dict = Depends(get_config),
session: dict = Depends(get_session),
) -> dict:
record_id = db.insert(name)
session["events"].append("user_created")
return {
"status": "success",
"record_id": record_id,
"timeout_used": config["timeout"],
}
class Input(BaseModel):
name: str
@workflows.workflow.define(name="di_workflow")
class DIWorkflow:
@workflows.workflow.entrypoint
async def run(self, params: Input) -> dict:
return await create_user(params.name)
async def main():
result = await workflows.execute_workflow(
DIWorkflow,
params=Input(name="Alice"),
)
print(result)Common use cases
The same Depends(provider) pattern works for any shared resource. Two recurring examples:
Database connections — provide an async session factory with cleanup:
async def get_db_session() -> AsyncIterator[AsyncSession]:
engine = await create_db_engine("postgres://...")
SessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False)
async with SessionLocal() as session:
try:
yield session
finally:
await session.close()
@workflows.activity()
async def write_record(payload: dict, session: AsyncSession = Depends(get_db_session)) -> int:
record = Record(**payload)
session.add(record)
await session.commit()
return record.idAPI clients — initialize once, reuse the persistent connection:
import os
import mistralai.workflows as workflows
from mistralai.workflows import Depends
async def get_payment_client() -> PaymentServiceClient:
client = PaymentServiceClient(api_key=os.environ["PAYMENT_API_KEY"])
await client.initialize()
return client
@workflows.activity()
async def charge(user_id: str, amount: int, client: PaymentServiceClient = Depends(get_payment_client)) -> str:
return await client.charge(user_id=user_id, amount_cents=amount)