Defining Tasks
Overview of Tasks
In Rhythm, Tasks are the units of work where your application logic resides. While Workflows (written in .flow files) orchestrate the sequence of events, Tasks are written in your native application language (e.g., Python) to perform side effects like querying a database, calling an external API, or processing data.
Tasks are executed by Workers. When a Workflow calls Task.run("task-name", { ... }), Rhythm enqueues a job that a Worker picks up and executes.
Defining a Task
To define a task in Python, use the @task decorator provided by the Rhythm SDK. Each task must have a unique name that matches the string used in your .flow scripts.
from rhythm import task

@task(name="submit-report")
def submit_report(inputs: dict):
    report_id = inputs.get("reportId")
    # Perform application logic
    print(f"Processing report: {report_id}")
    return {
        "status": "success",
        "timestamp": "2023-10-27T10:00:00Z"
    }
Task Naming
By default, the task name is the name of the function. Passing an explicit name argument to the decorator is strongly recommended, so that renaming the function in your Python code does not break existing workflows that reference the task by its string name.
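To see why an explicit name is safer, here is a minimal sketch of a name-keyed registry. The `task` decorator and `REGISTRY` below are hypothetical stand-ins written for this illustration, not the Rhythm SDK's implementation; they only mimic the documented default of falling back to the function name.

```python
from typing import Callable, Dict, Optional

# Hypothetical stand-in for a task registry; not part of the Rhythm SDK.
REGISTRY: Dict[str, Callable[[dict], dict]] = {}


def task(name: Optional[str] = None):
    """Register a function under `name`, defaulting to the function's own name."""
    def decorator(func):
        REGISTRY[name or func.__name__] = func
        return func
    return decorator


@task()  # implicit name: changes whenever the function is renamed
def send_email(inputs: dict) -> dict:
    return {"sent": True}


@task(name="submit-report")  # explicit name: independent of the function name
def submit_report_v2(inputs: dict) -> dict:
    return {"status": "success"}


print(sorted(REGISTRY))  # ['send_email', 'submit-report']
```

In this sketch, renaming `submit_report_v2` leaves the registered name untouched, while renaming `send_email` would silently change the key that a workflow has to use.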
Input and Output
Tasks interact with Rhythm using JSON-serializable data:
- Inputs: Received as a single dictionary (or mapped arguments depending on the SDK configuration).
- Outputs: Any value returned by the function will be serialized to JSON and sent back to the requesting workflow.
[!IMPORTANT] Because Rhythm is a durable execution framework, all task inputs and outputs must be JSON-serializable. Avoid returning complex objects like class instances or database connections.
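One way to respect this constraint is to flatten rich objects into plain JSON types before returning them. The sketch below uses only the Python standard library; the `Report` class and the `to_task_output` helper are hypothetical names chosen for the example.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass
class Report:
    report_id: str
    created_at: datetime


def to_task_output(report: Report) -> dict:
    """Flatten a rich object into plain JSON types before returning it."""
    payload = asdict(report)
    # datetime is not JSON-serializable, so store it as an ISO 8601 string.
    payload["created_at"] = report.created_at.isoformat()
    return payload


report = Report("r-42", datetime(2023, 10, 27, 10, 0, tzinfo=timezone.utc))

try:
    json.dumps(report)  # a class instance cannot be serialized directly
    raw_ok = True
except TypeError:
    raw_ok = False

output = to_task_output(report)
encoded = json.dumps(output)  # succeeds: only string values remain
```

Running `json.dumps` on a task's return value in a unit test is a cheap way to catch a non-serializable output before it reaches a worker.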
Accessing Task Context
Sometimes a task needs to know about its execution environment (e.g., the execution_id or the current attempt number). You can access this via the current_context helper.
from rhythm import task, current_context

@task(name="process-payment")
def process_payment(inputs: dict):
    ctx = current_context()
    print(f"Running execution: {ctx.execution_id}")
    print(f"Retry attempt: {ctx.attempt}")
    # Logic...
Error Handling and Retries
When a Task raises an exception, Rhythm catches it and marks the task execution as failed.
The core engine handles retries according to your worker configuration: a failed task can be retried before the failure is surfaced to the workflow script. In your .flow file, you can also wrap the call in a try/catch block to handle a task failure gracefully:
// workflows/example.flow
try {
  await Task.run("flaky-api-call")
} catch (err) {
  // Logic to handle failure (e.g., alert admin or cleanup)
  await Task.run("cleanup")
}
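On the Python side, a task signals failure simply by raising. The sketch below shows a task body that raises on bad input instead of returning a misleading result. The `@task` decorator is left out so the snippet runs on its own, and `charge_card` and `PaymentDeclined` are hypothetical names used only for illustration.

```python
class PaymentDeclined(Exception):
    """Hypothetical error type used only for this illustration."""


def charge_card(inputs: dict) -> dict:
    # In a real project this function would carry the @task decorator.
    amount = inputs.get("amount")
    if not isinstance(amount, (int, float)) or amount <= 0:
        # Raising lets the failure be recorded, rather than hiding it in a result.
        raise ValueError(f"invalid amount: {amount!r}")
    if inputs.get("card") == "declined":
        raise PaymentDeclined("card was declined")
    return {"status": "charged", "amount": amount}


try:
    charge_card({"amount": -5})
except ValueError as exc:
    failure = str(exc)
```

Raising a specific exception type, rather than returning a status field such as "error", keeps the failure visible to the retry logic and to a try/catch block in the workflow.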
Registering Tasks with a Worker
Defining a task is not enough; a Worker must be aware of the task to execute it. When you initialize your Rhythm worker, ensure the modules containing your tasks are imported.
import rhythm
from my_app import tasks  # Imported for its side effect: the @task decorators run

# Initialize the client and start the worker
client = rhythm.init()
worker = client.worker()

# The worker will now listen for and execute the decorated tasks
worker.run()
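When tasks are spread across several modules, one possible approach is to import them by name from a single list before starting the worker. This is a sketch using the standard library's `importlib`; the `load_task_modules` helper is hypothetical and not part of the Rhythm SDK.

```python
import importlib
from types import ModuleType
from typing import Iterable, List


def load_task_modules(module_names: Iterable[str]) -> List[ModuleType]:
    """Import each module so that the decorators inside it are executed."""
    return [importlib.import_module(name) for name in module_names]


# "json" and "math" are stand-ins so the snippet runs anywhere; in a real
# project the list would hold names such as "my_app.tasks".
loaded = load_task_modules(["json", "math"])
print([module.__name__ for module in loaded])  # ['json', 'math']
```

Keeping the module list in one place makes it harder to forget a module, which would otherwise leave its tasks unknown to the worker.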
Best Practices
- Idempotency: Tasks may be retried if a worker crashes or a network timeout occurs. Ensure your tasks are idempotent (running them multiple times has the same effect as running them once), especially when performing financial transactions or sending notifications.
- Short-lived: Tasks should ideally be short-lived. If you have a process that takes hours, consider breaking it into multiple smaller tasks orchestrated by a workflow, or use Rhythm's signaling mechanism.
- Side Effects: Keep all side effects (DB writes, API calls) inside Tasks rather than Workflows. Workflows should remain focused on the "flow" of logic.
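The idempotency advice above can be sketched with an idempotency key: the task records the result under a key derived from its input and reuses that result on a retry. The in-memory `_results` dictionary and the `run_once` helper are assumptions made for this example; a real task would need a durable store shared by all workers.

```python
from typing import Callable, Dict, List

# Hypothetical in-memory store; a real task would use a durable store
# (for example a database table with a unique key) shared by all workers.
_results: Dict[str, dict] = {}


def run_once(key: str, action: Callable[[], dict]) -> dict:
    """Run `action` only if `key` has not been seen; otherwise reuse the result."""
    if key not in _results:
        _results[key] = action()
    return _results[key]


charges: List[int] = []  # records the side effect so repeats are visible


def charge(inputs: dict) -> dict:
    def do_charge() -> dict:
        charges.append(inputs["amount"])
        return {"status": "charged", "amount": inputs["amount"]}

    # The payment ID is the idempotency key: a retry with the same ID is a no-op.
    return run_once(f"charge:{inputs['paymentId']}", do_charge)


first = charge({"paymentId": "p-1", "amount": 25})
retry = charge({"paymentId": "p-1", "amount": 25})
print(first == retry, len(charges))  # True 1
```

The check-then-write in `run_once` is not safe under concurrency; with a real store, a unique constraint or an atomic insert would be needed to close that gap.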