Chapter 12: Queue System
1. Do Not Make the User Wait
Your app sends welcome emails on signup. Generates PDF invoices. Resizes uploaded images. Each task takes 2 to 30 seconds. Run them inside the HTTP request and the user stares at a spinner while the server grinds through email delivery, invoice rendering, and image resizing.
Queues move slow work to a background process. The HTTP handler drops a job onto a queue and responds to the user in under 100 milliseconds. A separate consumer picks up the job and does the work at its own pace. The user sees "Welcome! Check your email." The email arrives 5 seconds later.
Tina4's queue system works out of the box with a file-based backend. No Redis. No RabbitMQ. No external services. Add jobs and process them.
2. Why Queues Matter
Without queues:
User clicks "Sign Up"
-> Server validates input (10ms)
-> Server creates user in database (20ms)
-> Server sends welcome email (3000ms)
-> Server generates PDF welcome kit (2000ms)
-> Server resizes avatar (1500ms)
-> User sees response (6530ms later)With queues:
User clicks "Sign Up"
-> Server validates input (10ms)
-> Server creates user in database (20ms)
-> Server queues: send welcome email (1ms)
-> Server queues: generate PDF (1ms)
-> Server queues: resize avatar (1ms)
-> User sees response (33ms later)
Meanwhile, in the background:
-> Consumer sends welcome email
-> Consumer generates PDF
-> Consumer resizes avatar6.5 seconds becomes 33 milliseconds. The user feels the difference.
3. File Queue (Default)
The file-based backend is the default. No configuration needed.
Creating a Queue and Pushing a Job
queue = Tina4::Queue.new(topic: "emails")
# Push a job -- returns the Job that was queued
job = queue.push({
to: "alice@example.com",
subject: "Order Confirmation",
body: "Your order #1234 has been confirmed."
})
puts job.id # the generated job idpush returns a Tina4::Job, not an integer. Read job.id when you need the identifier.
Convenience Method: produce
The produce method pushes to a specific topic without creating a separate Queue instance. It also returns the Job:
queue = Tina4::Queue.new(topic: "emails")
queue.produce("invoices", { order_id: 101, format: "pdf" })Push with Priority
Jobs default to priority 0 (normal). The queue pops the highest priority first, and breaks ties oldest-first:
# Normal priority (default)
queue.push({ to: "alice@example.com", subject: "Newsletter" })
# High priority -- popped before normal jobs
queue.push({ to: "alice@example.com", subject: "Password Reset" }, priority: 10)Delaying a Job
Pass delay_seconds to hold a job back. It stays invisible to consumers until the delay elapses:
# Becomes available 60 seconds from now
queue.push({ to: "alice@example.com", subject: "Reminder" }, delay_seconds: 60)Queue Size
Check how many pending messages are in the queue:
count = queue.sizePass a status keyword to count jobs in a specific state. "failed" and "dead" both count jobs in the dead-letter store:
dead = queue.size(status: "dead")4. Pushing from Route Handlers
Tina4::Router.post("/api/register") do |request, response|
body = request.body
user_id = 42 # Simulated
queue = Tina4::Queue.new(topic: "emails")
queue.push({
user_id: user_id,
to: body["email"],
name: body["name"],
subject: "Welcome!"
})
response.json({
message: "Registration successful. Welcome email will arrive shortly.",
user_id: user_id
}, 201)
endcurl -X POST http://localhost:7147/api/register \
-H "Content-Type: application/json" \
-d '{"name": "Alice", "email": "alice@example.com", "password": "securePass123"}'{
"message": "Registration successful. Welcome email will arrive shortly.",
"user_id": 42
}5. Consuming Jobs
The consume method yields jobs one at a time via a block. Each job must be explicitly completed or failed:
queue = Tina4::Queue.new(topic: "emails")
queue.consume("emails") do |job|
begin
send_email(job.payload[:to], job.payload[:subject], job.payload[:body])
job.complete
rescue => e
job.fail(e.message)
end
endconsume polls forever by default. When the queue is empty it sleeps for poll_interval seconds (default 1.0) and polls again. To drain the queue once and stop, pass poll_interval: 0. To stop after a set number of jobs, pass iterations:.
Automatic Retry
When you call job.fail(reason), the queue handles the retry for you. It increments the job's attempt count and puts the job straight back onto the pending queue. The next pop or consume iteration picks it up again. Once a job has been attempted max_retries times, the queue stops retrying and moves it to the dead-letter store.
So the consumer loop below retries a failing job up to max_retries times on its own, then dead-letters it. No manual retry_failed call is needed:
# Retry each failed job up to 3 times, then dead-letter it
queue = Tina4::Queue.new(topic: "emails", max_retries: 3)
queue.consume("emails") do |job|
begin
send_email(job.payload[:to], job.payload[:subject], job.payload[:body])
job.complete
rescue => e
job.fail(e.message) # auto re-enqueues until attempts == max_retries
end
endRetry Backoff
By default a failed job is retried on the very next poll. To wait before the next attempt, set retry_backoff (in seconds) when you create the queue:
# Wait 30 seconds before each retry
queue = Tina4::Queue.new(topic: "emails", max_retries: 3, retry_backoff: 30)Manual Retry with Delay
job.retry is a manual override, distinct from the automatic fail path. It always re-queues the job regardless of the retry limit and increments the attempt count. The job already carries its queue reference from pop/consume, so you only pass the delay:
queue.consume("emails") do |job|
begin
send_email(job.payload[:to], job.payload[:subject], job.payload[:body])
job.complete
rescue => e
# Re-queue this job to run again 30 seconds from now
job.retry(delay_seconds: 30)
end
endManual Pop
For more control, pop a single message. pop returns the highest-priority available job, or nil when the queue is empty:
job = queue.pop
unless job.nil?
begin
send_email(job.payload[:to], job.payload[:subject])
job.complete
rescue => e
job.fail(e.message)
end
endPop a Specific Job by ID
Pull one known job out of the pending queue with pop_by_id. It returns the matching Job (claimed from the queue) or nil if no pending job has that id:
job = queue.pop_by_id("abc-123")
job.complete if jobconsume accepts the same lookup via id: -- it processes that single job and returns:
queue.consume("emails", id: "abc-123") do |job|
send_email(job.payload[:to], job.payload[:subject])
job.complete
end6. Job Lifecycle
Every job moves through these states:
push -> PENDING -> pop/consume -> RESERVED -> job.complete -> COMPLETED (terminal)
-> job.fail -> attempts += 1
|
attempts < max_retries
|
PENDING (auto re-queue)
|
attempts >= max_retries
|
DEAD LETTERjob.fail does the bookkeeping automatically: it re-queues the job while it still has retries left, and moves it to the dead-letter store once it has been attempted max_retries times.
Job Methods
When you receive a job from consume or pop, you have these methods:
job.complete-- mark the job as done. Terminal; the job is removed.job.fail(reason)-- record a failed attempt with a reason string. Auto re-queues while retries remain, then dead-letters.job.reject(reason)-- alias forfail.job.retry(delay_seconds: 0)-- manual override. Re-queue the job after an optional delay (in seconds), regardless of the retry limit.
Job Properties
job.id-- the generated job id.job.payload-- the data you pushed.job.topic-- the topic this job belongs to. Useful when consuming from multiple topics.job.attempts-- how many times the job has been attempted.job.error-- the reason from the lastfail/reject.
Always call complete, fail, or retry on every job. If you do not, the job stays reserved.
7. Retry and Dead Letters
Max Retries
The default max_retries is 3. When a job's attempt count reaches max_retries, the queue stops retrying it and moves it to the dead-letter store. Set the limit when you create the queue.
Inspecting Retrying Jobs
failed returns the jobs that have failed at least once but are still being retried (attempts above zero, under max_retries). These jobs live in the pending queue and will be picked up again automatically. failed returns an array of Hashes, so use string keys:
retrying = queue.failed
retrying.each do |job|
puts "Retrying job: #{job["id"]} (attempt #{job["attempts"]})"
puts " Last error: #{job["error"]}"
endDead Letters
Jobs that exhausted max_retries are dead letters. dead_letters returns an array of Hashes -- access fields with string keys:
dead_jobs = queue.dead_letters
dead_jobs.each do |job|
puts "Dead job: #{job["id"]}"
puts " Payload: #{job["payload"]}"
puts " Error: #{job["error"]}"
endReviving Dead Letters
retry_failed revives dead-letter jobs (those still under max_retries) back to the pending queue and returns the count re-queued. To revive one specific dead-letter job by id, use queue.retry(job_id):
# Revive every eligible dead letter
queue.retry_failed
# Revive one specific dead-letter job
queue.retry("abc-123")Purging Jobs
Remove jobs by status. "failed" and "dead" remove from the dead-letter store; "pending" removes matching jobs from the pending queue:
queue.purge("dead")
queue.purge("pending")8. Producing Multiple Jobs
Sometimes a single action triggers multiple background tasks:
Tina4::Router.post("/api/orders") do |request, response|
body = request.body
order_id = 101
queue = Tina4::Queue.new(topic: "emails")
queue.push({
order_id: order_id,
to: body["email"],
subject: "Order Confirmation"
})
queue.produce("invoices", {
order_id: order_id,
format: "pdf"
})
queue.produce("inventory", {
items: body["items"]
})
queue.produce("warehouse", {
order_id: order_id,
shipping_address: body["shipping_address"]
})
response.json({
message: "Order placed successfully",
order_id: order_id
}, 201)
endFour jobs are queued in under 5 milliseconds. The user gets an instant response.
9. Switching Backends
Switching backends is a config change, not a code change.
Default: File
# No config needed -- file is the default.
# Jobs are stored as JSON files under ./.queue in the working directory.To store jobs somewhere else, pass a dir: option to the backend rather than an environment variable:
backend = Tina4::QueueBackends::LiteBackend.new(dir: "./data/queue")
queue = Tina4::Queue.new(topic: "emails", backend: backend)RabbitMQ
TINA4_QUEUE_BACKEND=rabbitmq
TINA4_QUEUE_URL=amqp://user:pass@localhost:5672Kafka
TINA4_QUEUE_BACKEND=kafka
TINA4_QUEUE_URL=localhost:9092MongoDB
TINA4_QUEUE_BACKEND=mongodb
TINA4_QUEUE_URL=mongodb://user:pass@localhost:27017/tina4Install the MongoDB driver:
gem install mongoYour code does not change. The same queue.push and queue.consume calls work with every backend.
10. Separate Producer and Consumer Patterns
Use separate Tina4::Queue instances in different files or services for clarity:
# Producer side (e.g., in a route handler)
queue = Tina4::Queue.new(topic: "emails")
queue.push({ to: "alice@example.com", subject: "Hello" })
# Consumer side (e.g., in a worker script)
queue = Tina4::Queue.new(topic: "emails")
queue.consume("emails") do |job|
process(job)
job.complete
endThe same Tina4::Queue class handles both producing and consuming. Separate instances in different files make intent clearer.
11. Exercise: Build an Email Queue
Build a queue-based email system with failure handling.
Requirements
- Create these endpoints:
| Method | Path | Description |
|---|---|---|
POST | /api/emails/send | Queue an email for sending |
GET | /api/emails/queue | Show pending email count |
GET | /api/emails/dead | List dead letter jobs |
POST | /api/emails/retry | Revive dead-letter jobs |
The email payload should include:
to(required),subject(required),body(required)Create a consumer that processes the queue, simulating occasional failures
Test with:
curl -X POST http://localhost:7147/api/emails/send \
-H "Content-Type: application/json" \
-d '{"to": "alice@example.com", "subject": "Welcome!", "body": "Thanks for signing up."}'
curl http://localhost:7147/api/emails/queue
curl http://localhost:7147/api/emails/dead
curl -X POST http://localhost:7147/api/emails/retry12. Solution
Create src/routes/email_queue.rb:
queue = Tina4::Queue.new(topic: "emails")
# @noauth
Tina4::Router.post("/api/emails/send") do |request, response|
body = request.body
errors = []
errors << "'to' is required" if body["to"].nil? || body["to"].empty?
errors << "'subject' is required" if body["subject"].nil? || body["subject"].empty?
errors << "'body' is required" if body["body"].nil? || body["body"].empty?
unless errors.empty?
return response.json({ errors: errors }, 400)
end
job = queue.push({
to: body["to"],
subject: body["subject"],
body: body["body"]
})
response.json({
message: "Email queued for sending",
job_id: job.id
}, 201)
end
Tina4::Router.get("/api/emails/queue") do |request, response|
count = queue.size
response.json({ pending: count })
end
Tina4::Router.get("/api/emails/dead") do |request, response|
dead_jobs = queue.dead_letters
items = dead_jobs.map do |job|
{ id: job["id"], payload: job["payload"], error: job["error"] }
end
response.json({ dead_letters: items, count: items.length })
end
Tina4::Router.post("/api/emails/retry") do |request, response|
queue.retry_failed
response.json({ message: "Dead-letter emails re-queued" })
endCreate a separate consumer file src/workers/email_worker.rb:
queue = Tina4::Queue.new(topic: "emails", max_retries: 3)
queue.consume("emails") do |job|
payload = job.payload
puts "Sending email to #{payload[:to]}..."
puts " Subject: #{payload[:subject]}"
begin
# Simulate sending (replace with real email logic)
sleep(1)
# Simulate failure for a specific address
if payload[:to] == "bad@example.com"
raise "SMTP connection refused"
end
puts " Email sent to #{payload[:to]} successfully!"
job.complete
rescue => e
puts " Failed: #{e.message}"
job.fail(e.message)
end
endThe consumer retries a failing job to bad@example.com automatically. After max_retries attempts, queue.dead_letters returns that job, and the /api/emails/dead endpoint shows it. You investigate, fix the address, and call /api/emails/retry to revive it.
13. Gotchas
1. Always call complete or fail
Problem: Jobs stay in reserved status forever.
Cause: Your consumer block does not call job.complete or job.fail. The job stays reserved and is never released.
Fix: Always call one of job.complete, job.fail(reason), or job.reject(reason) in your consumer block.
2. Worker not picking up messages
Problem: Messages are pushed but nothing happens.
Cause: No consumer process is running, or the consumer is listening on a different topic.
Fix: Make sure the consumer is running. Check that the topic name in queue.push matches the topic in queue.consume.
3. Payload must be serializable
Problem: queue.push throws an error.
Cause: You passed a non-serializable object (database connection, file handle, etc.).
Fix: Only pass simple data types in the payload. If you need to reference a database record, pass the ID. The consumer looks up records by ID.
4. Dead letters pile up
Problem: Dead letters accumulate and nobody notices.
Cause: Jobs that exhaust max_retries become dead letters but are never cleaned up.
Fix: Monitor dead letters with queue.dead_letters. Set up an alert when the count exceeds a threshold. Investigate, fix, then call queue.retry_failed or queue.purge("dead").
5. File backend for production
Problem: Multiple workers cause contention on the file backend.
Cause: The file backend is designed for single-worker setups.
Fix: For production with multiple workers, switch to RabbitMQ, Kafka, or MongoDB via the TINA4_QUEUE_BACKEND environment variable.
6. dead_letters and failed return Hashes
Problem: job.id or job.payload raises NoMethodError when iterating dead_letters or failed.
Cause: dead_letters and failed return arrays of Hashes, not Job objects.
Fix: Use string keys: job["id"], job["payload"], job["error"], job["attempts"].