This example demonstrates a complex Trusted plugin designed to synchronize data from an external 3rd-party API into the local database and cache. It leverages the Scheduler for timing, the XWorker for offloading heavy processing, and Hooks for observability.
importtimefromxcoreimportTrustedBase,ok,errorfromxcore.services.baseimportServiceStatusclassPlugin(TrustedBase):asyncdefon_load(self):self.db=self.get_service("db")self.cache=self.get_service("cache")self.scheduler=self.get_service("scheduler")self.worker=self.get_service("worker")asyncdefon_start(self):# (1) Register a complex Cron Job# Runs every hour at minute 0@self.scheduler.cron("0 * * * *")asyncdeftrigger_sync():awaitself._orchestrate_sync()asyncdefon_stop(self):# (2) Graceful state cleanupawaitself.cache.delete("sync:active_lock")print(f"[{self.name}] synchronization paused")asyncdef_orchestrate_sync(self):# 1. Distributed Lock (to prevent parallel runs)ifawaitself.cache.exists("sync:active_lock"):returnawaitself.cache.set("sync:active_lock",True,ttl=300)try:# 2. Fetch Data via Gateway Plugin (IPC)response=awaitself.ctx.registry.get_service("gateway").call("fetch_raw_data")raw_items=response.get("data",[])# 3. Batch Dispatch to Workers# We don't process data here; we offload it to Celery.forbatchinself._chunk_list(raw_items,100):self.worker.send("tasks.sync:process_batch",batch)# 4. Log Success to Databaseasyncwithself.db.session()assess:awaitsess.execute("INSERT INTO sync_logs (timestamp, count) VALUES (:t, :c)",{"t":time.time(),"c":len(raw_items)})finally:awaitself.cache.delete("sync:active_lock")def_chunk_list(self,lst,n):foriinrange(0,len(lst),n):yieldlst[i:i+n]asyncdefhandle(self,action,payload):ifaction=="force_sync":awaitself._orchestrate_sync()returnok(msg="Sync triggered manually")returnerror("Unknown action")
By using self.cache as a lock coordinator, we ensure that if synchronization takes longer than an hour, the next scheduled job won't start until the first one is finished. This is crucial for avoiding race conditions in data-intensive tasks.
Instead of importing httpx directly, this plugin calls a gateway plugin. This separates the Network Policy (retries, timeouts, credentials) from the Sync Logic.
Processing 10,000 items in the main event loop would block other requests. By chunking the data and sending it to XWorker, we distribute the CPU load across multiple physical worker processes.
# 1. Check if the job is scheduledxcoreservicesstatus--servicescheduler
# 2. Trigger a manual sync for testingxcoreplugincalldata_syncerforce_sync--payload'{}'# 3. Watch the background worker logsxcoreworkerstart--queuesdefault--loglevel=debug