Skip to content

Filas BullMQ

O backend usa BullMQ com Redis para processamento assíncrono. Filas permitem que requests HTTP retornem imediatamente enquanto o trabalho pesado (envio de mensagens, sync do ERP) ocorre em background.

Topologia de filas

Redis
 ├── whatsapp-send         → envio individual de mensagens WhatsApp
 ├── email-send            → envio individual de emails
 ├── campaign-trigger      → gatilho para campanhas agendadas (uma vez)
 ├── erp-sync-queue        → sync de vendas do Millennium ERP
 ├── erp-customer-sync-queue → sync de clientes do Millennium ERP
 └── post-sale-survey      → envio de pesquisas pós-venda

BullBoard (UI de administração)

Acesse em produção:

https://apicrm.galdix.com.br/admin/queues

Protegido por Basic Auth (credenciais no .env.secrets). Permite visualizar jobs em todas as filas: waiting, active, completed, failed, delayed.


whatsapp-send

Processor: WhatsappSendProcessor
Arquivo: modules/integrations/whatsapp/whatsapp-send.processor.ts

Job data

typescript
interface WhatsappSendJobData {
  campaignId: string
  storeWhatsappId: string
  customerId: string
  phone: string
  templateName: string
  templateLanguage: string
  templateVars: string[]
  redirectToken?: string          // token para botão URL de redirect
  redirectButtonIndex?: number    // índice do botão URL
  unsubscribeToken?: string       // token para botão de opt-out
  unsubscribeButtonIndex?: number
  headerImageUrl?: string         // imagem do header do template
  organizationId?: string
}

Fluxo do processor

1. Busca credenciais frescas do DB (NUNCA armazena token no Redis)
2. Normaliza phone → adiciona 55 se faltando
3. Carrega campaignSettings da organização
4. Verifica rate limit de mensagens por cliente (messageLimitDays/Count)
5. Verifica conflito de campanhas (blockMultipleCampaigns)
6. Valida que nenhuma templateVar está vazia (previne erro Meta #131008)
7. Monta componentes da mensagem:
   - IMAGE header (se headerImageUrl)
   - BODY com vars resolvidas
   - BUTTON URL (se redirectToken) → substitui sentinela __REDIRECT_URL__
   - BUTTON URL (se unsubscribeToken)
8. Chama MetaGraphService.sendMessage()
9. Cria WhatsappMessage (status=SENT) + incrementa CampaignMetric.sent
10. accumulateCost() — fire-and-forget
11. syncToChatwoot() — fire-and-forget
12. checkCampaignCompletion() — se não há mais jobs, muda campaign.status

Configuração de job

typescript
await whatsappQueue.addBulk(jobs.map(job => ({
  name: 'send-whatsapp',
  data: job,
  opts: {
    attempts: 2,
    backoff: { type: 'exponential', delay: 1000 },
  }
})))

Tratamento de erro

  • Na última tentativa: cria WhatsappMessage com status=FAILED
  • Incrementa CampaignMetric.failed
  • Re-throw para o BullMQ gerenciar retry

email-send

Processor: EmailSendProcessor
Arquivo: modules/marketing/campaigns/services/email-send.processor.ts

Job data

typescript
interface EmailSendJobData {
  campaignId: string
  customerId: string
  recipientEmail: string
  subject: string
  htmlBody: string          // HTML final (já compilado do MJML, com tracking)
  organizationId: string
  emailMessageId: string
  senderName: string
  senderEmail: string
  listUnsubscribeHeader?: string       // RFC 8058
  listUnsubscribePostHeader?: string
}

Fluxo do processor

1. Busca configuração SMTP da organização
2. Decripta senha (AES-256-GCM)
3. Cria transporter Nodemailer:
   - connectionTimeout: 15.000ms
   - greetingTimeout:   15.000ms
   - socketTimeout:     30.000ms
4. Envia com headers List-Unsubscribe (se fornecidos)
5. Sucesso → EmailMessage.status=SENT, CampaignMetric.sent++, delivered++
6. Erro SMTP → detecta tipo de bounce:
   - HARD: códigos 550-559, "not found", "user unknown"
   - SOFT: códigos 450-459, "full", "quota"
7. Atualiza EmailMessage.status=BOUNCED com bounceType e bounceMessage
8. checkCampaignCompletion() → quando não há mais QUEUED, campaign.status=sent

Configuração de job

typescript
// Jobs criados com delay para respeitar throughput SMTP (~300/min):
{ delay: i * 200 }  // 200ms entre jobs = 5/seg = 300/min

Attempts: 3, backoff exponencial com delay 60s.


campaign-trigger

Processor: CampaignTriggerProcessor
Arquivo: modules/marketing/campaigns/services/campaign-trigger.processor.ts

Job data

typescript
interface CampaignTriggerJobData {
  campaignId: string
  organizationId: string
}

Propósito

Campanhas agendadas para o futuro recebem um job com delay calculado:

typescript
const delay = scheduledAt.getTime() - Date.now()

await campaignTriggerQueue.add(
  'trigger-campaign',
  { campaignId, organizationId },
  { delay, jobId: `campaign-${campaignId}` }
)

O jobId fixo evita duplicatas: se a campanha for editada, o job antigo é removido e um novo é criado.

Fluxo

1. Descobre canal da campanha (WHATSAPP ou EMAIL)
2. Chama sendWhatsappCampaign() ou sendEmailCampaign()
3. Loga início, conclusão e duração

erp-sync-queue

Processor: SyncProcessor (implícito via scheduler)
Arquivo: modules/erp/erp-sync/sync.service.ts

Jobs criados

typescript
// Sync noturno (6h diárias): dados do dia anterior
queueStoreSyncs(yesterday.start, yesterday.end, 'nightly')

// Sync horário: última hora
queueStoreSyncs(oneHourAgo, now, 'hourly')

Job data

typescript
{
  storeCode: string
  start: string    // ISO date
  end: string      // ISO date
  frequency: 'nightly' | 'hourly'
}

Staggering de jobs

Para evitar sobrecarga simultânea:

typescript
// Cada loja recebe um delay escalonado:
const delay = i * 2 * 60_000 + Math.random() * 30_000
// Loja 0: 0-30s, Loja 1: 2min-2min30s, Loja 2: 4min-4min30s...

jobId com idempotência

typescript
// Nightly: jobId inclui data → apenas 1 por dia por loja
jobId: `sync-${store.code}-nightly-${today}`

// Hourly: jobId inclui hora → apenas 1 por hora por loja
jobId: `sync-${store.code}-hourly-${today}-${hour}`

removeOnComplete: { age: 25 * 3600 } → limpa automaticamente após 25h, permitindo reuso do jobId no ciclo seguinte.


erp-customer-sync-queue

Arquivo: modules/erp/customer-sync/customer-sync.service.ts

typescript
// Job agendado às 5h diárias
jobId: `sync-customers-global-${today}`  // um por dia, idempotente
{
  attempts: 3,
  backoff: { type: 'exponential', delay: 60_000 },
}

Faz sync completo de clientes (trans_id=0) via streaming keyset pagination (1.000 clientes por batch).


Cron jobs (scheduler NestJS)

Além das filas, o sistema usa cron jobs para tarefas periódicas:

ServiçoCronAção
SyncService0 6 * * *Sync noturno de vendas (dia anterior)
SyncService0 * * * *Sync horário de vendas
CustomerSyncService0 5 * * *Sync completo de clientes
IntelligenceService0 3 * * *Snapshot diário de segmentos
CampaignScheduler*/5 * * * *Campanhas recorrentes (a cada 5min)
WhatsappPricingSyncService0 4 * * 1Sync de tarifas WhatsApp (segunda)
ReconciliationService0 */15 * * * *Reconciliação de status de mensagens

Padrões de uso

Enfileirar e esquecer

typescript
// Serviço enfileira job e retorna imediatamente:
await this.whatsappQueue.addBulk(jobs)  // jobs são processados em paralelo
return { queued: jobs.length }

Lock distribuído (campanhas recorrentes)

typescript
// Previne double-dispatch em reinicializações:
const locked = await redis.set(
  `campaign:lock:${campaignId}`,
  '1',
  'EX', 300,   // 5min TTL
  'NX'         // só seta se não existir
)
if (!locked) return  // outra instância está processando

Checkpoint (recuperação de falhas)

typescript
// Antes de side-effects, registra intenção:
const jobRun = await prisma.jobRun.create({ data: { campaignId, status: 'running' } })

// Executa ação
await sendWhatsappCampaign(campaignId)

// Registra conclusão
await prisma.jobRun.update({ where: { id: jobRun.id }, data: { status: 'completed' } })

onApplicationBootstrap() do CampaignScheduler varre JobRun com status=running e started > 10min para re-disparar jobs que travaram.


Monitoramento de filas

bash
# Via CLI Redis:
redis-cli -h localhost -p 6379

# Jobs esperando:
LLEN bull:whatsapp-send:wait

# Jobs ativos:
LLEN bull:whatsapp-send:active

# Jobs com falha:
LLEN bull:whatsapp-send:failed

# Via admin API (autenticado):
GET /admin/metrics
# Retorna: { queue: { waiting, active, failed, completed, delayed } }

Via BullBoard (UI): https://apicrm.galdix.com.br/admin/queues

Documentação interna — Galdix CRM