Configuration Reference
Rhythm is configured primarily through a combination of environment variables, an optional configuration file, and programmatic options passed during initialization. The core engine is designed to be embedded, meaning configuration often happens at the boundary between your host application (e.g., Python) and the Rust core.
Environment Variables
The most common way to configure Rhythm in containerized or cloud environments is through environment variables.
| Variable | Type | Description |
| :--- | :--- | :--- |
| DATABASE_URL | String | Required. The PostgreSQL connection string. Format: postgres://user:password@host:port/dbname. |
| RHYTHM_CONFIG_PATH | String | Path to a YAML or TOML configuration file. |
| RHYTHM_LOG | String | Log level filtering (e.g., info, debug, rhythm=debug). Follows tracing-subscriber directives. |
| RHYTHM_AUTO_MIGRATE | Boolean | If true, Rhythm will run database migrations on startup. Defaults to true. |
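The sketch below shows one way a host application might read these variables itself, using only the Rust standard library. It is illustrative, not Rhythm's own loader: EnvSettings, load_env_settings, and parse_bool_flag are hypothetical names, and the accepted boolean spellings are an assumption, since this reference does not say which ones Rhythm recognizes.

```rust
use std::env;

/// Interprets a boolean-like environment value.
/// Assumption: "true"/"1" and "false"/"0" are accepted; Rhythm's own rules may differ.
fn parse_bool_flag(raw: Option<&str>, default: bool) -> bool {
    let normalized = raw.map(|v| v.trim().to_ascii_lowercase());
    match normalized.as_deref() {
        Some("true") | Some("1") => true,
        Some("false") | Some("0") => false,
        // Unset or unrecognized values fall back to the default.
        _ => default,
    }
}

/// Hypothetical container for the variables listed in the table above.
#[derive(Debug)]
struct EnvSettings {
    database_url: String,
    config_path: Option<String>,
    log_filter: Option<String>,
    auto_migrate: bool,
}

/// Reads the documented variables; only DATABASE_URL is required.
fn load_env_settings() -> Result<EnvSettings, String> {
    let database_url = env::var("DATABASE_URL")
        .map_err(|_| "DATABASE_URL is required".to_string())?;
    Ok(EnvSettings {
        database_url,
        config_path: env::var("RHYTHM_CONFIG_PATH").ok(),
        log_filter: env::var("RHYTHM_LOG").ok(),
        // The table documents a default of true for RHYTHM_AUTO_MIGRATE.
        auto_migrate: parse_bool_flag(env::var("RHYTHM_AUTO_MIGRATE").ok().as_deref(), true),
    })
}
```

Keeping the boolean parsing in a pure function makes the default behavior easy to check without touching the process environment.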
Programmatic Configuration
When initializing the Rhythm Application instance, you use the InitOptions struct or the InitBuilder. This is the preferred method for configuring workflows and overriding environment-based defaults.
InitOptions Struct
The InitOptions struct defines the startup behavior of the engine.
| Field | Type | Description |
| :--- | :--- | :--- |
| database_url | Option<String> | Overrides the DATABASE_URL environment variable. |
| config_path | Option<String> | Overrides the default configuration file search path. |
| auto_migrate | bool | Whether to automatically apply SQL migrations. |
| workflows | Vec<WorkflowFile> | A list of workflow scripts to register in the database. |
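To make the override relationship concrete, here is a minimal stand-in for the table above. The struct definitions are local sketches built from the field names on this page, not the crate's actual source, and resolve_database_url is a hypothetical helper illustrating that an explicit option takes precedence over the environment value.

```rust
/// Local stand-in using the WorkflowFile field names that appear on this page.
#[derive(Debug, Clone, PartialEq)]
struct WorkflowFile {
    name: String,
    source: String,
    file_path: String,
}

/// Sketch of the documented fields; the real struct may carry more.
#[derive(Debug, Clone)]
struct InitOptions {
    database_url: Option<String>,
    config_path: Option<String>,
    auto_migrate: bool,
    workflows: Vec<WorkflowFile>,
}

impl Default for InitOptions {
    fn default() -> Self {
        Self {
            database_url: None,
            config_path: None,
            // Matches the documented default of RHYTHM_AUTO_MIGRATE.
            auto_migrate: true,
            workflows: Vec::new(),
        }
    }
}

/// Hypothetical helper: an explicit option wins over the environment value.
fn resolve_database_url(options: &InitOptions, env_value: Option<&str>) -> Option<String> {
    options
        .database_url
        .clone()
        .or_else(|| env_value.map(str::to_string))
}
```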
InitBuilder Usage
The InitBuilder provides a fluent interface for constructing the application context.
use rhythm::{InitBuilder, WorkflowFile};

// Must run inside an async function that returns a Result, since it uses `.await` and `?`.
let app = InitBuilder::new()
    .database_url("postgres://localhost/my_db")
    .auto_migrate(true)
    // Register workflow scripts at startup.
    .workflows(vec![
        WorkflowFile {
            name: "onboard_user".into(),
            source: std::fs::read_to_string("workflows/onboard.flow")?,
            file_path: "workflows/onboard.flow".into(),
        },
    ])
    .init()
    .await?;
Workflow Registration
Workflows in Rhythm are self-versioning and immutable. When you register a WorkflowFile during initialization, the engine performs the following:
- Validation: Parses the .flow source to ensure syntax validity.
- Hashing: Generates a content hash of the source code.
- Idempotent Registration: Checks the database for an existing workflow with the same name and hash. If it doesn't exist, a new version is created.
This ensures that running workflows are never interrupted by code changes; they continue executing against the specific version hash they started with.
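The three registration steps can be sketched with an in-memory map standing in for the database. Everything here is an assumption made for illustration: WorkflowRegistry is a hypothetical type, the empty-source check is a placeholder for real .flow parsing, and std's DefaultHasher stands in for whatever content hash the engine actually uses (a persisted hash would need an algorithm that is stable across builds).

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Stand-in content hash; not suitable for persisted version identifiers.
fn content_hash(source: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    source.hash(&mut hasher);
    hasher.finish()
}

/// In-memory stand-in for the workflow table: (name, hash) -> source.
#[derive(Default)]
struct WorkflowRegistry {
    versions: HashMap<(String, u64), String>,
}

impl WorkflowRegistry {
    /// Returns the version hash and whether a new version was created.
    fn register(&mut self, name: &str, source: &str) -> Result<(u64, bool), String> {
        // Validation placeholder: a real parser would check .flow syntax here.
        if source.trim().is_empty() {
            return Err(format!("workflow '{name}' has empty source"));
        }
        // Hashing: identical source always maps to the same version key.
        let hash = content_hash(source);
        let key = (name.to_string(), hash);
        // Idempotent registration: an existing (name, hash) pair is left untouched.
        if self.versions.contains_key(&key) {
            return Ok((hash, false));
        }
        self.versions.insert(key, source.to_string());
        Ok((hash, true))
    }

    /// Number of distinct versions stored under a workflow name.
    fn version_count(&self, name: &str) -> usize {
        self.versions.keys().filter(|(n, _)| n.as_str() == name).count()
    }
}
```

Because existing entries are never overwritten, an execution that recorded a version hash at start can keep resolving the same source after newer versions are registered.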
Database Configuration
Rhythm requires PostgreSQL. It utilizes sqlx for connection pooling and transaction management.
Connection Pool Settings
Rhythm currently uses the default pool settings, but the underlying PgPool can be configured via query parameters on DATABASE_URL:
- max_connections: The maximum number of connections in the pool (default: 10).
- min_connections: The minimum number of connections to maintain.
- connect_timeout: Maximum time to wait for a connection (in seconds).
Example:
DATABASE_URL=postgres://user:pass@localhost/db?max_connections=20&connect_timeout=5
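If the connection string is assembled in code rather than written by hand, a small helper along these lines could append the pool parameters. with_pool_params is a hypothetical function, and the sketch assumes the base URL has no query string yet and that the values need no percent-encoding.

```rust
/// Appends pool settings as query parameters to a base connection string.
/// Assumes `base_url` has no existing query string and values need no escaping.
fn with_pool_params(base_url: &str, params: &[(&str, u32)]) -> String {
    if params.is_empty() {
        return base_url.to_string();
    }
    // Render each pair as key=value and join them with '&'.
    let query: Vec<String> = params.iter().map(|(k, v)| format!("{k}={v}")).collect();
    format!("{base_url}?{}", query.join("&"))
}
```

Calling it with the base URL and the pairs ("max_connections", 20) and ("connect_timeout", 5) reproduces the example string above.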
Execution Engine Settings
The core engine behavior is governed by the Application services.
Internal Worker
The InternalWorker handles background maintenance, specifically the Scheduler Queue. This process claims items from the scheduled_queue table and moves them to the work_queue when they are ready to run.
To start the internal scheduler processor:
app.start_internal_worker()?;
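Conceptually, each pass of the scheduler processor promotes due items from one queue to the other. The in-memory sketch below illustrates only that idea: ScheduledItem and promote_due are hypothetical, the column names are invented for the example, and the real worker operates on the scheduled_queue and work_queue tables in PostgreSQL rather than on vectors.

```rust
/// Hypothetical row shape; the real scheduled_queue columns are not shown in this reference.
#[derive(Debug, Clone, PartialEq)]
struct ScheduledItem {
    execution_id: String,
    /// Seconds since an arbitrary epoch at which the item becomes runnable.
    run_at: u64,
}

/// Moves every item whose run_at has passed from `scheduled` into `work`.
/// Returns how many items were promoted; relative order is preserved.
fn promote_due(
    scheduled: &mut Vec<ScheduledItem>,
    work: &mut Vec<ScheduledItem>,
    now: u64,
) -> usize {
    let (due, pending): (Vec<_>, Vec<_>) = scheduled
        .drain(..)
        .partition(|item| item.run_at <= now);
    // Items that are not yet due stay in the scheduled queue.
    *scheduled = pending;
    let promoted = due.len();
    work.extend(due);
    promoted
}
```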
Queue Management
Rhythm uses a table-based queue system. Workflows and Tasks are assigned to a queue string. You can configure your workers to poll specific queues to isolate workloads or manage priorities.
- Default Queue: default
- Priority: Higher integers represent higher priority (e.g., 10 runs before 0).
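The selection rule a worker applies when polling can be sketched as follows. QueuedItem and next_item are hypothetical, and the tie-break (lowest id first, as a stand-in for enqueue order) is an assumption; this reference only states that higher priority values run first.

```rust
/// Hypothetical queue row for illustration.
#[derive(Debug, Clone, PartialEq)]
struct QueuedItem {
    id: u32,
    queue: String,
    priority: i32,
}

/// Picks the next item for a worker polling `queues`:
/// highest priority first, then lowest id as an assumed tie-break.
fn next_item<'a>(items: &'a [QueuedItem], queues: &[&str]) -> Option<&'a QueuedItem> {
    items
        .iter()
        // Only consider items on queues this worker is polling.
        .filter(|item| queues.iter().any(|q| *q == item.queue.as_str()))
        // Reversing the id comparison makes the lowest id win among equal priorities.
        .max_by(|a, b| a.priority.cmp(&b.priority).then(b.id.cmp(&a.id)))
}
```

A worker that polls only "default" never sees items on other queues, which is how separate queues isolate workloads.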
Types Reference
ExecutionStatus
Defined in core/src/types.rs, these statuses represent the lifecycle of a durable execution:
- Pending: Enqueued and waiting for a worker.
- Running: Currently being processed by a worker.
- Suspended: Workflow is paused (e.g., awaiting a Task or Signal).
- Completed: Finished successfully; output is persisted.
- Failed: Terminated with an error; error details are persisted.
ExecutionType
- Task: A stateless unit of work executed in the host language (Python, etc.).
- Workflow: A stateful orchestration script written in .flow.
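For orientation, the two types might look roughly like the enums below. This is a sketch based on the variant names listed here, not a copy of core/src/types.rs. The is_terminal helper is hypothetical; it relies on the descriptions of Completed and Failed as finished states.

```rust
/// Lifecycle states, using the variant names listed in this reference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionStatus {
    Pending,
    Running,
    Suspended,
    Completed,
    Failed,
}

impl ExecutionStatus {
    /// Hypothetical helper: true once no further processing is expected.
    fn is_terminal(self) -> bool {
        matches!(self, ExecutionStatus::Completed | ExecutionStatus::Failed)
    }
}

/// Kinds of durable execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionType {
    Task,
    Workflow,
}
```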