XWorker Service¶
The XWorker service provides a robust integration with Celery for executing heavy or long-running tasks asynchronously in a distributed environment. It allows you to offload work from the main FastAPI process to separate worker nodes.
Prerequisites¶
- Familiarity with the Service Container overview
- celery installed (pip install celery[redis])
- A message broker running (Redis or RabbitMQ)
Key Concepts¶
Distributed Task Execution¶
Unlike the Scheduler, which runs tasks in the main process, XWorker dispatches tasks to a message broker. Independent worker processes then pick up these tasks and execute them, possibly on different physical machines.
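The flow described above can be sketched with the standard library alone. This is a toy model, not how Celery is implemented: a `queue.Queue` stands in for the broker and a thread stands in for a separate worker process. The names `broker`, `worker_loop`, and `results` are illustrative only.

```python
import queue
import threading

broker = queue.Queue()  # stand-in for Redis/RabbitMQ
results = {}            # stand-in for a result backend


def worker_loop():
    """Pull messages off the broker and execute them until told to stop."""
    while True:
        message = broker.get()
        if message is None:  # sentinel: shut the worker down
            break
        task_id, func, args = message
        results[task_id] = func(*args)


worker = threading.Thread(target=worker_loop)
worker.start()

# The "main process" only enqueues work; it does not run the task itself.
broker.put(("t1", pow, (2, 10)))
broker.put(None)
worker.join()

print(results)  # {'t1': 1024}
```

In a real deployment the producer and the worker are different processes, often on different machines, and only the broker connects them.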
Automatic Registration¶
Xcore simplifies Celery configuration by automatically registering tasks defined in your plugins and application modules. You don't need to manually create a celery.py entry point.
Practical Guide¶
1. Defining a Task¶
Use the @task decorator provided by the xcore.services.xworker module.
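A minimal sketch of a task definition, assuming `@task` can be applied without arguments (this page does not show its signature). The task name `generate_report` and its body are hypothetical. The import falls back to a no-op stand-in so that the snippet also runs where Xcore is not installed.

```python
try:
    # Module path as named in this guide.
    from xcore.services.xworker import task
except ImportError:
    # Stand-in so the sketch runs without Xcore; it registers nothing.
    def task(func):
        return func


@task
def generate_report(user_id: int) -> dict:
    """Hypothetical long-running job: build a report for one user."""
    # Accept an ID rather than a full object so arguments stay JSON-serializable.
    return {"user_id": user_id, "status": "done"}
```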
2. Dispatching a Task from a Plugin¶
You can trigger background work from within any Trusted plugin.
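A sketch of the dispatch call, based on the `send(task_name, *args, **kwargs)` signature listed in the API reference. How a plugin obtains the WorkerService is not shown on this page, so the service is passed in as a parameter here. The task name `reports.generate_report` and the `FakeWorker` class are hypothetical and exist only to make the example runnable.

```python
from typing import Protocol


class WorkerLike(Protocol):
    """The part of WorkerService this sketch relies on."""

    def send(self, task_name: str, *args, **kwargs) -> str: ...


def queue_report(worker: WorkerLike, user_id: int) -> str:
    """Dispatch the report task by name and return its task_id."""
    # Pass only the ID; the task loads the user itself.
    return worker.send("reports.generate_report", user_id)


class FakeWorker:
    """In-memory stand-in that records calls instead of contacting a broker."""

    def __init__(self):
        self.sent = []

    def send(self, task_name, *args, **kwargs):
        self.sent.append((task_name, args, kwargs))
        return f"fake-{len(self.sent)}"


fake = FakeWorker()
task_id = queue_report(fake, 42)
print(task_id)  # fake-1
```

Keeping the service behind a small protocol like this also makes plugin code easy to unit test without a running broker.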
3. Starting the Worker Process¶
In production, you must run the Celery worker as a separate process.
API Reference¶
WorkerService Methods¶
| Method | Return Type | Description |
|---|---|---|
| `send(task_name, *args, **kwargs)` | `str` | Dispatches a task to the broker. Returns the task_id. |
| `get_result(task_id)` | `AsyncResult` | Retrieve the status or result of a previously dispatched task. |
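The following sketch polls a dispatched task until it finishes. It assumes the object returned by `get_result()` is a Celery `AsyncResult`, whose `ready()`, `failed()`, and `result` members are used below. The helper name `wait_for_result` and the two fake classes are hypothetical; the fakes exist only so the example runs without a broker.

```python
import time


def wait_for_result(worker, task_id, timeout=30.0, poll_interval=0.5):
    """Poll until the task finishes, then return its value or raise."""
    result = worker.get_result(task_id)
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still pending after {timeout}s")
        time.sleep(poll_interval)
    if result.failed():
        # For a failed task, AsyncResult.result holds the raised exception.
        raise RuntimeError(f"task {task_id} failed: {result.result!r}")
    return result.result


class FakeResult:
    """Stand-in that reports ready after a fixed number of polls."""

    def __init__(self, value, polls_until_ready=2):
        self.result = value
        self._polls_left = polls_until_ready

    def ready(self):
        self._polls_left -= 1
        return self._polls_left < 0

    def failed(self):
        return False


class FakeWorker:
    """Stand-in exposing only get_result()."""

    def get_result(self, task_id):
        return FakeResult({"status": "done"})


value = wait_for_result(FakeWorker(), "abc123", timeout=1.0, poll_interval=0.01)
print(value)  # {'status': 'done'}
```

Because `time.sleep` blocks the calling thread, a loop like this suits scripts and background threads better than an async request handler.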
YAML Configuration¶
Common Errors & Pitfalls¶
Task Not Registered
If the worker logs "Received unregistered task of type...", it means the module containing your @task was not imported by the worker.
Fix: Add the module path to the modules: list in xcore.yaml.
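Before restarting the worker, it can help to confirm that every listed module path is importable from the worker's environment. The check below uses only the standard library; the function name `missing_modules` and the example paths are illustrative and not part of Xcore.

```python
import importlib.util


def missing_modules(module_paths):
    """Return the dotted paths that cannot be located for import."""
    missing = []
    for path in module_paths:
        try:
            spec = importlib.util.find_spec(path)
        except (ImportError, ValueError):
            # Raised when a parent package is missing or the module is malformed.
            spec = None
        if spec is None:
            missing.append(path)
    return missing


print(missing_modules(["json", "no_such_pkg.tasks"]))  # ['no_such_pkg.tasks']
```

A path that shows up here would also fail to import in the worker, so its tasks could not be registered.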
Serialization Overhead
Arguments passed to send() must be JSON-serializable. Avoid passing complex objects such as database model instances.
Fix: Pass IDs (e.g., user_id) and fetch the object inside the task logic.
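A quick way to see the difference is to try encoding candidate arguments with `json.dumps`. The `User` dataclass and the helper `is_json_serializable` are hypothetical stand-ins for an application model and a pre-dispatch check.

```python
import json
from dataclasses import dataclass


@dataclass
class User:
    id: int
    email: str


def is_json_serializable(value) -> bool:
    """Return True if the value can be encoded with the default JSON encoder."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


user = User(id=7, email="a@example.com")
print(is_json_serializable(user))     # False: arbitrary objects are rejected
print(is_json_serializable(user.id))  # True: a plain int is fine
```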
Broker Unreachable
If Xcore cannot connect to Redis/RabbitMQ at startup, the XWorker service will enter a DEGRADED state and send() calls will raise exceptions.
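Callers that must stay responsive while the broker is down can wrap the dispatch. This sketch catches a broad `Exception` because the exact exception type raised by `send()` is not documented here; narrow it once the type is known. The helper `send_or_none` and the `DownWorker` stand-in are hypothetical.

```python
import logging

logger = logging.getLogger("xworker.example")


def send_or_none(worker, task_name, *args, **kwargs):
    """Return the task_id, or None if the dispatch fails."""
    try:
        return worker.send(task_name, *args, **kwargs)
    except Exception:  # assumption: the concrete exception type is unknown
        logger.exception("could not dispatch %s", task_name)
        return None


class DownWorker:
    """Stand-in that behaves like a service whose broker is unreachable."""

    def send(self, task_name, *args, **kwargs):
        raise ConnectionError("broker unreachable")


task_id = send_or_none(DownWorker(), "emails.send_welcome", 42)
print(task_id)  # None
```

Whether to drop the work, retry later, or run it inline when `None` comes back depends on how critical the task is.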
Best Practices¶
Use Dedicated Queues
Separate your tasks into queues based on priority or type (e.g., default, email, heavy). This allows you to scale workers independently for different types of work.
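One way to picture queue separation is a routing table from task names to queues. Celery's own `task_routes` setting accepts a mapping of this shape; whether and how Xcore exposes it is not stated here, so treat the task names and the `resolve_queue` helper as hypothetical. The helper only approximates Celery's glob matching with `fnmatch`.

```python
from fnmatch import fnmatchcase

# Same shape as a Celery task_routes mapping: glob pattern -> routing options.
TASK_ROUTES = {
    "emails.*": {"queue": "email"},
    "reports.*": {"queue": "heavy"},
}


def resolve_queue(task_name, routes=TASK_ROUTES, default="default"):
    """Return the queue of the first matching pattern, else the default queue."""
    for pattern, options in routes.items():
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    return default


print(resolve_queue("emails.send_welcome"))  # email
print(resolve_queue("cleanup.purge_tmp"))    # default
```

Workers can then be limited to specific queues with the Celery worker's `-Q` option, so that heavy jobs never starve short ones.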
Idempotency
Design your tasks to be idempotent. In a distributed system, a task might be executed more than once due to network issues or worker crashes.
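A common way to achieve this is to record a unique key for each unit of work and skip any repeat. The sketch below keeps that record in an in-memory set purely for illustration; a real task would need shared, durable storage such as a database unique constraint. All names here are hypothetical.

```python
processed_payments = set()  # stand-in for durable, shared storage
balances = {}


def credit_account(payment_id: str, account: str, amount: int) -> bool:
    """Apply a payment once; return False if it was already applied."""
    if payment_id in processed_payments:
        return False  # duplicate delivery: do nothing
    balances[account] = balances.get(account, 0) + amount
    processed_payments.add(payment_id)
    return True


# The same message delivered twice only changes the balance once.
first = credit_account("pay-1", "alice", 50)
second = credit_account("pay-1", "alice", 50)
print(first, second, balances)  # True False {'alice': 50}
```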