Quick Start
Prerequisites
Before getting started, ensure you have:
- Python 3.10+ installed.
- Postgres running. Rhythm uses Postgres as its primary state store and task queue.
The easiest way to start Postgres is via Docker:
```shell
docker run --name rhythm-db -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres
```
1. Install the SDK
Install the Rhythm Python library:
```shell
pip install rhythm-sdk
```
2. Create a Workflow Script
Workflows are written in .flow files using Rhythm's durable scripting syntax. Unlike standard Python, these scripts can pause and resume across restarts without replaying history.
Create a file named onboard.flow:
```js
// onboard.flow
const user = Inputs.user;

// Run a durable task and wait for the result
const result = await Task.run("create-user-record", { name: user.name });

// Pause execution until a signal is received (e.g., email confirmation)
await Signal.when("email-verified", { timeout: "24h" });

await Task.run("send-welcome-email", { email: user.email });

return { status: "completed", userId: result.id };
```
3. Define Your Tasks
Tasks are standard Python functions that perform the actual work (API calls, DB operations, etc.). Use the @task decorator to register them.
Create main.py:
```python
import asyncio

from rhythm import init, task, WorkflowFile

# Define the tasks referenced in your .flow file
@task(name="create-user-record")
async def create_user(inputs):
    print(f"Creating user: {inputs['name']}")
    return {"id": "user_123"}

@task(name="send-welcome-email")
async def send_email(inputs):
    print(f"Sending email to: {inputs['email']}")
    return {"sent": True}

async def main():
    # 1. Initialize Rhythm and register the workflow
    with open("onboard.flow", "r") as f:
        flow_source = f.read()

    app = await init(
        database_url="postgresql://postgres:postgres@localhost:5432/postgres",
        auto_migrate=True,
        workflows=[
            WorkflowFile(name="onboard-user", source=flow_source)
        ],
    )

    # 2. Start the worker to process tasks and workflows
    print("Worker started. Press Ctrl+C to exit.")
    await app.worker().run()

if __name__ == "__main__":
    asyncio.run(main())
```
4. Execute the Workflow
Once your worker is running, you can trigger a workflow instance from any Python client.
```python
from rhythm import Client

async def start_onboarding():
    client = await Client.connect("postgresql://postgres:postgres@localhost:5432/postgres")

    # Start the workflow
    execution_id = await client.executions.create(
        target_name="onboard-user",
        inputs={"user": {"name": "Alice", "email": "alice@example.com"}},
    )
    print(f"Workflow started: {execution_id}")

    # Later, send a signal to resume the paused workflow
    await client.signals.send(
        workflow_id=execution_id,
        signal_name="email-verified",
        payload={},
    )
```
Key Concepts
Durable Scripting
Rhythm workflows use a custom embedded engine. When a workflow hits an await, the entire runtime state (variables, call stack, and instruction pointer) is serialized to Postgres. When the task completes or the signal arrives, the engine restores the state and resumes exactly where it left off.
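The mechanics can be illustrated with a small, self-contained sketch. This is not Rhythm's engine: a real engine snapshots the full call stack and instruction pointer, while this toy models a workflow as explicit numbered steps whose variables are serialized to JSON between pauses, so a fresh process can pick up exactly where a previous one stopped.

```python
import json

# Toy durable workflow: each step mutates a JSON-serializable state dict
# and returns the next step number (None = done).
STEPS = {
    0: lambda v: v.update(user_id="user_123") or 1,    # "create-user-record"
    1: lambda v: 2,                                     # resumes after a signal
    2: lambda v: v.update(status="completed") or None,  # "send-welcome-email"
}

def run_until_pause(snapshot, pause_at):
    """Execute steps until completion or until `pause_at`, then re-serialize."""
    state = json.loads(snapshot)
    step = state["step"]
    while step is not None and step != pause_at:
        step = STEPS[step](state["vars"])
        state["step"] = step
    return json.dumps(state)  # in Rhythm this snapshot lives in Postgres

# First run: execute step 0, then suspend at step 1 awaiting a signal.
snap = run_until_pause(json.dumps({"step": 0, "vars": {}}), pause_at=1)

# "Restart": a brand-new invocation resumes from the stored snapshot.
final = json.loads(run_until_pause(snap, pause_at=None))
print(final["vars"])  # variables written before the pause survive it
```

The key property, mirrored by Rhythm's serialization, is that nothing in the paused workflow lives only in process memory: the snapshot alone is enough to continue.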
Execution Lifecycle
- Pending: Enqueued and waiting for a worker.
- Running: Currently being executed by a worker.
- Suspended: Waiting for an external event (such as Signal.when).
- Completed/Failed: Terminal states.
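These transitions can be sketched as a small state machine (illustrative only; Rhythm enforces the lifecycle server-side):

```python
from enum import Enum

class State(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUSPENDED = "suspended"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal transitions; COMPLETED and FAILED are terminal (no outgoing edges).
TRANSITIONS = {
    State.PENDING: {State.RUNNING},
    State.RUNNING: {State.SUSPENDED, State.COMPLETED, State.FAILED},
    State.SUSPENDED: {State.RUNNING},  # a signal re-enqueues the workflow
}

def advance(current: State, nxt: State) -> State:
    if nxt not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current.value} -> {nxt.value}")
    return nxt

# The onboarding workflow above: run, suspend on the signal, resume, finish.
s = State.PENDING
for nxt in (State.RUNNING, State.SUSPENDED, State.RUNNING, State.COMPLETED):
    s = advance(s, nxt)
print(s.value)  # completed
```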
Configuration
The init function provides an InitBuilder interface for configuring database connections and registering .flow files. In production, you can also use a rhythm.yaml file for more complex configurations.
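As a rough sketch, a rhythm.yaml might mirror the arguments passed to init above. The key names here are assumptions for illustration, not documented configuration options:

```
# rhythm.yaml (illustrative; exact keys are assumptions)
database_url: postgresql://postgres:postgres@localhost:5432/postgres
auto_migrate: true
workflows:
  - name: onboard-user
    file: onboard.flow
```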