# SDK Overview
The Rhythm SDK provides the interface for interacting with the Rhythm Durable Execution engine. While the core engine is written in Rust for performance and state management, the Python SDK allows you to define task logic, manage workflow lifecycles, and host workers using familiar Pythonic patterns.
## Installation
Rhythm requires a Postgres database (version 13 or newer) for state persistence. Install the Python SDK using pip:

```shell
pip install rhythm
```
## Initialization
Before using Rhythm, you must initialize the Application instance. This setup handles database connections (via a Postgres pool) and registers your workflow definitions.
```python
import rhythm

# Initialize with basic configuration
app = rhythm.init(
    database_url="postgresql://user:password@localhost:5432/rhythm_db",
    auto_migrate=True,
    workflows=[
        rhythm.WorkflowFile(
            name="onboard_user",
            file_path="./workflows/onboard_user.flow",
        )
    ],
)
```
### Initialization Options
| Parameter | Type | Description |
| :--- | :--- | :--- |
| `database_url` | `str` | The Postgres connection string. |
| `auto_migrate` | `bool` | If `True`, Rhythm automatically runs database migrations on startup. |
| `workflows` | `List[WorkflowFile]` | A list of `.flow` files to register and version in the database. |
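The `database_url` is a standard Postgres connection URI. One way to assemble it from environment-style values (the variable names below are illustrative, not required by Rhythm) is to percent-encode the credentials so special characters survive in the URL:

```python
from urllib.parse import quote

# Hypothetical configuration values -- adjust to your deployment.
cfg = {
    "user": "user",
    "password": "p@ss word",  # contains characters that need URL-encoding
    "host": "localhost",
    "port": "5432",
    "database": "rhythm_db",
}

# Percent-encode user and password; '@' and spaces would otherwise
# break the URL's authority section.
database_url = (
    f"postgresql://{quote(cfg['user'], safe='')}:{quote(cfg['password'], safe='')}"
    f"@{cfg['host']}:{cfg['port']}/{cfg['database']}"
)
print(database_url)  # postgresql://user:p%40ss%20word@localhost:5432/rhythm_db
```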
## The Client API
The Client is your primary entry point for managing executions. It provides services to start workflows, query status, and send signals.
### Starting an Execution
You can trigger both workflows (written in `.flow`) and standalone tasks (written in Python) via the client.

```python
from rhythm import client

# Start a workflow
execution_id = client.workflows.run(
    "onboard_user",
    inputs={"user_id": 123},
    queue="default",
)

# Start a standalone task
task_id = client.tasks.run(
    "send_email",
    inputs={"to": "user@example.com", "body": "Welcome!"},
)
```
### Sending Signals
Signals allow you to interact with running workflows by sending external data that the workflow can await.
```python
client.signals.send(
    workflow_id="exec_12345",
    signal_name="manager-approval",
    payload={"approved": True, "approver": "admin"},
)
```
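Conceptually, a signal resolves a pending await inside the workflow: the workflow suspends until the payload arrives. The `.flow` semantics live in the engine, but the pattern can be illustrated as an analogy in plain Python with `asyncio` (this is not Rhythm's API):

```python
import asyncio

async def onboard_user(approval: asyncio.Future) -> dict:
    # The workflow suspends here until a signal delivers a payload.
    payload = await approval
    return {"approved": payload["approved"], "by": payload["approver"]}

async def main() -> dict:
    loop = asyncio.get_running_loop()
    approval = loop.create_future()

    workflow = asyncio.create_task(onboard_user(approval))
    await asyncio.sleep(0)  # let the workflow start and reach its await

    # Analogous to client.signals.send(...): deliver the external data.
    approval.set_result({"approved": True, "approver": "admin"})
    return await workflow

result = asyncio.run(main())
print(result)  # {'approved': True, 'by': 'admin'}
```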
## The Worker
A Worker is a process that listens to a specific queue and executes Python tasks. While workflows are executed by the internal Rust VM, the Python Worker is responsible for running the actual side effects (Tasks) defined in your application code.
### Registering Tasks
Tasks are defined using the `@task` decorator.
```python
from rhythm import task

@task(name="submit-report")
def submit_report(inputs: dict):
    report_id = inputs.get("reportId")
    # ... logic to submit report ...
    return {"status": "success", "id": report_id}
```
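Because the task body is ordinary Python, it can be exercised directly in unit tests. The sketch below uses a no-op stand-in for the decorator so it runs without the SDK installed (whether `@task` wraps the function or returns it unchanged is an SDK implementation detail):

```python
def task(name: str):
    # Stand-in for rhythm.task, so this example is self-contained.
    def decorator(fn):
        return fn
    return decorator

@task(name="submit-report")
def submit_report(inputs: dict) -> dict:
    report_id = inputs.get("reportId")
    # ... logic to submit report ...
    return {"status": "success", "id": report_id}

# The task body can be called like any plain function.
result = submit_report({"reportId": "rep_42"})
print(result)  # {'status': 'success', 'id': 'rep_42'}
```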
### Running the Worker
The worker polls the database for pending work and executes the registered functions.
```python
import rhythm
from rhythm import Worker

def main():
    # Initialize the app first so the worker can reach the database
    rhythm.init(...)

    # Start a worker on the "default" queue
    worker = Worker(queue="default")
    worker.run()

if __name__ == "__main__":
    main()
```
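The polling model can be sketched without the SDK: a loop pulls pending jobs from a queue and dispatches them to registered task functions by name. Here an in-memory deque stands in for the Postgres-backed queue; the real Worker additionally handles leasing, retries, and result persistence:

```python
from collections import deque

# Registered task functions keyed by name (stand-in for the @task registry).
registry = {
    "send_email": lambda inputs: {"sent_to": inputs["to"]},
}

# In-memory stand-in for the "default" queue stored in Postgres.
queue = deque([
    {"task": "send_email", "inputs": {"to": "user@example.com"}},
])

results = []
while queue:  # the real worker polls until stopped, backing off when idle
    job = queue.popleft()
    handler = registry[job["task"]]   # look up the registered function
    results.append(handler(job["inputs"]))

print(results)  # [{'sent_to': 'user@example.com'}]
```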
## Core Models
The SDK uses several key types to represent the state of the system:
### Execution
Represents a single instance of a workflow or task.
| Field | Type | Description |
| :--- | :--- | :--- |
| `id` | `str` | Unique identifier for the execution. |
| `status` | `ExecutionStatus` | One of: `pending`, `running`, `suspended`, `completed`, `failed`. |
| `inputs` | `dict` | The JSON-serializable input data. |
| `output` | `Optional[Any]` | The result of the execution, if completed. |
| `attempt` | `int` | The current retry attempt number. |
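A local sketch of the fields above as a dataclass; the SDK's own model may differ in details such as defaults and extra fields (the starting `attempt` value of 1 is an assumption):

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Execution:
    id: str
    status: str            # one of: pending, running, suspended, completed, failed
    inputs: dict
    output: Optional[Any] = None
    attempt: int = 1       # assumed starting value; check the SDK's convention

record = Execution(
    id="exec_12345",
    status="completed",
    inputs={"user_id": 123},
    output={"ok": True},
)
print(record.status, record.attempt)  # completed 1
```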
### ExecutionType
An Enum used to distinguish between different units of work:
- `TASK`: A discrete unit of Python logic.
- `WORKFLOW`: An orchestrated sequence of tasks defined in a `.flow` file.
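The two variants can be sketched as a Python `Enum` (the string values here are assumed, not confirmed by the SDK):

```python
from enum import Enum

class ExecutionType(Enum):
    TASK = "task"          # a discrete unit of Python logic
    WORKFLOW = "workflow"  # an orchestrated sequence defined in a .flow file

print(ExecutionType.WORKFLOW.value)  # workflow
```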