Testing Strategies
Testing durable workflows requires a strategy that spans from simple unit tests for task logic to integrated environment tests for complex orchestration. Because Rhythm avoids event-replay, testing is often more predictable than in other durable execution frameworks.
Testing Tasks
Tasks are the "units" of your application logic. Since they are written in your native application language (e.g., Python), they should be tested using standard unit testing frameworks like pytest.
Mocking Dependencies
Tasks often interact with external APIs or databases. Use standard mocking libraries to isolate the task logic.
# tasks/payment_tasks.py
@app.task(name="process-payment")
def process_payment(amount: float, currency: str):
gateway = PaymentGateway()
return gateway.charge(amount, currency)
# tests/test_tasks.py
from unittest.mock import MagicMock
from tasks.payment_tasks import process_payment
def test_process_payment():
# Mock the external gateway
with patch('tasks.payment_tasks.PaymentGateway') as MockGateway:
instance = MockGateway.return_value
instance.charge.return_value = {"status": "success", "id": "123"}
result = process_payment(100.0, "USD")
assert result["status"] == "success"
Testing Workflow Logic
Workflows written in .flow files focus on orchestration. Testing them involves verifying that the correct tasks are called with the correct parameters and that the workflow handles different task outcomes (success, failure, timeouts) correctly.
Integration Testing with a Test Database
Because Rhythm relies on Postgres for state management, the most reliable way to test a workflow is against a dedicated test database.
- Setup: Initialize a clean Postgres schema.
- Register: Register your
.flowscripts and Python tasks. - Execute: Trigger the workflow using the
rhythmclient. - Assert: Poll the execution status until it reaches
CompletedorFailed, then verify the output.
import pytest
from rhythm import RhythmClient
@pytest.mark.asyncio
async def test_onboarding_workflow():
client = RhythmClient(db_url="postgresql://localhost/rhythm_test")
# Start the workflow
execution_id = await client.start_workflow(
"onboard_user",
inputs={"userId": "user_123"}
)
# Wait for completion (with timeout)
result = await client.wait_for_completion(execution_id, timeout=30)
assert result.status == "completed"
assert result.output["setup_complete"] is True
Mocking Tasks in Workflows
In a complex integration test, you might want to run the real .flow script but mock the implementation of the tasks it calls. You can achieve this by registering "mock workers" that return predefined data instead of performing real side effects.
@app.task(name="submit-report")
def mock_submit_report(report_id):
return {"status": "approved", "approvers": ["admin"]}
Testing Signals and Timeouts
Workflows often involve waiting for external events (Signal.when) or specific durations. To test these efficiently:
- Testing Signals: Start the workflow, verify it moves to a
Suspendedstate, then send a signal using the client and verify it resumes. - Testing Timeouts: While Rhythm doesn't currently support "time travel" in its experimental state, you can test timeout logic by setting short durations in your
.flowscripts (e.g.,timeout: "1s") during development.
// Example: Testing catch block for timeouts
try {
await Signal.when("manager-approval", { timeout: "1s" })
} catch (err) {
// Assert that this branch is taken in your test
return { timed_out: true }
}
Determinism and Side Effects
Unlike replay-based frameworks, Rhythm's core engine persists the actual state of the VM. This means:
- Less Determinism Anxiety: You can use non-deterministic logic (like
Math.random()or getting the current date) inside your.flowscripts safely, as the result is persisted at the point of execution and not recalculated. - Side Effect Safety: Side effects should still live exclusively inside Tasks. While the
.flowlanguage is sandboxed, keeping side effects in tasks ensures that your orchestration logic remains easy to test and reason about.
Best Practices
- Small Workflows: Break complex logic into sub-workflows. Each sub-workflow can be tested in isolation.
- Idempotent Tasks: Ensure tasks can be safely retried. Rhythm handles retries at the engine level, so your task logic should be prepared to run more than once if a network failure occurs after the task executes but before the result is persisted.
- Environment Isolation: Always use a separate database for automated tests to avoid interfering with local development state.