Worker Deployment
Workers in Rhythm are responsible for two primary tasks: executing your application's task logic and processing the internal scheduling queue. Because Rhythm is backed by Postgres, workers are stateless and can be scaled horizontally to meet demand.
Worker Architecture
Rhythm uses a cooperative worker model. The Rust core manages the workflow state machine and task queueing, while your application code (e.g., Python) executes the actual task logic.
- Internal Worker: Processes the `scheduled_queue`. It moves delayed tasks and timed-out workflows into the active `work_queue`.
- Task Worker: Polls the `work_queue`, executes the task code defined in your application, and reports the result back to the database.
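The division of labor between the two roles can be sketched with toy in-memory queues (the real store is Postgres, and the function names below are illustrative, not Rhythm APIs):

```python
import heapq

# Toy in-memory sketch of the two worker roles. In Rhythm both queues
# live in Postgres; these names and functions are illustrative only.
scheduled_queue = []   # heap of (run_at, task) pairs
work_queue = []        # tasks ready for immediate execution

def internal_worker_tick(now):
    """Internal Worker: move tasks whose run_at has passed onto the work queue."""
    while scheduled_queue and scheduled_queue[0][0] <= now:
        _, task = heapq.heappop(scheduled_queue)
        work_queue.append(task)

def task_worker_tick(handlers):
    """Task Worker: execute one ready task and return its result."""
    if not work_queue:
        return None
    task = work_queue.pop(0)
    return handlers[task["name"]](task["payload"])

# A task scheduled to run at t=5.0, observed at t=10.0
heapq.heappush(scheduled_queue, (5.0, {"name": "email", "payload": {"to": "a@b.c"}}))
handlers = {"email": lambda p: f"sent to {p['to']}"}

internal_worker_tick(now=10.0)       # run_at 5.0 <= 10.0: task becomes ready
result = task_worker_tick(handlers)
print(result)  # -> sent to a@b.c
```

The key property is that the Internal Worker only moves state between queues; task code runs exclusively in the Task Worker.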
Initializing a Worker
When starting a process intended to act as a worker, you must initialize the Rhythm application and explicitly start the internal worker if that process should also handle scheduling.
Python Example
```python
import asyncio

from rhythm import Rhythm, Worker

async def main():
    # Initialize the Rhythm client/app
    r = await Rhythm.init(
        database_url="postgresql://user:pass@localhost/db",
        auto_migrate=True,
    )

    # 1. Start the internal scheduler (usually one per cluster or on all workers)
    await r.start_internal_worker()

    # 2. Start the task worker to process a specific queue
    worker = Worker(r, queue="default")

    # Register your tasks
    @worker.task("submit-report")
    async def submit_report(payload):
        print(f"Processing report: {payload['reportId']}")
        return {"status": "success"}

    # Start polling for work
    await worker.start()

if __name__ == "__main__":
    asyncio.run(main())
```
Scaling Workers
Scaling in Rhythm is handled at the database level using SKIP LOCKED queries. This ensures that multiple workers can poll the same queue without duplicate delivery.
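The guarantee that matters is that concurrent workers never claim the same row twice. The SQL shape of such a claim is sketched in the comment below (Rhythm's actual query is not shown in these docs); the toy Python simulation then demonstrates the no-duplicate property:

```python
import threading

# Illustrative only -- the general shape of a SKIP LOCKED claim in Postgres
# (not Rhythm's actual SQL):
#
#   SELECT id FROM work_queue
#   WHERE status = 'queued'
#   FOR UPDATE SKIP LOCKED
#   LIMIT 1;
#
# Below, a toy simulation of the guarantee: concurrent workers
# never claim the same row twice.
rows = [{"id": i, "status": "queued"} for i in range(100)]
table_lock = threading.Lock()

def claim_one(worker_id):
    with table_lock:                  # Postgres locks per-row; one coarse
        for row in rows:              # lock keeps the toy version simple
            if row["status"] == "queued":
                row["status"] = "running"
                row["claimed_by"] = worker_id
                return row
    return None

claimed_ids = []

def worker_loop(worker_id):
    while (row := claim_one(worker_id)) is not None:
        claimed_ids.append(row["id"])   # list.append is atomic in CPython

threads = [threading.Thread(target=worker_loop, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(claimed_ids) == list(range(100)))  # -> True: each row claimed once
```

Because skipped rows are simply invisible to other claimants, adding workers increases throughput without any coordination layer beyond the database itself.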
- Horizontal Scaling: You can spin up any number of worker instances across different nodes.
- Queue Partitioning: You can direct workers to specific queues using the `queue` parameter during initialization. This allows you to isolate heavy workloads or provide different resource limits (e.g., a `high-cpu` queue).
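Partitioning means a worker only ever sees tasks enqueued under its own queue name, which a minimal in-memory sketch makes concrete (illustrative only; `enqueue` and `drain` are not Rhythm APIs):

```python
from collections import defaultdict

# Toy sketch of queue partitioning: tasks are routed by queue name,
# and a worker bound to one queue drains only that queue.
queues = defaultdict(list)

def enqueue(queue, task):
    queues[queue].append(task)

def drain(queue):
    """What a worker bound to `queue` would see: only its own tasks."""
    done = []
    while queues[queue]:
        done.append(queues[queue].pop(0))
    return done

enqueue("default", "resize-thumbnail")
enqueue("high-cpu", "train-model")
enqueue("high-cpu", "encode-video")

heavy = drain("high-cpu")
light = drain("default")
print(heavy)  # -> ['train-model', 'encode-video']
print(light)  # -> ['resize-thumbnail']
```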
Graceful Shutdown
Rhythm is designed for durability, but abrupt shutdowns can lead to zombie tasks that remain in a Running state until a timeout or manual intervention. Always use the provided shutdown hooks to ensure the worker finishes its current task before exiting.
Shutdown Signal Handling
The Rust core uses a CancellationToken to manage lifecycle. In your host language, you should trigger the shutdown when receiving SIGTERM or SIGINT.
```python
import asyncio
import signal

# In your main loop:
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
    loop.add_signal_handler(sig, lambda: asyncio.create_task(r.shutdown()))

# The worker will stop fetching new work and wait for
# active tasks to complete or reach a timeout.
await worker.start()
```
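The drain behavior itself is a generic asyncio pattern, not Rhythm-specific: on shutdown, stop pulling new work but let the in-flight task finish. A self-contained sketch:

```python
import asyncio

# Generic drain pattern (not Rhythm's internals): once the stop event is
# set, the loop stops fetching new work, but a task already started runs
# to completion.
async def run_worker(queue, stop_event):
    completed = []
    while not stop_event.is_set():
        try:
            task = queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.01)
            continue
        await asyncio.sleep(0)        # stand-in for real task work
        completed.append(task)
    return completed

async def main():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)
    stop = asyncio.Event()
    worker = asyncio.create_task(run_worker(queue, stop))
    await asyncio.sleep(0.05)         # let the worker drain the queue
    stop.set()                        # e.g. triggered by a SIGTERM handler
    return await worker

print(asyncio.run(main()))  # -> [0, 1, 2]
```

All three tasks complete before the worker exits, because the stop flag is only checked between tasks, never mid-task.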
Configuration & Tuning
The InitOptions (Rust) and the initialization parameters in the SDK allow you to tune worker behavior:
| Parameter | Type | Description |
| :--- | :--- | :--- |
| database_url | string | The Postgres connection string. |
| auto_migrate | bool | If true, the worker will attempt to apply schema migrations on startup. |
| queue | string | The specific queue this worker instance should listen to. |
Heartbeats and Timeouts
Workflow state is persisted at every await point in your .flow script. If a worker process dies while executing a task, the task will eventually be marked as failed based on the database-level visibility timeout, allowing it to be retried by another worker (if configured).
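The recovery mechanic can be sketched as a simple "reaper" pass over claimed tasks (illustrative only; the names and the 30-second timeout below are assumptions, not Rhythm defaults):

```python
# Toy sketch of a visibility timeout: a running task whose claim is
# older than the timeout (its worker presumably died) is returned to
# the queue for another worker to retry. Names and the 30s value are
# illustrative, not Rhythm's actual schema or defaults.
VISIBILITY_TIMEOUT = 30.0

tasks = [
    {"id": 1, "status": "running", "claimed_at": 0.0},    # stale claim
    {"id": 2, "status": "running", "claimed_at": 95.0},   # still healthy
    {"id": 3, "status": "queued",  "claimed_at": None},
]

def reap_expired(now):
    """Requeue running tasks whose claim exceeded the visibility timeout."""
    reaped = []
    for task in tasks:
        if (task["status"] == "running"
                and now - task["claimed_at"] > VISIBILITY_TIMEOUT):
            task["status"] = "queued"    # eligible for another worker
            reaped.append(task["id"])
    return reaped

print(reap_expired(now=100.0))  # -> [1]
```

Only the stale claim is requeued; healthy workers keep their tasks because their claims are still within the timeout window.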
Deployment Patterns
- Unified Process: For small workloads, run the API, the Internal Worker, and the Task Worker in a single container.
- Split Scheduler: Run one dedicated instance with `start_internal_worker()` to handle scheduling, and scale the Task Workers separately.
- Task-Specific Workers: Deploy specialized containers that only register a subset of tasks (e.g., workers with GPU access listening only to an `ml-tasks` queue).