Workflow scheduling

Run workflows on a recurring cron schedule, with policies for catchup and overlap.

How it works

Workers register their workflow schedules with the platform at startup. The platform fires the workflow at the scheduled time and applies the policies you've defined for catchup and overlap.

note

Schedule changes require a worker restart to take effect. The worker reads schedule definitions once at startup — changes to cron expressions or policies in your code are not picked up by a running worker.

Defining Schedules

Add schedules to workflows using cron expressions. Schedules use UTC by default — convert from your local timezone before writing the expression.

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"]  # Daily at midnight UTC
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily") -> None:
        # Generate report
        pass
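
The UTC conversion mentioned above can be sketched with the standard library. This is a minimal illustration, not part of mistralai.workflows: the helper name local_time_to_utc_cron is hypothetical, and it only handles daily schedules (a conversion that crosses midnight would also shift the day-of-week field, which this sketch does not cover).

```python
from datetime import date, datetime, time, timezone
from zoneinfo import ZoneInfo


def local_time_to_utc_cron(hour: int, minute: int, tz_name: str, on: date) -> str:
    """Return a daily 5-field cron expression in UTC for a local wall-clock time.

    Hypothetical helper for illustration. `on` pins the date used for the
    conversion, because the UTC offset of many timezones changes with
    daylight saving time.
    """
    local = datetime.combine(on, time(hour, minute), tzinfo=ZoneInfo(tz_name))
    utc = local.astimezone(timezone.utc)
    return f"{utc.minute} {utc.hour} * * *"


# 09:00 in Paris maps to a different UTC hour in winter and in summer
winter = local_time_to_utc_cron(9, 0, "Europe/Paris", date(2024, 1, 15))
summer = local_time_to_utc_cron(9, 0, "Europe/Paris", date(2024, 7, 15))
print(winter)  # 0 8 * * *
print(summer)  # 0 7 * * *
```

Because the cron expression is fixed in UTC, a schedule written this way drifts by an hour in local time whenever the local timezone enters or leaves daylight saving time.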
Defining Schedule Policies

Policies control what happens when schedules are missed or overlap:

  • catchup_window_seconds — If the platform was down or missed scheduled runs, it will retroactively trigger all missed executions within this window. Runs older than the window are skipped.
  • overlap — What to do when a new run is due but the previous one is still running:
    • SKIP — drop the new run. Use this when you only care about the latest data and overlapping runs would do duplicate work (e.g., periodic sync jobs).
    • BUFFER_ONE — queue one pending run, drop further ones until the buffered run starts. Use this when you want to guarantee one follow-up run but don't want a backlog.
    • ALLOW_ALL — start every scheduled run concurrently. Use this only when runs are independent and the workload can absorb the parallelism.
import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Override default schedule policy
schedule_policy = SchedulePolicy(
    catchup_window_seconds=86400,  # Allow 1 day of catchup
    overlap=ScheduleOverlapPolicy.SKIP,  # Skip overlapping executions
)

schedule = ScheduleDefinition(
    input={"report_type": "daily"},
    cron_expressions=["0 0 * * *"],  # Daily at midnight UTC
    policy=schedule_policy
)

@workflows.workflow.define(name="report_workflow", schedules=[schedule])
class ReportWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, report_type: str = "daily") -> None:
        # Generate report
        pass
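
To make the three overlap policies concrete, here is a toy simulation in plain Python. It is a sketch of the behavior described above, not the platform's scheduler: the Overlap enum and simulate function are hypothetical names, time is modeled as plain integers, every run is assumed to take the same duration, and a buffered run is assumed to start the moment the previous run finishes.

```python
from enum import Enum
from typing import Optional


class Overlap(Enum):
    # Illustrative stand-in for the documented overlap policies
    SKIP = "skip"
    BUFFER_ONE = "buffer_one"
    ALLOW_ALL = "allow_all"


def simulate(ticks: list[int], duration: int, policy: Overlap) -> list[tuple[int, int]]:
    """Return (scheduled_time, actual_start_time) for every run that starts."""
    started: list[tuple[int, int]] = []
    busy_until = 0                        # when the last sequential run finishes
    buffered_start: Optional[int] = None  # start time of the waiting run, if any

    for tick in sorted(ticks):
        if policy is Overlap.ALLOW_ALL:
            started.append((tick, tick))  # every run starts on time, concurrently
            continue
        # A buffered run stops being "pending" once its start time has passed
        if buffered_start is not None and tick >= buffered_start:
            buffered_start = None
        if tick >= busy_until:
            # Nothing is running: start immediately
            started.append((tick, tick))
            busy_until = tick + duration
        elif policy is Overlap.BUFFER_ONE and buffered_start is None:
            # Queue exactly one run; it starts when the current run ends
            started.append((tick, busy_until))
            buffered_start = busy_until
            busy_until += duration
        # Otherwise the run is dropped (SKIP, or the buffer is already full)
    return started


ticks = [0, 10, 20, 30]  # a run is due every 10 time units, each takes 25
print(simulate(ticks, 25, Overlap.SKIP))        # [(0, 0), (30, 30)]
print(simulate(ticks, 25, Overlap.BUFFER_ONE))  # [(0, 0), (10, 25), (30, 50)]
print(simulate(ticks, 25, Overlap.ALLOW_ALL))   # [(0, 0), (10, 10), (20, 20), (30, 30)]
```

With BUFFER_ONE, the run due at 20 is dropped because the run due at 10 is still waiting, while the run due at 30 is buffered because the earlier buffered run has already started by then.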
Managing schedules

Schedules can be managed in two ways:

  • In code (recommended for most cases): declare schedules on the workflow via @workflows.workflow.define(..., schedules=[...]). Workers register them at startup. Changes to the ScheduleDefinition take effect after restarting the worker.
  • Via the REST API: POST /v1/workflows/schedules to create, GET /v1/workflows/schedules to list, and DELETE /v1/workflows/schedules/{schedule_id} to remove. Use the API when you want to manage schedules independently of worker deployments (e.g., one-off ad-hoc schedules, or schedules driven by an external control plane).
warning

Workers must agree on schedule definitions: every worker for a given workflow registers its own copy of the in-code schedules. If two workers run different versions of the same workflow with different ScheduleDefinition configs, the platform sees conflicting registrations and behavior is undefined. Roll out worker updates so all workers converge on the same definition.
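
The REST endpoints listed above can be wrapped in a small request builder. This sketch only constructs the requests and does not send them. Several details are assumptions that this page does not confirm: the base URL is a placeholder, the Bearer authorization header is an assumed auth scheme, and the body of the POST request is left as a caller-supplied dict because its schema is not described here. The build_request helper is a hypothetical name.

```python
import json
from typing import Optional
from urllib.request import Request

SCHEDULES_PATH = "/v1/workflows/schedules"  # path taken from the list above


def build_request(base_url: str, api_key: str, method: str,
                  schedule_id: Optional[str] = None,
                  payload: Optional[dict] = None) -> Request:
    """Build (but do not send) a request against the schedules endpoints."""
    url = base_url.rstrip("/") + SCHEDULES_PATH
    if schedule_id is not None:
        url += f"/{schedule_id}"
    headers = {"Authorization": f"Bearer {api_key}"}  # assumed auth scheme
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return Request(url, data=data, headers=headers, method=method)


BASE_URL = "https://api.example.com"  # placeholder, replace with your API host

# The POST body schema is not specified on this page; {} is a stand-in
create = build_request(BASE_URL, "YOUR_API_KEY", "POST", payload={})
listing = build_request(BASE_URL, "YOUR_API_KEY", "GET")
delete = build_request(BASE_URL, "YOUR_API_KEY", "DELETE", schedule_id="SCHEDULE_ID")

for req in (create, listing, delete):
    print(req.get_method(), req.full_url)
```

Passing one of these objects to urllib.request.urlopen would send it. Keeping construction separate from sending makes the calls easy to inspect before pointing them at a real deployment.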

Complete Example

import mistralai.workflows as workflows
from mistralai.workflows.models import ScheduleDefinition, SchedulePolicy, ScheduleOverlapPolicy

# Run every Saturday at 3 AM UTC
backup_schedule = ScheduleDefinition(
    input={"retention_days": 30},
    cron_expressions=["0 3 * * 6"],
    policy=SchedulePolicy(
        catchup_window_seconds=604800,  # 7 days
        overlap=ScheduleOverlapPolicy.SKIP,
    )
)

@workflows.workflow.define(name="database_backup_workflow", schedules=[backup_schedule])
class DatabaseBackupWorkflow:
    @workflows.workflow.entrypoint
    async def run(self, retention_days: int = 30) -> None:
        print(f"Starting backup with {retention_days} day retention")
        # Backup implementation here

# Start the worker with (requires `import asyncio`):
# asyncio.run(workflows.run_worker([DatabaseBackupWorkflow]))
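
To see what a 7-day catchup window means for this weekly schedule, the following plain-Python sketch filters a list of missed run times against the window. It is an illustration of the arithmetic only, not the platform's implementation: runs_to_catch_up is a hypothetical helper, and treating the window boundary as inclusive is an assumption.

```python
from datetime import datetime, timedelta, timezone


def runs_to_catch_up(missed: list[datetime], now: datetime,
                     catchup_window_seconds: int) -> list[datetime]:
    """Return the missed scheduled times that still fall inside the window."""
    cutoff = now - timedelta(seconds=catchup_window_seconds)
    # Boundary handling (inclusive here) is an assumption of this sketch
    return [t for t in missed if cutoff <= t <= now]


# Three Saturday 03:00 UTC runs missed during a long outage
missed = [datetime(2024, 6, day, 3, 0, tzinfo=timezone.utc) for day in (1, 8, 15)]
now = datetime(2024, 6, 17, 12, 0, tzinfo=timezone.utc)  # the following Monday

caught = runs_to_catch_up(missed, now, catchup_window_seconds=604800)
print([t.date().isoformat() for t in caught])  # ['2024-06-15']
```

Only the most recent Saturday falls inside the window, so the two older runs are skipped, matching the rule that runs older than the window are not triggered.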
Notes

  • Cron expressions follow standard 5-field syntax (minute hour day-of-month month day-of-week).
  • A schedule can list multiple cron_expressions, and a workflow can declare multiple schedules. Each expression fires independently.
  • Each schedule passes its own input payload to the workflow's entrypoint.
  • Schedule changes require a worker restart to take effect.
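
As a quick check of the 5-field layout noted above, this small sketch labels each field of an expression and rejects anything that does not have exactly five. The split_cron helper is a hypothetical name and validates only the field count, not the values inside each field.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression: str) -> dict[str, str]:
    """Map each field of a 5-field cron expression to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


print(split_cron("0 3 * * 6"))
# {'minute': '0', 'hour': '3', 'day_of_month': '*', 'month': '*', 'day_of_week': '6'}
```

Running a check like this before deploying a worker can catch a 6-field expression (with a seconds field) copied from another scheduler.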