Chapter 12: Queue System
1. Do Not Make the User Wait
Your app sends welcome emails on signup. Generates PDF invoices. Resizes uploaded images. Each task takes 2 to 30 seconds. Run them inside the HTTP request and the user stares at a spinner while the server grinds through email delivery, invoice rendering, and image resizing.
Queues move slow work to a background process. The HTTP handler drops a job onto a queue and responds to the user in under 100 milliseconds. A separate consumer picks up the job and does the work at its own pace. The user sees "Welcome! Check your email." The email arrives 5 seconds later.
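The hand-off can be illustrated in plain Ruby before any framework is involved. The sketch below uses the standard library's Thread::Queue as a stand-in for a job queue. The handle_signup method and the :stop sentinel are hypothetical names for this illustration, and none of it is Tina4's API.

```ruby
# Conceptual sketch only: an in-process queue, nothing survives a restart.
def handle_signup(jobs, email)
  jobs << { to: email, subject: "Welcome!" } # enqueue and return at once
  "Welcome! Check your email."
end

jobs = Thread::Queue.new
results = []

# Background consumer: takes jobs off the queue until it sees :stop.
consumer = Thread.new do
  while (job = jobs.pop) != :stop
    # Pretend this is the slow part (SMTP, PDF rendering, image resizing).
    results << "sent #{job[:subject]} to #{job[:to]}"
  end
end

reply = handle_signup(jobs, "alice@example.com")
jobs << :stop
consumer.join

puts reply
puts results
```

The handler's reply does not depend on the consumer finishing, which is the property a real queue provides across processes.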
Tina4's queue system works out of the box with a file-based backend. No Redis. No RabbitMQ. No external services. Add jobs and process them.
2. Why Queues Matter
Without queues:
User clicks "Sign Up"
  -> Server validates input (10ms)
  -> Server creates user in database (20ms)
  -> Server sends welcome email (3000ms)
  -> Server generates PDF welcome kit (2000ms)
  -> Server resizes avatar (1500ms)
  -> User sees response (6530ms later)
With queues:
User clicks "Sign Up"
  -> Server validates input (10ms)
  -> Server creates user in database (20ms)
  -> Server queues: send welcome email (1ms)
  -> Server queues: generate PDF (1ms)
  -> Server queues: resize avatar (1ms)
  -> User sees response (33ms later)
Meanwhile, in the background:
  -> Consumer sends welcome email
  -> Consumer generates PDF
  -> Consumer resizes avatar
6.5 seconds becomes 33 milliseconds. The user feels the difference.
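The two totals are plain sums of the step timings. A quick check, using the figures from the walkthroughs (the hash keys are just labels chosen for this sketch):

```ruby
# Timings in milliseconds, copied from the two walkthroughs.
inline = { validate: 10, create_user: 20, send_email: 3000, generate_pdf: 2000, resize_avatar: 1500 }
queued = { validate: 10, create_user: 20, queue_email: 1, queue_pdf: 1, queue_avatar: 1 }

inline_total = inline.values.sum # 6530
queued_total = queued.values.sum # 33

puts "Inline: #{inline_total} ms, queued: #{queued_total} ms"
puts "Speed-up: about #{(inline_total.to_f / queued_total).round}x"
```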
3. File Queue (Default)
The file-based backend is the default. No configuration needed.
Creating a Queue and Pushing a Job
queue = Tina4::Queue.new(topic: "emails")
# Push a job
queue.push({
  to: "alice@example.com",
  subject: "Order Confirmation",
  body: "Your order #1234 has been confirmed."
})
Convenience Method: produce
The produce method pushes to a specific topic without creating a separate Queue instance:
queue = Tina4::Queue.new(topic: "emails")
queue.produce("invoices", { order_id: 101, format: "pdf" })
Push with Priority
Jobs default to priority 0 (normal). Higher numbers are popped first:
# Normal priority (default)
queue.push({ to: "alice@example.com", subject: "Newsletter" })
# High priority -- processed before normal jobs
queue.push({ to: "alice@example.com", subject: "Password Reset" }, priority: 10)
Queue Size
Check how many pending messages are in the queue:
count = queue.size
Pass a status keyword to count jobs in a specific state:
failed = queue.size(status: "failed")
4. Pushing from Route Handlers
Tina4::Router.post("/api/register") do |request, response|
  body = request.body
  user_id = 42 # Simulated
  queue = Tina4::Queue.new(topic: "emails")
  queue.push({
    user_id: user_id,
    to: body["email"],
    name: body["name"],
    subject: "Welcome!"
  })
  response.json({
    message: "Registration successful. Welcome email will arrive shortly.",
    user_id: user_id
  }, 201)
end
curl -X POST http://localhost:7147/api/register \
  -H "Content-Type: application/json" \
  -d '{"name": "Alice", "email": "alice@example.com", "password": "securePass123"}'
{
  "message": "Registration successful. Welcome email will arrive shortly.",
  "user_id": 42
}
5. Consuming Jobs
The consume method yields jobs one at a time via a block. Each job must be explicitly completed or failed:
queue = Tina4::Queue.new(topic: "emails")
queue.consume("emails") do |job|
  begin
    send_email(job.payload[:to], job.payload[:subject], job.payload[:body])
    job.complete
  rescue => e
    job.fail(e.message)
  end
end
Retry with Delay
To retry a job after a cooldown instead of marking it as failed, call job.retry in the rescue branch:
queue.consume("emails") do |job|
  begin
    send_email(job.payload[:to], job.payload[:subject], job.payload[:body])
    job.complete
  rescue => e
    # Retry after 30 seconds instead of failing immediately
    job.retry(queue: queue, delay_seconds: 30)
  end
end
Manual Pop
For more control, pop a single message:
job = queue.pop
unless job.nil?
  begin
    send_email(job.payload[:to], job.payload[:subject], job.payload[:body])
    job.complete
  rescue => e
    job.fail(e.message)
  end
end
6. Job Lifecycle
Every job moves through states:
push -> PENDING -> pop/consume -> RESERVED -> job.complete -> COMPLETED
                                           -> job.fail -> FAILED
                                                            |
                                                      retry (manual)
                                                            |
                                                         PENDING
                                                            |
                                                   max retries exceeded
                                                            |
                                                       DEAD LETTER
Job Methods
When you receive a job from consume or pop, you have four methods:
- job.complete -- mark the job as done
- job.fail(reason) -- mark the job as failed with a reason string
- job.reject(reason) -- alias for fail
- job.retry(queue: queue, delay_seconds: 30) -- re-queue the job after a delay (in seconds). The job goes back to PENDING after the delay elapses. You must pass the queue reference.
Job Properties
- job.topic -- the topic this job belongs to. Useful when consuming from multiple topics.
Always call complete, fail, or retry on every job. If you do not, the job stays reserved.
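The lifecycle can be read as a small transition table. The sketch below is one way to encode it, assuming the state and event names shown. They are illustrative symbols rather than Tina4 constants, and the table includes the job.retry path out of RESERVED that the method list describes.

```ruby
# Illustrative transition table read off the lifecycle diagram.
# The symbols are made up for this sketch; they are not Tina4 constants.
TRANSITIONS = {
  pending:  { reserve: :reserved },                                   # pop / consume
  reserved: { complete: :completed, fail: :failed, retry: :pending }, # job.complete / job.fail / job.retry
  failed:   { retry: :pending, exhaust: :dead_letter }                # manual retry / max retries exceeded
}.freeze

# Returns the state that follows `event`, or raises if the move is not allowed.
def next_state(state, event)
  TRANSITIONS.fetch(state, {}).fetch(event) do
    raise ArgumentError, "cannot #{event} a #{state} job"
  end
end

state = :pending
%i[reserve fail retry reserve complete].each do |event|
  state = next_state(state, event)
end
puts state # the job ends up completed
```

Note that RESERVED has no transition that happens on its own, which is why a job that is never completed, failed, or retried stays reserved.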
7. Retry and Dead Letters
Max Retries
The default max_retries is 3. When a job's attempt count reaches max_retries, retry_failed skips it.
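A minimal in-memory model of that rule may make it concrete. SketchJob and both helper methods are hypothetical stand-ins written for this example, not Tina4 classes, and the sketch assumes that a job whose attempt count has reached max_retries is both skipped by retry_failed and counted as a dead letter.

```ruby
# Hypothetical stand-in for a stored job: id, status, and attempts so far.
SketchJob = Struct.new(:id, :status, :attempts)

MAX_RETRIES = 3 # the default stated in the text

# Re-queue failed jobs that still have attempts left; return how many moved.
def retry_failed(jobs, max_retries: MAX_RETRIES)
  retryable = jobs.select { |j| j.status == :failed && j.attempts < max_retries }
  retryable.each { |j| j.status = :pending }
  retryable.size
end

# Failed jobs with no attempts left are treated as dead letters.
def dead_letters(jobs, max_retries: MAX_RETRIES)
  jobs.select { |j| j.status == :failed && j.attempts >= max_retries }
end

jobs = [
  SketchJob.new(1, :failed, 1),
  SketchJob.new(2, :failed, 3),
  SketchJob.new(3, :completed, 1)
]

puts retry_failed(jobs)                   # only job 1 is re-queued
puts dead_letters(jobs).map(&:id).inspect # job 2 is left behind
```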
Retrying Failed Jobs
# Retry all failed jobs (skips those that exceeded max_retries)
queue.retry_failed
Dead Letters
Jobs that have exceeded max_retries are dead letters. There is no magic dead letter queue -- you retrieve and handle them yourself:
dead_jobs = queue.dead_letters
dead_jobs.each do |job|
  puts "Dead job: #{job.id}"
  puts "  Payload: #{job.payload}"
  puts "  Error: #{job.error}"
end
Purging Jobs
Remove jobs by status:
queue.purge("completed")
queue.purge("failed")
8. Producing Multiple Jobs
Sometimes a single action triggers multiple background tasks:
Tina4::Router.post("/api/orders") do |request, response|
  body = request.body
  order_id = 101
  queue = Tina4::Queue.new(topic: "emails")
  queue.push({
    order_id: order_id,
    to: body["email"],
    subject: "Order Confirmation"
  })
  queue.produce("invoices", {
    order_id: order_id,
    format: "pdf"
  })
  queue.produce("inventory", {
    items: body["items"]
  })
  queue.produce("warehouse", {
    order_id: order_id,
    shipping_address: body["shipping_address"]
  })
  response.json({
    message: "Order placed successfully",
    order_id: order_id
  }, 201)
end
Four jobs are queued in under 5 milliseconds. The user gets an instant response.
9. Switching Backends
Switching backends is a config change, not a code change.
Default: File
# No config needed -- file is the default
# Optionally set a custom storage path (defaults to ./queue)
TINA4_QUEUE_PATH=./data/queue
RabbitMQ
TINA4_QUEUE_BACKEND=rabbitmq
TINA4_QUEUE_URL=amqp://user:pass@localhost:5672
Kafka
TINA4_QUEUE_BACKEND=kafka
TINA4_QUEUE_URL=localhost:9092
MongoDB
TINA4_QUEUE_BACKEND=mongodb
TINA4_QUEUE_URL=mongodb://user:pass@localhost:27017/tina4
Install the MongoDB driver:
gem install mongo
Your code does not change. The same queue.push and queue.consume calls work with every backend.
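To see why application code can stay the same, it may help to picture the backend choice as a lookup on the environment. The helper below is a hypothetical illustration that uses only the variable names and defaults from this section. How Tina4 reads them internally is not shown in this chapter, and raising when the URL is missing is a choice made for the sketch.

```ruby
# Hypothetical helper: resolves queue settings from TINA4_QUEUE_* variables.
# The method name and the returned hash shape are made up for illustration.
def queue_settings(env = ENV)
  backend = env.fetch("TINA4_QUEUE_BACKEND", "file") # file is the default
  if backend == "file"
    { backend: backend, path: env.fetch("TINA4_QUEUE_PATH", "./queue") }
  else
    { backend: backend, url: env.fetch("TINA4_QUEUE_URL") } # KeyError if unset
  end
end

p queue_settings({})
p queue_settings({ "TINA4_QUEUE_BACKEND" => "kafka", "TINA4_QUEUE_URL" => "localhost:9092" })
```

Passing a plain hash instead of ENV keeps the sketch easy to try without touching the real environment.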
10. Separate Producer and Consumer Patterns
Use separate Tina4::Queue instances in different files or services for clarity:
# Producer side (e.g., in a route handler)
queue = Tina4::Queue.new(topic: "emails")
queue.push({ to: "alice@example.com", subject: "Hello" })
# Consumer side (e.g., in a worker script)
queue = Tina4::Queue.new(topic: "emails")
queue.consume("emails") do |job|
  process(job)
  job.complete
end
The same Tina4::Queue class handles both producing and consuming. Separate instances in different files make intent clearer.
11. Exercise: Build an Email Queue
Build a queue-based email system with failure handling.
Requirements
- Create these endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /api/emails/send | Queue an email for sending |
| GET | /api/emails/queue | Show pending email count |
| GET | /api/emails/dead | List dead letter jobs |
| POST | /api/emails/retry | Retry all failed jobs |
- The email payload should include: to (required), subject (required), body (required)
- Create a consumer that processes the queue, simulating occasional failures
Test with:
curl -X POST http://localhost:7147/api/emails/send \
  -H "Content-Type: application/json" \
  -d '{"to": "alice@example.com", "subject": "Welcome!", "body": "Thanks for signing up."}'
curl http://localhost:7147/api/emails/queue
curl http://localhost:7147/api/emails/dead
curl -X POST http://localhost:7147/api/emails/retry
12. Solution
Create src/routes/email_queue.rb:
queue = Tina4::Queue.new(topic: "emails")
# @noauth
Tina4::Router.post("/api/emails/send") do |request, response|
  body = request.body
  errors = []
  errors << "'to' is required" if body["to"].nil? || body["to"].empty?
  errors << "'subject' is required" if body["subject"].nil? || body["subject"].empty?
  errors << "'body' is required" if body["body"].nil? || body["body"].empty?
  unless errors.empty?
    return response.json({ errors: errors }, 400)
  end
  message_id = queue.push({
    to: body["to"],
    subject: body["subject"],
    body: body["body"]
  })
  response.json({
    message: "Email queued for sending",
    message_id: message_id
  }, 201)
end
Tina4::Router.get("/api/emails/queue") do |request, response|
  count = queue.size
  response.json({ pending: count })
end
Tina4::Router.get("/api/emails/dead") do |request, response|
  dead_jobs = queue.dead_letters
  items = dead_jobs.map do |job|
    { id: job.id, payload: job.payload, error: job.error }
  end
  response.json({ dead_letters: items, count: items.length })
end
Tina4::Router.post("/api/emails/retry") do |request, response|
  queue.retry_failed
  response.json({ message: "Failed emails re-queued for retry" })
end
Create a separate consumer file src/workers/email_worker.rb:
queue = Tina4::Queue.new(topic: "emails")
queue.consume("emails") do |job|
  payload = job.payload
  puts "Sending email to #{payload[:to]}..."
  puts "  Subject: #{payload[:subject]}"
  begin
    # Simulate sending (replace with real email logic)
    sleep(1)
    # Simulate failure for a specific address
    if payload[:to] == "bad@example.com"
      raise "SMTP connection refused"
    end
    puts "  Email sent to #{payload[:to]} successfully!"
    job.complete
  rescue => e
    puts "  Failed: #{e.message}"
    job.fail(e.message)
  end
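end
After a job to bad@example.com has failed three times (the default max_retries), queue.dead_letters returns that job and the /api/emails/dead endpoint shows it. Because retry_failed skips jobs that have reached max_retries, /api/emails/retry only re-queues failed jobs that still have attempts left, so a dead letter stays there until you investigate and handle it yourself.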
13. Gotchas
1. Always call complete or fail
Problem: Jobs stay in reserved status forever.
Cause: Your consumer block does not call job.complete or job.fail. The job stays reserved and is never released.
Fix: Always call one of job.complete, job.fail(reason), or job.reject(reason) in your consumer block.
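One way to make this hard to forget is to route every job through a small wrapper that settles it on all code paths. The settle helper and the RecordingJob class below are hypothetical and exist only for this sketch. The only job methods assumed are complete and fail(reason), as described in this chapter.

```ruby
# Hypothetical helper: runs the block, then completes the job on success or
# fails it on any StandardError, so no code path leaves it reserved.
def settle(job)
  yield job
  job.complete
rescue StandardError => e
  job.fail(e.message)
end

# Stand-in job that only records how it was settled (not a Tina4 class).
class RecordingJob
  attr_reader :outcome

  def complete
    @outcome = [:completed]
  end

  def fail(reason)
    @outcome = [:failed, reason]
  end
end

ok = RecordingJob.new
settle(ok) { |_job| :work_done }

bad = RecordingJob.new
settle(bad) { |_job| raise "SMTP connection refused" }

p ok.outcome
p bad.outcome
```

Inside a consumer, the same wrapper would be called from the consume block, with the real work (for example, sending the email) placed in the block passed to settle.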
2. Worker not picking up messages
Problem: Messages are pushed but nothing happens.
Cause: No consumer process is running, or the consumer is listening on a different topic.
Fix: Make sure the consumer is running. Check that the topic name in queue.push matches the topic in queue.consume.
3. Payload must be serializable
Problem: queue.push throws an error.
Cause: You passed a non-serializable object (database connection, file handle, etc.).
Fix: Only pass simple data types in the payload. If you need to reference a database record, pass the ID. The consumer looks up records by ID.
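A pre-flight check can catch a bad payload before it reaches the queue. The sketch below is an illustration under assumptions: plain_data? is a hypothetical helper, and the JSON round trip assumes the backend stores payloads as JSON, which this chapter does not state.

```ruby
require "json"

# Types that survive serialization without surprises.
SIMPLE_TYPES = [String, Integer, Float, TrueClass, FalseClass, NilClass].freeze

# True when the payload is built only from hashes, arrays, and simple values.
def plain_data?(value)
  case value
  when Hash  then value.all? { |k, v| (k.is_a?(String) || k.is_a?(Symbol)) && plain_data?(v) }
  when Array then value.all? { |v| plain_data?(v) }
  else SIMPLE_TYPES.any? { |type| value.is_a?(type) }
  end
end

good = { user_id: 42, template: "welcome" }
bad  = { user: Object.new } # stands in for a model, connection, or file handle

puts plain_data?(good) # true
puts plain_data?(bad)  # false

# Assumption: payloads are stored as JSON (the chapter does not say).
round_trip = JSON.parse(JSON.generate(good), symbolize_names: true)
puts round_trip == good # true
```

The good payload carries only the record's ID, so the consumer can look up the current row when the job runs.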
4. Dead letters pile up
Problem: Dead letters accumulate and nobody notices.
Cause: Failed jobs that exceed max_retries become dead letters but are never cleaned up.
Fix: Monitor dead letters with queue.dead_letters. Set up an alert when the count exceeds a threshold. Investigate, fix, then call queue.retry_failed or queue.purge("failed").
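A threshold check like the one described could be as small as the sketch below. The dead_letter_alert method and StubQueue are hypothetical names for this example. The only queue method relied on is dead_letters, and how the alert is delivered (log line, email, pager) is left open.

```ruby
# Hypothetical check: `queue` is anything that responds to dead_letters.
# Returns an alert message when the count exceeds the threshold, else nil.
def dead_letter_alert(queue, threshold:)
  count = queue.dead_letters.count
  return nil if count <= threshold

  "ALERT: #{count} dead letters (threshold #{threshold})"
end

# Stand-in queue for the demo; a real queue object would be passed instead.
StubQueue = Struct.new(:dead_letters)

puts dead_letter_alert(StubQueue.new(%w[job1 job2 job3]), threshold: 2)
p dead_letter_alert(StubQueue.new([]), threshold: 2)
```

Running a check like this on a schedule, rather than inside the consumer, keeps monitoring working even when the consumer itself is down.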
5. File backend for production
Problem: Multiple workers cause contention on the file backend.
Cause: The file backend is designed for single-worker setups.
Fix: For production with multiple workers, switch to RabbitMQ, Kafka, or MongoDB via the TINA4_QUEUE_BACKEND environment variable.
6. Consumer block returns nothing useful
Problem: Jobs are processed but the system does not track completion.
Cause: Your consumer block does not call job.complete. Without an explicit call, the job stays reserved.
Fix: Always call job.complete when the job succeeds and job.fail(reason) when it fails.