Scheduling & Timers
Rhythm provides first-class support for time-based operations, allowing you to schedule tasks for future execution or pause workflows for specific durations. Because Rhythm is a durable execution engine, these timers persist across process restarts and system failures.
Scheduling Executions
You can schedule both Tasks and Workflows to start at a specific time in the future. When an execution is scheduled, it is persisted to the database in a Pending state and moved to the active work queue only when the specified time is reached.
The ScheduleExecutionParams Interface
When scheduling via the client, you use the ScheduleExecutionParams structure:
| Parameter | Type | Description |
| :--- | :--- | :--- |
| exec_type | ExecutionType | Either Task or Workflow. |
| target_name | String | The name of the registered task or .flow file. |
| queue | String | The queue name for the worker to pick up the execution. |
| inputs | JsonValue | Data to pass into the execution. |
| run_at | DateTime | The timestamp when the execution should become active. |
Example: Scheduling a Task (Python)
from datetime import datetime, timedelta
from rhythm import Client, ExecutionType
client = Client()
# Schedule a "send-reminder" task to run in 1 hour
run_at = datetime.utcnow() + timedelta(hours=1)
client.schedule(
exec_type=ExecutionType.TASK,
target_name="send-reminder",
queue="default",
inputs={"userId": "user_123"},
run_at=run_at
)
Timers in Workflows
In Rhythm workflows (.flow files), you can use time-based logic to control the flow of execution. These timers are durable; the workflow will suspend, its state will be saved to the database, and the scheduler will wake it up when the timer expires.
Signal Timeouts
The primary way to handle delays and expirations within a workflow is through Signal.when. This allows a workflow to wait for an external event while providing a fallback if the event does not occur within a specified timeframe.
// onboard_user.flow
try {
// Wait for the 'manager-approval' signal, but timeout after 24 hours
let approval = await Signal.when("manager-approval", { timeout: "24h" })
await Task.run("provision-account", { approvedBy: approval.userId })
} catch (err) {
if (err.type === "TimeoutError") {
// Handle expiration logic
await Task.run("notify-stale-application", { id: Inputs.id })
}
}
Duration Formats
Timeouts and delays accept human-readable duration strings:
30s(30 seconds)15m(15 minutes)2h(2 hours)7d(7 days)
The Internal Worker
For scheduled items to be processed, at least one instance of the Rhythm Internal Worker must be running. While the standard worker handles the execution of tasks and workflows, the Internal Worker is responsible for:
- Monitoring the
scheduled_queuein Postgres. - Identifying items where
run_athas passed. - Moving those items into the active
work_queuefor standard workers to pick up.
Starting the Scheduler
In your host application, ensure you call start_internal_worker() on the Rhythm application instance. This is typically done in the same process that hosts your task workers.
import rhythm
app = rhythm.init(...)
# Start the background thread that processes the scheduled queue
app.start_internal_worker()
# Start the standard worker to process tasks
app.worker().run()
Failure and Retry Behavior
- Persistence: Scheduled executions are stored in the database. If the scheduler or worker crashes, no timers are lost.
- Precision: Rhythm's scheduler is designed for workflow-level precision (seconds to minutes). It is not intended for high-frequency real-time scheduling (milliseconds).
- Missed Windows: If the Internal Worker is offline when a scheduled execution is set to run, it will be processed immediately once the Internal Worker resumes.