Queue Module

The Queue module provides a robust background job processing system with priority queues, backpressure management, dead letter queues, and configurable retry strategies. It integrates with Redis, RabbitMQ, and in-process workers.

Configuration

example.py
python
Copied!
1from vorte import Vorte
2
3app = Vorte(
4 auto_load=True,
5 config={
6 "queue": {
7 "backend": "redis",
8 "url": "redis://localhost:6379/1",
9 "workers": 4,
10 "retry_strategy": {
11 "max_retries": 3,
12 "base_delay": 1,
13 "max_delay": 60,
14 "backoff": "exponential",
15 },
16 "dead_letter": {
17 "enabled": True,
18 "max_entries": 10000,
19 "retention": 604800,
20 },
21 "backpressure": {
22 "enabled": True,
23 "max_queue_size": 10000,
24 "strategy": "reject",
25 },
26 },
27 },
28)

Defining Jobs

send_email.py
python
Copied!
1from vorte.queue import Job, job
2
3@job(priority=5, retries=3)
4async def send_email(to: str, subject: str, body: str):
5 await email_client.send(to=to, subject=subject, body=body)
6 return {"status": "sent", "to": to}
7
8@job(priority=1, queue="critical")
9async def process_payment(order_id: str, amount: float):
10 result = await payment_gateway.charge(order_id, amount)
11 return {"order_id": order_id, "status": result.status}
12
13@job(priority=10, queue="batch", timeout=300)
14async def generate_report(report_type: str, date_range: dict):
15 report = await analytics.generate(report_type, date_range)
16 await storage.upload(f"reports/{report.id}.pdf", report.pdf_bytes)
17 return {"report_id": report.id, "url": report.url}

Enqueuing Jobs

create_user.py
python
Copied!
1from vorte.queue import Queue
2
3queue = Queue()
4
5@router.post("/users")
6async def create_user(payload: CreateUserPayload):
7 user = await db.create_user(payload)
8
9 await queue.enqueue(
10 send_email,
11 to=user.email,
12 subject="Welcome!",
13 body="Your account has been created.",
14 )
15
16 await queue.enqueue(
17 process_payment,
18 order_id=user.order_id,
19 amount=9.99,
20 delay=60,
21 )
22
23 return success_response(data=user)

Priority Levels

PriorityValueUse Case
CRITICAL1Payment processing, security alerts
HIGH3User-facing notifications, time-sensitive tasks
NORMAL5Default priority, standard processing
LOW7Analytics, data synchronization
BATCH10Report generation, bulk imports

Backpressure Management

When the queue reaches capacity, Vorte applies a backpressure strategy to prevent system overload.

StrategyBehavior
rejectReject new jobs and return an error to the caller
delayDelay accepting new jobs until capacity is available
drop_oldestDrop the lowest-priority queued job to make room
scaleAutomatically scale worker count up to a configured maximum

Dead Letter Queue

Jobs that exceed their retry limit are moved to the dead letter queue (DLQ) for inspection and manual replay.

example.py
python
Copied!
1from vorte.queue import DeadLetterQueue
2
3dlq = DeadLetterQueue()
4
5failed_jobs = await dlq.list(status="failed", limit=50)
6
7for job in failed_jobs:
8 print(f"Job {job.id}: {job.error_message}")
9 print(f" Retries: {job.retry_count}")
10 print(f" Queue: {job.queue_name}")
11
12await dlq.replay(job_id="job_abc123")
13
14await dlq.replay_all(queue="emails")
15
16await dlq.purge(older_than=604800)

Retry Logic

fetch_external_api.py
python
Copied!
1from vorte.queue import Job, job
2
3@job(
4 retries=5,
5 backoff="exponential",
6 base_delay=1,
7 max_delay=120,
8 retry_on=[ConnectionError, TimeoutError],
9)
10async def fetch_external_api(url: str):
11 async with httpx.AsyncClient() as client:
12 response = await client.get(url, timeout=10)
13 response.raise_for_status()
14 return response.json()
Backoff StrategyDelay Calculation
constantAlways base_delay
linearbase_delay * retry_number
exponentialbase_delay * 2^retry_number

Scheduled Jobs

cleanup_expired_sessions.py
python
Copied!
1from vorte.queue import Queue, schedule
2
3queue = Queue()
4
5@schedule(interval=3600)
6async def cleanup_expired_sessions():
7 count = await db.execute("DELETE FROM sessions WHERE expires_at < NOW()")
8 print(f"Cleaned up {count} expired sessions")
9
10@schedule(cron="0 2 * * *")
11async def daily_report():
12 await queue.enqueue(generate_report, report_type="daily")
13
14@schedule(interval=60, queue="maintenance")
15async def sync_external_data():
16 await external_api.sync()

Job Monitoring

example.py
python
Copied!
1from vorte.queue import Queue
2
3queue = Queue()
4
5stats = await queue.stats()
6print(f"Pending: {stats.pending}")
7print(f"Active: {stats.active}")
8print(f"Completed: {stats.completed}")
9print(f"Failed: {stats.failed}")
10print(f"DLQ size: {stats.dead_letter}")
11
12job = await queue.get_job("job_abc123")
13print(f"Status: {job.status}")
14print(f"Progress: {job.progress}%")
15print(f"Started: {job.started_at}")
16print(f"Duration: {job.duration_ms}ms")
Stay in the loop

Get Vorte release notes, module guides, and developer deep-dives. No spam — unsubscribe anytime.