Client API
The Client API is the primary interface for your application code to interact with the Rhythm engine. It allows you to trigger workflows, schedule tasks for the future, send signals to running processes, and query the status of executions.
Initializing the Client
Before interacting with Rhythm, you must initialize the application. This process handles database connections, runs migrations (if enabled), and registers your .flow workflow definitions.
Rhythm uses a builder pattern for initialization to allow flexible configuration.
from rhythm import InitBuilder, WorkflowFile

# Initialize Rhythm
rhythm = await (
    InitBuilder()
    .database_url("postgres://user:pass@localhost/db")
    .auto_migrate(True)
    .workflows([
        WorkflowFile(
            name="onboard_user",
            source=open("./workflows/onboard.flow").read(),
            file_path="./workflows/onboard.flow"
        )
    ])
    .init()
)
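When a project contains many workflow files, listing each one by hand becomes tedious. The sketch below collects every .flow file in a directory into the name, source, and file_path values that WorkflowFile takes in the example above. Note that discover_workflows is a hypothetical helper, not part of Rhythm, and deriving the workflow name from the file stem is an assumption made for illustration.

```python
from pathlib import Path


def discover_workflows(directory: str) -> list[dict[str, str]]:
    """Collect WorkflowFile keyword arguments for every .flow file.

    Hypothetical helper: the workflow name is assumed to be the file stem.
    """
    specs = []
    # Sort the paths so the registration order is deterministic.
    for path in sorted(Path(directory).glob("*.flow")):
        specs.append({
            "name": path.stem,
            "source": path.read_text(encoding="utf-8"),
            "file_path": str(path),
        })
    return specs
```

Each entry could then be expanded as WorkflowFile(**spec) and the resulting list passed to .workflows(...).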
Starting Executions
Executions can be started immediately or scheduled for a specific time in the future.
Immediate Execution
To start a workflow or task immediately, use the execution_service. This enqueues the work for the next available worker.
execution_id = await rhythm.execution_service.create_execution({
    "exec_type": "workflow",  # or "task"
    "target_name": "onboard_user",
    "queue": "default",
    "inputs": {"userId": "123", "plan": "pro"}
})
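Because the request is a plain dictionary, a misspelled exec_type or a non-serializable input may not surface until the call reaches the engine. A small wrapper can catch such mistakes earlier. This is a minimal sketch: build_execution_request is a hypothetical helper, and the accepted exec_type values are limited to the two shown in the example above.

```python
import json
from typing import Any

# The two values shown in the example above.
VALID_EXEC_TYPES = {"workflow", "task"}


def build_execution_request(
    exec_type: str,
    target_name: str,
    inputs: dict[str, Any],
    queue: str = "default",
) -> dict[str, Any]:
    """Hypothetical helper that assembles the dict passed to create_execution."""
    if exec_type not in VALID_EXEC_TYPES:
        raise ValueError(
            f"exec_type must be one of {sorted(VALID_EXEC_TYPES)}, got {exec_type!r}"
        )
    if not target_name:
        raise ValueError("target_name must be a non-empty string")
    # Raises TypeError early if the inputs cannot be serialized as JSON.
    json.dumps(inputs)
    return {
        "exec_type": exec_type,
        "target_name": target_name,
        "queue": queue,
        "inputs": inputs,
    }
```

The returned dictionary has the same shape as the literal in the example, so it could be passed to create_execution unchanged.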
Scheduled Execution
If you need an execution to start at a specific timestamp, use the scheduler_service.
from datetime import datetime, timedelta

run_at = datetime.utcnow() + timedelta(hours=24)

execution_id = await rhythm.scheduler_service.schedule_execution({
    "exec_type": "workflow",
    "target_name": "daily_report",
    "queue": "reports",
    "inputs": {"orgId": "abc"},
    "run_at": run_at
})
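Schedules are often expressed as "the next time the clock reads HH:MM" rather than as a fixed offset from now. The sketch below computes that timestamp in UTC. Here next_run_at is a hypothetical helper, and it returns a timezone-aware datetime; whether scheduler_service expects aware or naive values is an assumption to verify. The example above uses a naive UTC value, which can be obtained from the result with .replace(tzinfo=None).

```python
from datetime import datetime, time, timedelta, timezone
from typing import Optional


def next_run_at(at: time, now: Optional[datetime] = None) -> datetime:
    """Return the next UTC datetime whose time of day equals `at`.

    Hypothetical helper. `now` must be timezone-aware; it is injectable
    so the function can be tested with a fixed clock.
    """
    now = now or datetime.now(timezone.utc)
    candidate = datetime.combine(now.date(), at, tzinfo=timezone.utc)
    # If today's slot has already passed (or is exactly now), use tomorrow's.
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate
```

For a report that should run at 09:00 UTC, the "run_at" value would be next_run_at(time(9, 0)).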
Signaling Workflows
Signals allow external systems to send data into a running workflow. When a workflow awaits Signal.when("name"), it suspends until the Client API sends a signal with a matching name.
await rhythm.signal_service.send_signal(
    workflow_id="exec_abc123",
    signal_name="manager-approval",
    payload={"approved": True, "approver": "admin"},
    queue="default"
)
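The suspend-until-signal behavior can be pictured with a toy in-memory model built on asyncio futures. This is an illustration of the semantics only, not how Rhythm implements signals: ToySignalBus, approval_workflow, and demo are all hypothetical names, and nothing here touches a database or a worker.

```python
import asyncio


class ToySignalBus:
    """In-memory stand-in for signal delivery (illustration only)."""

    def __init__(self) -> None:
        self._futures: dict[str, asyncio.Future] = {}

    def _future_for(self, name: str) -> asyncio.Future:
        # Create the future lazily so waiting and sending can happen in either order.
        if name not in self._futures:
            self._futures[name] = asyncio.get_running_loop().create_future()
        return self._futures[name]

    def when(self, name: str) -> asyncio.Future:
        # Workflow side: awaiting this future suspends until the signal arrives.
        return self._future_for(name)

    def send(self, name: str, payload: dict) -> None:
        # Client side: resolve the matching future with the payload.
        future = self._future_for(name)
        if not future.done():
            future.set_result(payload)


async def approval_workflow(bus: ToySignalBus) -> str:
    decision = await bus.when("manager-approval")  # suspends here
    return "approved" if decision["approved"] else "rejected"


async def demo() -> str:
    bus = ToySignalBus()
    workflow = asyncio.create_task(approval_workflow(bus))
    await asyncio.sleep(0)  # let the workflow run until it suspends
    bus.send("manager-approval", {"approved": True, "approver": "admin"})
    return await workflow
```

Running asyncio.run(demo()) resumes the toy workflow with the payload and returns its decision.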
Managing Executions
The Client API provides methods to inspect the state and output of executions.
Retrieving a Single Execution
Get the full state of a specific execution, including its status and final output.
execution = await rhythm.execution_service.get_execution("exec_abc123")

if execution.status == "completed":
    print(f"Result: {execution.output}")
elif execution.status == "failed":
    print(f"Error: {execution.output}")
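A single lookup only reports the state at that moment. To wait for a result, one option is to poll until the status is completed or failed. The sketch below takes the lookup function as a parameter so it stays independent of the client object. Here wait_for_execution, poll_interval, and timeout are hypothetical names, and the status is compared against the same strings used in the example above.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Statuses after which polling stops, as used in the example above.
TERMINAL_STATUSES = {"completed", "failed"}


async def wait_for_execution(
    get_execution: Callable[[str], Awaitable[Any]],
    execution_id: str,
    poll_interval: float = 1.0,
    timeout: float = 60.0,
) -> Any:
    """Poll until the execution is completed or failed, else raise TimeoutError."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        execution = await get_execution(execution_id)
        if execution.status in TERMINAL_STATUSES:
            return execution
        if loop.time() >= deadline:
            raise TimeoutError(
                f"{execution_id} still {execution.status} after {timeout}s"
            )
        await asyncio.sleep(poll_interval)
```

With the client from the initialization example, a call might look like: execution = await wait_for_execution(rhythm.execution_service.get_execution, "exec_abc123").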
Querying Executions
You can filter executions based on their target name, status, or parent workflow.
from rhythm import ExecutionFilters, ExecutionStatus

filters = ExecutionFilters(
    target_name="onboard_user",
    status=ExecutionStatus.RUNNING,
    limit=10
)

active_onboardings = await rhythm.execution_service.query_executions(filters)
Data Models
ExecutionStatus
Represents the lifecycle stage of a task or workflow:
pending: Enqueued and waiting for a worker.
running: Currently being processed by a worker.
suspended: Paused (e.g., waiting for a signal or timer).
completed: Successfully finished with an output.
failed: Terminated due to an error.
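The five statuses above can be mirrored in application code as a string-valued enum, which keeps comparisons against plain strings working. This is an illustrative sketch, not Rhythm's own ExecutionStatus class: ExecutionStatusSketch and is_terminal are hypothetical names, and treating completed and failed as the only final states is an assumption based on their descriptions.

```python
from enum import Enum


class ExecutionStatusSketch(str, Enum):
    """Illustrative mirror of the statuses listed above."""

    PENDING = "pending"
    RUNNING = "running"
    SUSPENDED = "suspended"
    COMPLETED = "completed"
    FAILED = "failed"


def is_terminal(status: ExecutionStatusSketch) -> bool:
    """True for the statuses assumed to be final (completed or failed)."""
    return status in (ExecutionStatusSketch.COMPLETED, ExecutionStatusSketch.FAILED)
```

Because the enum subclasses str, ExecutionStatusSketch.COMPLETED == "completed" evaluates to True.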
Execution Object
The standard return object for execution queries contains:
id: Unique identifier for the execution.
type: Either workflow or task.
target_name: The name of the workflow or task function.
status: Current ExecutionStatus.
inputs: The JSON payload provided at start.
output: The JSON result (if completed) or error details (if failed).
created_at: UTC timestamp of creation.
completed_at: UTC timestamp of completion (optional).
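For type checking or tests, the fields above can be captured in a small dataclass. This is a sketch of the shape only: ExecutionRecord is a hypothetical name, the field types are assumptions inferred from the descriptions, and the duration property is an added convenience rather than something Rhythm is known to provide.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class ExecutionRecord:
    """Illustrative shape of an execution; field types are assumptions."""

    id: str
    type: str  # "workflow" or "task"
    target_name: str
    status: str
    inputs: Any
    created_at: datetime
    output: Any = None
    completed_at: Optional[datetime] = None

    @property
    def duration(self) -> Optional[timedelta]:
        """Time from creation to completion, or None if not yet completed."""
        if self.completed_at is None:
            return None
        return self.completed_at - self.created_at


# Example record with made-up values.
example = ExecutionRecord(
    id="exec_abc123",
    type="workflow",
    target_name="onboard_user",
    status="completed",
    inputs={"userId": "123", "plan": "pro"},
    created_at=datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    output={"ok": True},
    completed_at=datetime(2024, 1, 1, 12, 0, 30, tzinfo=timezone.utc),
)
```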