Signals & Events
Signals allow external systems or users to interact with a running workflow. In Rhythm, signals are used to unblock workflows that are waiting for external input, such as a manual approval, a webhook callback, or a message from another system.
Waiting for Signals
In a .flow script, you use the Signal.when function to pause execution until a specific signal is received. Because Rhythm is a durable execution engine, the workflow will suspend, persist its state to the database, and stop consuming resources until the signal arrives.
Signal.when
// Wait indefinitely for a signal
const data = await Signal.when("external-update");
// Wait with a timeout
try {
  const approval = await Signal.when("manager-approval", { timeout: "24h" });
} catch (err) {
  // Handle timeout logic
  await Task.run("escalate-issue", { reason: "Timeout" });
}
Parameters:
- name (string): The identifier for the signal the workflow is listening for.
- options (object):
  - timeout: A string representing a duration (e.g., "1h", "24h", "15m") or a number of milliseconds.
Returns:
The payload sent by the external caller.
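To make the two accepted timeout forms concrete, here is a minimal Python sketch of how a duration string or a millisecond count could be normalized to milliseconds. The helper name `timeout_to_ms` and the unit table are assumptions for illustration (the text above only shows "h" and "m" suffixes); this is not Rhythm's actual parser.

```python
import re

# Assumed unit table: the text only shows "h" and "m"; "s" is a guess.
_UNIT_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000}
_DURATION = re.compile(r"(\d+)([smh])")


def timeout_to_ms(timeout):
    """Normalize a timeout (duration string or millisecond count) to milliseconds."""
    if isinstance(timeout, bool):
        raise TypeError("timeout must be a duration string or a number")
    if isinstance(timeout, (int, float)):
        # Numbers are already interpreted as milliseconds.
        return int(timeout)
    if isinstance(timeout, str):
        match = _DURATION.fullmatch(timeout.strip())
        if match is None:
            raise ValueError(f"unrecognized duration: {timeout!r}")
        value, unit = match.groups()
        return int(value) * _UNIT_MS[unit]
    raise TypeError("timeout must be a duration string or a number")


print(timeout_to_ms("24h"))  # 86400000
print(timeout_to_ms("15m"))  # 900000
print(timeout_to_ms(5000))   # 5000
```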
Sending Signals
To unblock a workflow, your application must send a signal using the Rhythm client. This is typically done from your main application code (e.g., inside a FastAPI endpoint or a background task).
Application Client API
When you send a signal, Rhythm performs two actions:
- Records the signal and its payload in the database.
- Re-enqueues the target workflow so a worker can resume its execution.
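The two actions above can be sketched with an in-memory stand-in for the engine. The class `InMemoryEngine` and its attributes are hypothetical and exist only to illustrate the order of operations; the real service persists to a database and uses its own queue.

```python
import uuid
from collections import deque


class InMemoryEngine:
    """Toy stand-in for the engine; not Rhythm's real implementation."""

    def __init__(self):
        self.signals = []          # stands in for the signals table
        self.work_queue = deque()  # stands in for the worker queue

    def send_signal(self, workflow_id, signal_name, payload, queue="default"):
        # Action 1: record the signal and its payload.
        signal_id = str(uuid.uuid4())
        self.signals.append({
            "id": signal_id,
            "workflow_id": workflow_id,
            "signal_name": signal_name,
            "payload": payload,
        })
        # Action 2: re-enqueue the target workflow so a worker can resume it.
        self.work_queue.append((queue, workflow_id))
        return signal_id


engine = InMemoryEngine()
engine.send_signal("order-123", "payment-received", {"amount": 50.00})
print(len(engine.signals), list(engine.work_queue))
# 1 [('default', 'order-123')]
```

Recording before enqueueing matters in this sketch: a worker that picks up the workflow can always find the signal it was woken for.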
Python Example
await client.send_signal(
    workflow_id="order-123",
    signal_name="payment-received",
    payload={"amount": 50.00, "status": "success"},
    queue="default",
)
API Signature (Internal Service)
The core Rust service defines the signal delivery as follows:
async fn send_signal(
    &self,
    workflow_id: &str,
    signal_name: &str,
    payload: JsonValue,
    queue: &str,
) -> Result<()>
Signal Data Model
Signals are persisted with the following structure, allowing for audit trails of external interactions:
| Field | Type | Description |
| :--- | :--- | :--- |
| id | UUID | Unique identifier for the signal instance. |
| workflow_id | String | The ID of the workflow this signal is intended for. |
| signal_name | String | The name matching the Signal.when call. |
| payload | JSON | The data passed to the workflow. |
| created_at | Timestamp | When the signal was received by the system. |
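As an illustration of the table above, the record could be mirrored in application code as a Python dataclass. The names `SignalRecord` and `audit_trail` are hypothetical and not part of the Rhythm client; the sketch only shows how the listed fields support an audit query.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class SignalRecord:
    """Mirror of the persisted signal fields (illustrative only)."""
    workflow_id: str
    signal_name: str
    payload: Any
    id: uuid.UUID = field(default_factory=uuid.uuid4)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def audit_trail(records, workflow_id):
    """Return the signals sent to one workflow, oldest first."""
    matching = [r for r in records if r.workflow_id == workflow_id]
    return sorted(matching, key=lambda r: r.created_at)


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
records = [
    SignalRecord("order-123", "payment-received", {"amount": 50.00}, created_at=t0.replace(hour=2)),
    SignalRecord("order-456", "payment-received", {"amount": 9.99}, created_at=t0),
    SignalRecord("order-123", "manager-approval", {"approved": True}, created_at=t0.replace(hour=1)),
]
trail = audit_trail(records, "order-123")
print([r.signal_name for r in trail])  # ['manager-approval', 'payment-received']
```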
Technical Behavior
Suspension and Resumption
When a workflow reaches await Signal.when, the engine changes the execution status to SUSPENDED. The workflow's current stack and variables are serialized.
Once a signal with a matching signal_name is sent to that workflow_id, the engine marks the signal as consumed and moves the workflow back into the work_queue. A worker will then pick up the workflow, restore its state, and resolve the await with the signal's payload.
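The suspend, match, and resume cycle described above can be modeled in a few lines of Python. Everything here (the dictionaries, the `QUEUED` and `RUNNING` statuses, the function names) is an assumption made for illustration; only the `SUSPENDED` status and the overall sequence come from the description.

```python
import json
from collections import deque

SUSPENDED, QUEUED, RUNNING = "SUSPENDED", "QUEUED", "RUNNING"

workflows = {}        # workflow_id -> status, awaited signal, serialized state
signals = []          # recorded signals, each with a "consumed" flag
work_queue = deque()  # workflow ids ready for a worker


def suspend(workflow_id, signal_name, variables):
    """Serialize the workflow's variables and park it until the signal arrives."""
    workflows[workflow_id] = {
        "status": SUSPENDED,
        "waiting_for": signal_name,
        "snapshot": json.dumps(variables),
    }


def deliver(workflow_id, signal_name, payload):
    """Record a signal; if it matches a suspended wait, consume it and re-enqueue."""
    signal = {"workflow_id": workflow_id, "signal_name": signal_name,
              "payload": payload, "consumed": False}
    signals.append(signal)
    wf = workflows.get(workflow_id)
    if wf and wf["status"] == SUSPENDED and wf["waiting_for"] == signal_name:
        signal["consumed"] = True
        wf["status"] = QUEUED
        wf["resume_value"] = payload
        work_queue.append(workflow_id)


def worker_step():
    """Pick up one queued workflow, restore its state, and resolve the await."""
    workflow_id = work_queue.popleft()
    wf = workflows[workflow_id]
    wf["status"] = RUNNING
    variables = json.loads(wf["snapshot"])
    return variables, wf.pop("resume_value")


suspend("order-123", "payment-received", {"order_total": 50})
deliver("order-123", "unrelated-signal", {})            # recorded, but no match
deliver("order-123", "payment-received", {"status": "success"})
print(worker_step())  # ({'order_total': 50}, {'status': 'success'})
```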
Determinism and Replay
Unlike durable execution frameworks that rebuild workflow state by replaying event history, Rhythm does not need to replay the history of signals to reach the current state. Because the state is persisted at the point of suspension, the signal payload is simply injected into the resumed runtime context.
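The difference between the two resumption strategies can be shown with a toy model in Python. Both functions and the event shape are hypothetical; the replay variant is a generic illustration of history-based recovery, not a description of any particular framework, and the snapshot variant is only a sketch of the behavior described above.

```python
import json


def apply_event(state, event):
    """Fold one recorded event into the workflow state (toy reducer)."""
    new_state = dict(state)
    new_state[event["name"]] = event["payload"]
    return new_state


def resume_by_replay(history):
    """Replay-style resume: rebuild state by re-applying every past event."""
    state = {}
    for event in history:
        state = apply_event(state, event)
    return state


def resume_from_snapshot(snapshot_json, signal):
    """Snapshot-style resume: load persisted state, then inject the new payload."""
    state = json.loads(snapshot_json)
    return apply_event(state, signal)


history = [
    {"name": "order-created", "payload": {"total": 50}},
    {"name": "manager-approval", "payload": {"approved": True}},
    {"name": "payment-received", "payload": {"status": "success"}},
]
# State persisted at the suspension point, before the last signal arrived.
snapshot = json.dumps(resume_by_replay(history[:2]))
print(resume_from_snapshot(snapshot, history[2]) == resume_by_replay(history))  # True
```

Both paths reach the same state, but the snapshot path touches only the newest signal, so its cost does not grow with the length of the history.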
Timeouts
If a timeout is provided to Signal.when, Rhythm registers a timer in the scheduled queue. If the signal does not arrive before the timer expires, the workflow is re-enqueued and the awaited Signal.when call throws an error that can be caught with a standard try/catch block.
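The race between a signal and its timer can be sketched in Python with a logical clock. The `ToyEngine` class, the `SignalTimeout` error type, and the millisecond clock are all assumptions for illustration; the source does not name the error that Signal.when throws or describe how timers are stored.

```python
import heapq


class SignalTimeout(Exception):
    """Hypothetical error raised inside the workflow when a wait expires."""


class ToyEngine:
    def __init__(self):
        self.timers = []    # min-heap of (deadline_ms, workflow_id)
        self.waiting = {}   # workflow_id -> signal name being awaited
        self.outcomes = {}  # workflow_id -> payload or SignalTimeout

    def wait_for_signal(self, workflow_id, signal_name, now_ms, timeout_ms=None):
        self.waiting[workflow_id] = signal_name
        if timeout_ms is not None:
            # Register a timer alongside the wait.
            heapq.heappush(self.timers, (now_ms + timeout_ms, workflow_id))

    def send_signal(self, workflow_id, signal_name, payload):
        if self.waiting.get(workflow_id) == signal_name:
            del self.waiting[workflow_id]
            self.outcomes[workflow_id] = payload

    def tick(self, now_ms):
        """Fire expired timers; a workflow still waiting gets a timeout error."""
        while self.timers and self.timers[0][0] <= now_ms:
            _, workflow_id = heapq.heappop(self.timers)
            if workflow_id in self.waiting:  # the signal never arrived
                del self.waiting[workflow_id]
                self.outcomes[workflow_id] = SignalTimeout(workflow_id)

    def resume(self, workflow_id):
        """What the awaiting code sees on resumption: a payload or a raised error."""
        outcome = self.outcomes.pop(workflow_id)
        if isinstance(outcome, SignalTimeout):
            raise outcome
        return outcome


engine = ToyEngine()
engine.wait_for_signal("wf-fast", "manager-approval", now_ms=0, timeout_ms=1000)
engine.wait_for_signal("wf-slow", "manager-approval", now_ms=0, timeout_ms=1000)
engine.send_signal("wf-fast", "manager-approval", {"approved": True})
engine.tick(now_ms=2000)  # both timers are due; only wf-slow is still waiting

print(engine.resume("wf-fast"))  # {'approved': True}
try:
    engine.resume("wf-slow")
except SignalTimeout:
    print("timed out")  # timed out
```

This sketch simply ignores timers for workflows that are no longer waiting; a production engine would more likely cancel or version them so a stale timer cannot affect a later wait.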