# Database & Migrations
Rhythm is designed to have minimal infrastructure overhead, requiring only a PostgreSQL database to function. The database serves as the source of truth for workflow state, task queuing, and script versioning.
## Overview
Rhythm uses Postgres not just as a data store, but as a robust task broker and state coordinator. By leveraging Postgres's transactional guarantees, Rhythm ensures that workflow transitions and task enqueues are atomic.
- Durable State: Workflow execution state is persisted as it hits await points.
- Content-Addressable Workflows: Workflow scripts are stored in the database and versioned by their content hash.
- Reliable Queuing: Rhythm uses a `work_queue` table for immediate tasks and a `scheduled_queue` for delayed execution (e.g., retries or timeouts).
## Configuration
To connect Rhythm to your database, you must provide a standard Postgres connection string. This can be done via environment variables or through the initialization API.
```shell
# Example Connection String
DATABASE_URL=postgres://user:password@localhost:5432/rhythm_db
```
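The connection string follows the standard Postgres URI format, so its components can be inspected with ordinary URL tooling. A quick sketch in Python, using the example string from above (not a real credential):

```python
from urllib.parse import urlparse

# Parse the example connection string into its components.
parts = urlparse("postgres://user:password@localhost:5432/rhythm_db")

print(parts.username)          # user
print(parts.hostname)          # localhost
print(parts.port)              # 5432
print(parts.path.lstrip("/"))  # rhythm_db (the database name)
```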
## Automatic Migrations
Rhythm manages its own schema through an internal migration system. By default, Rhythm will check the database at startup and apply any missing migrations automatically.
In a Python environment, this is typically handled during the `rhythm.init()` call:
```python
import rhythm

# Default behavior: auto_migrate is True
rhythm.init(
    database_url="postgres://localhost/my_app",
    auto_migrate=True,
)
```
In the Rust core, this is controlled via the `InitBuilder`:

```rust
let app = InitBuilder::new()
    .database_url("postgres://localhost/my_app")
    .auto_migrate(true)
    .init()
    .await?;
```
## Manual Migrations
If your production environment forbids applications from performing schema changes at runtime, you can disable `auto_migrate` and run the migrations as part of your CI/CD pipeline.
## Core Schema
While Rhythm abstracts the database interactions, understanding the primary tables can be helpful for monitoring and debugging.
### executions
The central table for all units of work. Every task and workflow instance is an execution.
- Fields: `id`, `type` (task/workflow), `status`, `inputs`, `output`, `parent_workflow_id`.
- Statuses: `pending`, `running`, `suspended`, `completed`, `failed`.
### workflow_definitions
Stores the source code for your `.flow` files.
- Versioning: Workflows are immutable. If you change a script, Rhythm generates a new version based on the content hash.
- Consistency: In-progress workflows are pinned to the specific version hash they started with, ensuring logic doesn't change mid-execution.
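The exact hashing scheme is an internal detail, but the idea of content-addressable versioning is easy to sketch: the version identifier is derived from the script's source text, so identical sources always map to the same version and any edit produces a new one. A minimal illustration in Python (SHA-256 and the toy workflow strings are assumptions for the example, not Rhythm's actual algorithm or syntax):

```python
import hashlib

def content_version(script_source: str) -> str:
    # Derive a version identifier from the script's source text.
    # (SHA-256 is an illustrative choice, not Rhythm's documented hash.)
    return hashlib.sha256(script_source.encode("utf-8")).hexdigest()

v1 = content_version("workflow order: charge -> ship")
v2 = content_version("workflow order: charge -> ship")
v3 = content_version("workflow order: charge -> ship -> notify")

print(v1 == v2)  # True: identical source, identical version
print(v1 == v3)  # False: any edit yields a new version
```

This is what makes pinning safe: an in-progress workflow can keep referring to its original hash even after the script file changes on disk.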
### work_queue
A high-concurrency queue used to distribute tasks to workers.
- Rhythm uses `SELECT ... FOR UPDATE SKIP LOCKED` to allow multiple workers to process the queue safely without contention.
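For reference, here is the general shape of a claim query built on that pattern: each worker locks and claims one pending row, and `SKIP LOCKED` lets concurrent workers pass over rows already locked by someone else instead of blocking. The table name comes from the docs, but the column names (`id`, `status`, `claimed_at`) are assumptions for the sketch, not Rhythm's actual schema:

```python
# Illustrative Postgres claim query using SKIP LOCKED.
# Column names are assumed for the sketch; only the table name
# (work_queue) and the locking pattern come from Rhythm's docs.
CLAIM_TASK_SQL = """
UPDATE work_queue
SET status = 'running', claimed_at = now()
WHERE id = (
    SELECT id
    FROM work_queue
    WHERE status = 'pending'
    ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
"""
```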
### scheduled_queue
Stores items that are intended to run at a future time.
- Used for workflow `sleep` calls, task retries with backoff, and signal timeouts.
- An internal background process (the `InternalWorker`) monitors this table and moves items to the `work_queue` when they are due.
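Conceptually, that sweep is simple: find scheduled items whose due time has passed and move them onto the work queue. A toy sketch in Python, with plain lists standing in for the two tables and a `run_at` field assumed for the example (Rhythm performs this inside Postgres):

```python
import time

def sweep(scheduled_queue, work_queue, now=None):
    # Move every scheduled item whose run_at has passed onto the
    # work queue. Lists stand in for the two Postgres tables here;
    # the run_at field name is an assumption for the sketch.
    now = time.time() if now is None else now
    due = [item for item in scheduled_queue if item["run_at"] <= now]
    for item in due:
        scheduled_queue.remove(item)
        work_queue.append(item)
    return len(due)

scheduled = [{"id": 1, "run_at": 100.0}, {"id": 2, "run_at": 500.0}]
work = []
moved = sweep(scheduled, work, now=200.0)
print(moved)                          # 1 item was due
print([i["id"] for i in work])       # [1]
print([i["id"] for i in scheduled])  # [2] still waiting
```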
### signals
Stores external events sent to workflows. When a workflow is suspended waiting for a signal, inserting a record into this table triggers the workflow to resume.
## Data Types
Rhythm maps internal types to Postgres native types where possible:
- JSONB: Used for `inputs` and `outputs`, allowing for complex, structured data.
- TIMESTAMPTZ: Used for all temporal tracking (`created_at`, `run_at`, etc.) to ensure timezone consistency.
- TEXT: Used for status enums and identifiers.