tech/cloudflare/queues

QUEUES

Cloudflare Queues — async message processing.

production Cloudflare Workers (Paid plan required)
improves: tech/cloudflare

Cloudflare Queues

Queues decouple producers (Workers that send messages) from consumers (Workers that process them). Messages are delivered at-least-once with automatic retries.

Pricing: $0.40 per million messages. Included in Workers Paid plan.

The code review anti-pattern: running sequential AI agent calls inside an HTTP handler risks Cloudflare's 30s CPU limit and gives users a slow experience. Queues fix this — return a 202 immediately, process asynchronously.

wrangler.toml

# Producer binding (Worker that sends messages)
[[queues.producers]]
binding = "QUEUE"
queue = "ai-jobs"

# Consumer binding (Worker that processes messages)
[[queues.consumers]]
queue = "ai-jobs"
max_batch_size = 10          # messages per batch
max_batch_timeout = 5        # seconds to wait to fill a batch
max_retries = 3              # retry failed messages N times
dead_letter_queue = "ai-jobs-dlq"  # failed messages go here
wrangler queues create ai-jobs
wrangler queues create ai-jobs-dlq   # dead letter queue

Producer: sending messages

// Send a single message
await env.QUEUE.send({ type: 'analyse_brief', projectId, userId });

// Send multiple messages (batch — cheaper, one operation)
await env.QUEUE.sendBatch([
  { body: { type: 'send_email', to: 'user@example.com', template: 'welcome' } },
  { body: { type: 'update_credits', userId, amount: 100 } },
  { body: { type: 'log_event', event: 'project_created', projectId } },
]);

Consumer: processing messages

export default {
  // HTTP handler — returns 202 immediately
  async fetch(request: Request, env: Env): Promise<Response> {
    const { projectId, userId } = await request.json();
    await env.QUEUE.send({ type: 'analyse_brief', projectId, userId });
    return Response.json({ status: 'queued' }, { status: 202 });
  },

  // Queue consumer — runs asynchronously
  async queue(batch: MessageBatch<JobMessage>, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        await processJob(msg.body, env);
        msg.ack();  // remove from queue on success
      } catch (err) {
        msg.retry({ delaySeconds: 30 });  // retry after 30s delay
      }
    }
  },
};

Typed messages

type JobMessage =
  | { type: 'analyse_brief'; projectId: string; userId: string }
  | { type: 'send_email'; to: string; template: string; data?: Record<string, string> }
  | { type: 'update_credits'; userId: string; amount: number };

async function processJob(msg: JobMessage, env: Env): Promise<void> {
  switch (msg.type) {
    case 'analyse_brief':
      return analyseBrief(msg.projectId, msg.userId, env);
    case 'send_email':
      return sendEmail(msg, env);
    case 'update_credits':
      return updateCredits(msg.userId, msg.amount, env);
  }
}

Retry and dead letter queue pattern

async queue(batch: MessageBatch<JobMessage>, env: Env): Promise<void> {
  for (const msg of batch.messages) {
    try {
      await processJob(msg.body, env);
      msg.ack();
    } catch (err) {
      // msg.retry() — message goes back to queue (up to max_retries)
      // After max_retries, goes to dead_letter_queue
      if (msg.attempts >= 3) {
        // Last attempt — log to D1 before it goes to DLQ
        await env.DB.prepare('INSERT INTO failed_jobs (id, payload, error) VALUES (?, ?, ?)')
          .bind(crypto.randomUUID(), JSON.stringify(msg.body), String(err))
          .run();
        msg.ack(); // ack so it doesn't retry (we logged it)
      } else {
        msg.retry({ delaySeconds: Math.pow(2, msg.attempts) * 10 }); // exponential backoff
      }
    }
  }
}

Webhook ingestion pattern (reliable)

// POST /webhook — receive and immediately queue
async fetch(request: Request, env: Env): Promise<Response> {
  // 1. Verify signature FIRST — never process unsigned webhooks
  const signature = request.headers.get('X-Webhook-Signature') ?? '';
  const body = await request.text();
  const valid = await verifyHmac(body, signature, env.WEBHOOK_SECRET);
  if (!valid) return new Response('Unauthorized', { status: 401 });

  // 2. Queue for processing — respond fast
  await env.QUEUE.send({ payload: JSON.parse(body), source: 'webhook' });
  return new Response(null, { status: 202 });
}

Common Gotchas

See Also