Workflow scheduling
Run workflows on a recurring cron schedule, with policies for catchup and overlap.
How it works
Workers register their workflow schedules with the platform at startup. The platform fires the workflow at the scheduled time and applies the policies you've defined for catchup and overlap.
Schedule changes require a worker restart to take effect. The worker reads schedule definitions once at startup — changes to cron expressions or policies in your code are not picked up by a running worker.
Defining Schedules
Add schedules to workflows using cron expressions. Schedules use UTC by default — convert from your local timezone before writing the expression.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition
schedule = ScheduleDefinition(
input={"report_type": "daily"},
cron_expressions=["0 0 * * *"] # Daily at midnight UTC
)
@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
@workflows.workflow.entrypoint
async def run(self, report_type: str = "daily") -> None:
# Generate report
passDefining Schedule Policies
Policies control what happens when schedules are missed or overlap:
catchup_window_seconds— If the platform was down or missed scheduled runs, it will retroactively trigger all missed executions within this window. Runs older than the window are skipped.overlap— What to do when a new run is due but the previous one is still running:SKIP— drop the new run. Use this when you only care about the latest data and overlapping runs would do duplicate work (e.g., periodic sync jobs).BUFFER_ONE— queue one pending run, drop further ones until the buffered run starts. Use this when you want to guarantee one follow-up run but don't want a backlog.ALLOW_ALL— start every scheduled run concurrently. Use this only when runs are independent and the workload can absorb the parallelism.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy
# Override default schedule policy
schedule_policy = SchedulePolicy(
catchup_window_seconds=86400, # Allow 1 day of catchup
overlap=ScheduleOverlapPolicy.SKIP, # Skip overlapping executions
)
schedule = ScheduleDefinition(
input={"report_type": "daily"},
cron_expressions=["0 0 * * *"], # Daily at midnight UTC
policy=schedule_policy
)
@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
@workflows.workflow.entrypoint
async def run(self, report_type: str = "daily") -> None:
# Generate report
passManaging schedules
Schedules can be managed in two ways:
- In code (recommended for most cases): declare schedules on the workflow via
@workflows.workflow.define(..., schedules=[...]). Workers register them at startup. Changes to theScheduleDefinitiontake effect after restarting the worker. - Via the REST API:
POST /v1/workflows/schedulesto create,GET /v1/workflows/schedulesto list, andDELETE /v1/workflows/schedules/{schedule_id}to remove. Use the API when you want to manage schedules independently of worker deployments (e.g., one-off ad-hoc schedules, or schedules driven by an external control plane).
Workers must agree on schedule definitions: every worker for a given workflow registers its own copy of the in-code schedules. If two workers run different versions of the same workflow with different ScheduleDefinition configs, the platform sees conflicting registrations and behavior is undefined. Roll out worker updates so all workers converge on the same definition.
Complete Example
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy
# Run every Saturday at 3 AM UTC
backup_schedule = ScheduleDefinition(
input={"retention_days": 30},
cron_expressions=["0 3 * * 6"],
policy=SchedulePolicy(
catchup_window_seconds=604800, # 7 days
overlap=ScheduleOverlapPolicy.SKIP,
)
)
@workflows.workflow.define(name="database_backup_workflow", schedules=[backup_schedule])
class DatabaseBackupWorkflow:
@workflows.workflow.entrypoint
async def run(self, retention_days: int = 30) -> None:
print(f"Starting backup with {retention_days} day retention")
# Backup implementation here
# Start worker with:
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))Notes
- Cron expressions follow standard 5-field syntax (
minute hour day-of-month month day-of-week). - A workflow can have multiple
cron_expressions— they all fire independently. - Each schedule passes its own
inputpayload to the workflow's entrypoint. - Schedule changes require a worker restart to take effect.