Tema
Filas BullMQ
O backend usa BullMQ com Redis para processamento assíncrono. Filas permitem que requests HTTP retornem imediatamente enquanto o trabalho pesado (envio de mensagens, sync do ERP) ocorre em background.
Topologia de filas
Redis
├── whatsapp-send → envio individual de mensagens WhatsApp
├── email-send → envio individual de emails
├── campaign-trigger → gatilho para campanhas agendadas (uma vez)
├── erp-sync-queue → sync de vendas do Millennium ERP
├── erp-customer-sync-queue → sync de clientes do Millennium ERP
└── post-sale-survey → envio de pesquisas pós-vendaBullBoard (UI de administração)
Acesse em produção:
https://apicrm.galdix.com.br/admin/queuesProtegido por Basic Auth (credenciais no .env.secrets). Permite visualizar jobs em todas as filas: waiting, active, completed, failed, delayed.
whatsapp-send
Processor: WhatsappSendProcessor
Arquivo: modules/integrations/whatsapp/whatsapp-send.processor.ts
Job data
typescript
interface WhatsappSendJobData {
campaignId: string
storeWhatsappId: string
customerId: string
phone: string
templateName: string
templateLanguage: string
templateVars: string[]
redirectToken?: string // token para botão URL de redirect
redirectButtonIndex?: number // índice do botão URL
unsubscribeToken?: string // token para botão de opt-out
unsubscribeButtonIndex?: number
headerImageUrl?: string // imagem do header do template
organizationId?: string
}Fluxo do processor
1. Busca credenciais frescas do DB (NUNCA armazena token no Redis)
2. Normaliza phone → adiciona 55 se faltando
3. Carrega campaignSettings da organização
4. Verifica rate limit de mensagens por cliente (messageLimitDays/Count)
5. Verifica conflito de campanhas (blockMultipleCampaigns)
6. Valida que nenhuma templateVar está vazia (previne erro Meta #131008)
7. Monta componentes da mensagem:
- IMAGE header (se headerImageUrl)
- BODY com vars resolvidas
- BUTTON URL (se redirectToken) → substitui sentinela __REDIRECT_URL__
- BUTTON URL (se unsubscribeToken)
8. Chama MetaGraphService.sendMessage()
9. Cria WhatsappMessage (status=SENT) + incrementa CampaignMetric.sent
10. accumulateCost() — fire-and-forget
11. syncToChatwoot() — fire-and-forget
12. checkCampaignCompletion() — se não há mais jobs, muda campaign.statusConfiguração de job
typescript
await whatsappQueue.addBulk(jobs.map(job => ({
name: 'send-whatsapp',
data: job,
opts: {
attempts: 2,
backoff: { type: 'exponential', delay: 1000 },
}
})))Tratamento de erro
- Na última tentativa: cria
WhatsappMessagecomstatus=FAILED - Incrementa
CampaignMetric.failed - Re-throw para o BullMQ gerenciar retry
email-send
Processor: EmailSendProcessor
Arquivo: modules/marketing/campaigns/services/email-send.processor.ts
Job data
typescript
interface EmailSendJobData {
campaignId: string
customerId: string
recipientEmail: string
subject: string
htmlBody: string // HTML final (já compilado do MJML, com tracking)
organizationId: string
emailMessageId: string
senderName: string
senderEmail: string
listUnsubscribeHeader?: string // RFC 8058
listUnsubscribePostHeader?: string
}Fluxo do processor
1. Busca configuração SMTP da organização
2. Decripta senha (AES-256-GCM)
3. Cria transporter Nodemailer:
- connectionTimeout: 15.000ms
- greetingTimeout: 15.000ms
- socketTimeout: 30.000ms
4. Envia com headers List-Unsubscribe (se fornecidos)
5. Sucesso → EmailMessage.status=SENT, CampaignMetric.sent++, delivered++
6. Erro SMTP → detecta tipo de bounce:
- HARD: códigos 550-559, "not found", "user unknown"
- SOFT: códigos 450-459, "full", "quota"
7. Atualiza EmailMessage.status=BOUNCED com bounceType e bounceMessage
8. checkCampaignCompletion() → quando não há mais QUEUED, campaign.status=sentConfiguração de job
typescript
// Jobs criados com delay para respeitar throughput SMTP (~300/min):
{ delay: i * 200 } // 200ms entre jobs = 5/seg = 300/minAttempts: 3, backoff exponencial com delay 60s.
campaign-trigger
Processor: CampaignTriggerProcessor
Arquivo: modules/marketing/campaigns/services/campaign-trigger.processor.ts
Job data
typescript
interface CampaignTriggerJobData {
campaignId: string
organizationId: string
}Propósito
Campanhas agendadas para o futuro recebem um job com delay calculado:
typescript
const delay = scheduledAt.getTime() - Date.now()
await campaignTriggerQueue.add(
'trigger-campaign',
{ campaignId, organizationId },
{ delay, jobId: `campaign-${campaignId}` }
)O jobId fixo evita duplicatas: se a campanha for editada, o job antigo é removido e um novo é criado.
Fluxo
1. Descobre canal da campanha (WHATSAPP ou EMAIL)
2. Chama sendWhatsappCampaign() ou sendEmailCampaign()
3. Loga início, conclusão e duraçãoerp-sync-queue
Processor: SyncProcessor (implícito via scheduler)
Arquivo: modules/erp/erp-sync/sync.service.ts
Jobs criados
typescript
// Sync noturno (6h diárias): dados do dia anterior
queueStoreSyncs(yesterday.start, yesterday.end, 'nightly')
// Sync horário: última hora
queueStoreSyncs(oneHourAgo, now, 'hourly')Job data
typescript
{
storeCode: string
start: string // ISO date
end: string // ISO date
frequency: 'nightly' | 'hourly'
}Staggering de jobs
Para evitar sobrecarga simultânea:
typescript
// Cada loja recebe um delay escalonado:
const delay = i * 2 * 60_000 + Math.random() * 30_000
// Loja 0: 0-30s, Loja 1: 2min-2min30s, Loja 2: 4min-4min30s...jobId com idempotência
typescript
// Nightly: jobId inclui data → apenas 1 por dia por loja
jobId: `sync-${store.code}-nightly-${today}`
// Hourly: jobId inclui hora → apenas 1 por hora por loja
jobId: `sync-${store.code}-hourly-${today}-${hour}`removeOnComplete: { age: 25 * 3600 } → limpa automaticamente após 25h, permitindo reuso do jobId no ciclo seguinte.
erp-customer-sync-queue
Arquivo: modules/erp/customer-sync/customer-sync.service.ts
typescript
// Job agendado às 5h diárias
jobId: `sync-customers-global-${today}` // um por dia, idempotente
{
attempts: 3,
backoff: { type: 'exponential', delay: 60_000 },
}Faz sync completo de clientes (trans_id=0) via streaming keyset pagination (1.000 clientes por batch).
Cron jobs (scheduler NestJS)
Além das filas, o sistema usa cron jobs para tarefas periódicas:
| Serviço | Cron | Ação |
|---|---|---|
SyncService | 0 6 * * * | Sync noturno de vendas (dia anterior) |
SyncService | 0 * * * * | Sync horário de vendas |
CustomerSyncService | 0 5 * * * | Sync completo de clientes |
IntelligenceService | 0 3 * * * | Snapshot diário de segmentos |
CampaignScheduler | */5 * * * * | Campanhas recorrentes (a cada 5min) |
WhatsappPricingSyncService | 0 4 * * 1 | Sync de tarifas WhatsApp (segunda) |
ReconciliationService | 0 */15 * * * * | Reconciliação de status de mensagens |
Padrões de uso
Enfileirar e esquecer
typescript
// Serviço enfileira job e retorna imediatamente:
await this.whatsappQueue.addBulk(jobs) // jobs são processados em paralelo
return { queued: jobs.length }Lock distribuído (campanhas recorrentes)
typescript
// Previne double-dispatch em reinicializações:
const locked = await redis.set(
`campaign:lock:${campaignId}`,
'1',
'EX', 300, // 5min TTL
'NX' // só seta se não existir
)
if (!locked) return // outra instância está processandoCheckpoint (recuperação de falhas)
typescript
// Antes de side-effects, registra intenção:
const jobRun = await prisma.jobRun.create({ data: { campaignId, status: 'running' } })
// Executa ação
await sendWhatsappCampaign(campaignId)
// Registra conclusão
await prisma.jobRun.update({ where: { id: jobRun.id }, data: { status: 'completed' } })onApplicationBootstrap() do CampaignScheduler varre JobRun com status=running e started > 10min para re-disparar jobs que travaram.
Monitoramento de filas
bash
# Via CLI Redis:
redis-cli -h localhost -p 6379
# Jobs esperando:
LLEN bull:whatsapp-send:wait
# Jobs ativos:
LLEN bull:whatsapp-send:active
# Jobs com falha:
LLEN bull:whatsapp-send:failed
# Via admin API (autenticado):
GET /admin/metrics
# Retorna: { queue: { waiting, active, failed, completed, delayed } }Via BullBoard (UI): https://apicrm.galdix.com.br/admin/queues