Skip to content

Módulo ERP / Sincronização

O módulo ERP é a camada de integração entre o ERP Millennium e o banco de dados CRM. Ele é responsável por toda a sincronização de dados (vendas, clientes, vendedores, lojas e produtos), pela normalização e deduplicação de cadastros, pela atribuição de transações a campanhas de marketing e pelo fornecimento de analytics de vendas. A sincronização acontece via filas BullMQ com agendamentos cron independentes para cada tipo de dado.


Arquitetura de filas

O módulo usa duas filas BullMQ:

FilaPropósito
erp-sync-queueJobs de sincronização de vendas por loja (disparados por cron horário/noturno e manual)
erp-customer-sync-queueJobs de sincronização global de clientes (disparado por cron diário às 5h)

Services

SyncService

Arquivo: backend/src/modules/erp/sync/sync.service.ts

Orquestra a sincronização de transações de venda do ERP Millennium para o CRM. Responsável por dois crons (noturno e horário), enfileiramento de jobs por loja, processamento de notas fiscais (upsert de clientes, resolução de vendedores, deduplicação de transações) e atribuição de transações a campanhas de WhatsApp.

Eventos permitidos do ERP

S - VENDA E-COMMERCE
S - VENDA VAREJO NFC-E (LOJAS)
S - VENDA VAREJO CFE (LOJAS)
S - VENDA VAREJO CFE ( ESTOQUE NEGAT
S - VENDA VAREJO NFE
E - DEVOLUCAO DE VENDA VAREJO
E - DEVOLUCAO DE VENDA E-COMMERCE

Mapa de redirecionamento de lojas

Algumas filiais no ERP usam códigos antigos que devem ser remapeados para os códigos corretos:

Código ERPRedireciona para
006007
012028
019029

Métodos

handleNightlySync(): Promise<void> (CRON)

Schedule: 0 6 * * * — executa às 6h AM todos os dias.

Sincroniza as vendas do dia anterior de todas as lojas ativas.

Flow:

  1. Verifica DISABLE_SYNC_CRON === 'true' e retorna sem fazer nada se verdadeiro.
  2. Calcula yesterday = subDays(new Date(), 1).
  3. Define start = startOfDay(yesterday) e end = endOfDay(yesterday) como strings ISO.
  4. Chama queueStoreSyncs(start, end, 'nightly').

handleHourlySync(): Promise<void> (CRON)

Schedule: 0 * * * * — executa a cada hora cheia.

Sincroniza as vendas da última hora de todas as lojas ativas.

Flow:

  1. Verifica DISABLE_SYNC_CRON === 'true' e retorna sem fazer nada se verdadeiro.
  2. Define start = subHours(now, 1) e end = now.
  3. Chama queueStoreSyncs(start, end, 'hourly').

queueStoreSyncs(start, end, frequency): Promise<void> (privado)

Enfileira um job de sincronização para cada loja ativa, com jitter para evitar picos de carga.

Flow:

  1. Busca todas as lojas ativas (store.findMany where isActive: true).
  2. Para cada loja i: a. Calcula delay = i * 120_000 ms (2 minutos) + random(0..30_000 ms) — escalonamento com jitter. b. Gera jobId = sync-sales-{store.code}-{frequency}-{timeKey}, onde timeKey é YYYY-MM-DD para noturno ou YYYY-MM-DDTHH para horário (garante idempotência — o mesmo job não é adicionado duas vezes no mesmo ciclo). c. Adiciona job 'sync-sales' na fila erp-sync-queue com: attempts do CRM_CONFIG, backoff: exponential 5 min, removeOnComplete: 25h, removeOnFail: 7 dias.

Tabelas lidas: Store


syncSales(start: Date, end: Date): Promise<object>

Processa todas as notas fiscais do ERP no período. Chamado pelo processador de fila (não diretamente pelo cron).

Flow:

  1. Chama milleniumService.getFaturamentos(start, end) para buscar notas do ERP.
  2. Para cada nota (sale), chama processInvoice(sale). Em caso de erro por nota, loga e continua.
  3. Retorna { status, processed, new, updated }.

Tabelas lidas/escritas: via processInvoice


processInvoice(sale: any): Promise<boolean> (privado)

Processa uma nota fiscal individual do ERP. Retorna true se nova transação, false se atualização.

Flow:

  1. Determina o código da loja (cod_emissor || cod_filial) e aplica STORE_REDIRECT_MAP.
  2. Busca a loja no banco pelo código. Se não encontrada, pula a nota (skips non-configured stores).
  3. Resolve dados do cliente embutidos na nota (sale.cliente[0]). Pula notas sem cod_cliente.
  4. Chama resolveAddress(customerRaw) para selecionar o melhor endereço disponível.
  5. Chama extractPhone(customerRaw, allAddresses) para extrair telefone com DDD.
  6. Chama buildCustomerData(...) para construir o payload de upsert do cliente.
  7. Chama upsertCustomer(externalId, email, cpfCnpj, customerData) para criar/atualizar o cliente.
  8. Chama enrichSaleItems(sale) para normalizar produtos e calcular itemCount (Peças por Atendimento).
  9. Chama computeTransactionStatusAndValue(sale) para determinar status (PAID, CANCELLED, REFUNDED) e valor.
  10. Chama parseTransactionDate(sale) para converter o campo de data do ERP.
  11. Chama computeExternalTxId(sale) para gerar a chave de deduplicação.
  12. Chama resolveSeller(sale, organizationId) para obter o ID do vendedor (upsert on-the-fly).
  13. Verifica se a transação já existe (transaction.findUnique where externalId).
  14. Executa transaction.upsert com todos os dados. O campo channel não é sobrescrito em updates — preserva atribuição existente de WhatsApp.
  15. Se transação nova, chama scheduleFollowUps(...) de forma fire-and-forget.
  16. Retorna !existing.

Tabelas lidas: Store, Customer, TransactionTabelas escritas: Customer, Transaction, Seller


scheduleFollowUps(txId, status, customer, storeId, organizationId, storeDisplayName, vendorName): void (privado)

Dispara operações não-bloqueantes após criação de nova transação.

Operações (fire-and-forget, falhas apenas logadas):

  1. attributeTransaction(txId) — tenta atribuir a transação a uma campanha de WhatsApp.
  2. Se status === 'PAID' e PostSaleSurveyService disponível: postSaleSurvey.scheduleForTransaction(...) — agenda pesquisa pós-venda.

attributeTransaction(transactionId): Promise<void> (privado)

Atribui uma transação a uma campanha de WhatsApp se houver mensagem recente dentro da janela de atribuição.

Flow:

  1. Busca a transação pelo ID (transaction.findUnique, select: id, customerId, date, totalValue, campaignId).
  2. Retorna se já tiver campaignId, se não tiver customerId, ou se não existir.
  3. Busca o telefone do cliente (customer.findUnique). Retorna se sem telefone ou com menos de 10 dígitos.
  4. Busca a mensagem de WhatsApp enviada mais recente para o telefone (últimos 10 dígitos), direção OUTBOUND, com campaignId: { not: null }, status != FAILED.
  5. Busca a campanha para obter attributionWindowDays.
  6. Calcula diffMs = tx.date - message.sentAt. Retorna se fora da janela ou se a mensagem foi enviada depois da transação.
  7. Atualiza a transação: { campaignId, isInfluenced: true, channel: 'WHATSAPP' }.
  8. Faz upsert em CampaignMetric: incrementa conversions (só se totalValue > 0) e revenue.

Tabelas lidas: Transaction, Customer, WhatsappMessage, CampaignTabelas escritas: Transaction, CampaignMetric


upsertCustomer(externalId, email, cpfCnpj, customerData): Promise<Customer> (privado)

Localiza ou cria um cliente com estratégia de deduplicação em cascata.

Estratégia de resolução:

CasoAção
Encontrado por externalIdAtualiza o registro existente. Preserva o telefone se já tiver ≥ 10 dígitos (sync de vendas não é fonte autoritativa de contato).
Não encontrado por externalId, mas CPF hash bateFaz merge: atualiza o registro existente com o novo externalId (link) e mescla telefone (preserva existente se válido).
Não encontrado por nenhuma chaveCria novo registro.

resolveSeller(sale, organizationId): Promise<{ sellerId, vendorCode, vendorName }> (privado)

Resolve o vendedor a partir dos dados da nota. Faz upsert on-the-fly para vendedores novos.

  • Retorna { sellerId: null, vendorCode: null, vendorName: null } quando cod_vendedor é ausente, vazio ou zero.
  • Faz seller.upsert where externalId: vendorCode — cria com status: 'ACTIVE' se novo.

enrichSaleItems(sale): { enrichedItems, itemCount } (privado)

Normaliza itens de produto da nota ERP para o formato interno.

  • sale.produtos || sale.produto (ambos os nomes de campo do ERP são suportados).
  • Mapeia cada item para { id, name, quantity, price, total } usando campos ERP: cod_produto, desc_produto || descricao, quantidade, preco ?? preco_venda, valor_total_liquido ?? total_item.
  • Calcula itemCount = SUM(quantity) para PA (Peças por Atendimento).

computeTransactionStatusAndValue(sale): { status, totalValue } (privado)

Determina status e valor líquido da transação.

Condição ERPStatusValor
cancelado/cancelada === true/'T'/'True'CANCELLEDvalor original
tipo_operacao === 'E' (entrada/devolução)REFUNDED-Math.abs(valor)
demaisPAIDvalor_final ?? total

computeExternalTxId(sale): string (privado)

Gera chave única de deduplicação para transações.

Formato: {romaneio || cod_pedidov || pedidov || trans_id}{sufixo}

Sufixo: -S para vendas, -E para devoluções. Evita colisão entre venda e sua devolução (que têm o mesmo romaneio no ERP, mas são registros distintos no CRM).

Motivo: trans_id é granular demais — Millennium gera um trans_id por forma de pagamento/parcela, gerando N registros idênticos por venda.


extractPhone(raw, addresses): string | null (privado)

Extrai o melhor telefone disponível do payload do ERP, percorrendo todas as fontes com ordem de prioridade.

Prioridade:

  1. Top-level com DDD explícito: ddd_celular + cel
  2. Top-level com DDD explícito: ddd + fone
  3. Endereços (percorre todos): ddd_cel + cel
  4. Endereços (percorre todos): ddd + fone
  5. Top-level sem campo DDD, mas número ≥ 10 dígitos (DDD embutido): cel, fone
  6. Endereços sem campo DDD, mas número ≥ 10 dígitos: cel, fone

Regra de concatenação: Se o número já começa pelo DDD e tem ≥ 10 dígitos, não duplica o DDD.


parseTransactionDate(sale): Date (privado)

Converte campos de data do ERP para Date UTC.

  • Formato OData v3 /Date(ticks±offset)/ — usa apenas os ticks (milliseconds Unix), ignora o offset de fuso horário (o banco armazena em UTC).
  • String simples — strings do Millennium são em horário de Brasília (sem info de fuso). Adiciona 3h para converter BRT → UTC.
  • Fallback: new Date() (data atual).

parseBirthDate(birthDateStr): Date | null (privado)

Converte data de aniversário do ERP. Suporta formato OData e ISO plain. Retorna null para strings inválidas.


resolveAddress(customerRaw): { allAddresses, addressData } (privado)

Consolida os campos de endereço do payload ERP (enderecos, endereco, endereco_entrega) e seleciona o melhor: primeiro que tenha logradouro, cidade ou bairro; caso contrário o primeiro da lista; caso contrário {}.


buildCustomerData(...): any (privado)

Constrói o payload completo para upsert de cliente a partir dos dados brutos do ERP:

Campo CRMOrigem ERP
namecustomerRaw.nome.trim()
emailcustomerRaw.e_mail
phoneextractPhone()
phoneTypeclassifyPhone(phone)
city, stateendereço resolvido
addresslogradouro + numero
neighborhood, zipCodecampos de endereço
cpfHashhashCpf(cpfCnpj) — SHA-256
cpfEncryptedencryption.encrypt(normalizeCpf(cpfCnpj)) — criptografia reversível
cpfMaskedmaskCpfDigits(cpfCnpj) — ex: ***.***.*XX-**
cpfValidisValidCPF(cpfCnpj)
birthDateparseBirthDate(data_aniversario)
transIdBigInt(trans_id)

CustomerSyncService

Arquivo: backend/src/modules/erp/sync/customer-sync.service.ts

Sincroniza o cadastro completo de clientes do ERP Millennium para o CRM. Opera via streaming paginado (keyset pagination) para lidar com bases grandes sem carregar tudo em memória. Usa transações em batch com fallback sequencial para isolamento de falhas.


Métodos

handleDailySync(): Promise<void> (CRON)

Schedule: 0 5 * * * — executa às 5h AM todos os dias (após seller sync às 3h, antes do sync de vendas às 6h).

Flow:

  1. Verifica DISABLE_SYNC_CRON === 'true'.
  2. Busca a primeira organização do banco.
  3. Gera jobId = sync-customers-global-{YYYY-MM-DD} para idempotência diária.
  4. Adiciona job 'sync-customers' na fila erp-customer-sync-queue.
    • attempts do CRM_CONFIG
    • backoff: exponential 1 min
    • removeOnComplete: 24h, count: 100
    • removeOnFail: 7 dias, count: 500

Tabelas lidas: Organization


syncCustomers(): Promise<void>

Executa a sincronização completa de todos os clientes. Chamado pelo processador de fila.

Flow:

  1. Busca a primeira loja ativa (ordem por code ASC) para obter storeId e organizationId padrão.
  2. Chama milleniumService.streamCustomersKeyset(0, callback) — streaming keyset pagination a partir de trans_id=0 (full sync).
  3. O callback processa cada batch recebido em sub-chunks de 50 registros:
    • Chama processBatch(chunk, defaultStoreId, defaultOrgId).
    • Incrementa updatedCount.
    • Loga progresso a cada 5.000 clientes.
  4. Ao final, loga Customer sync complete. Total: {updatedCount}.

Tabelas lidas: Store


processBatch(batch, defaultStoreId, defaultOrgId): Promise<void> (privado)

Processa um batch de registros brutos do ERP, fazendo upsert de clientes com deduplicação.

Flow:

  1. Deduplicação de batch: Remove duplicatas por cod_cliente dentro do mesmo batch (o ERP pode enviar o mesmo cliente mais de uma vez). Loga se houve redução.
  2. Extração de chaves:
    • externalIds — array de String(cod_cliente) do batch.
    • cpfHashes — array de hashCpf(cpf || cnpj) do batch (filtra nulos).
  3. Bulk lookup paralelo:
    • customer.findMany where externalId: { in: externalIds } → Map externalId → customer.
    • customer.findMany where cpfHash: { in: cpfHashes } → Map cpfHash → customer.
  4. Processamento em chunks de 8 (abaixo do pool padrão do Prisma de 9 conexões):
    • Para cada cliente bruto, chama getQuery(raw) que monta a query de upsert.
    • Executa todas as queries do chunk como prisma.$transaction([...queries]).
    • Se a transação falhar: fallback sequencial — executa cada query individualmente e loga erros por cliente.

Lógica de resolução por cliente (dentro de getQuery):

CasoAção
CPF hash encontradoCASE 1: customer.update where id = cpfLookup.id — atualiza todos os campos, inclusive sincronizando externalId se mudou no ERP
Sem CPF, externalId encontradoCASE 2: customer.update where id = externalIdLookup.id — atualiza todos os campos
Nenhuma chave encontradaCASE 3: customer.create — novo cliente

Normalização de dados por cliente:

  • email — convertido para lowercase
  • phoneextractPhone(raw, allAddresses) com prioridade (DDD explícito > DDD embutido > bare number)
  • cpfHash — SHA-256 do CPF/CNPJ normalizado
  • cpfEncrypted — criptografia reversível para consulta interna
  • cpfMasked — display mascarado
  • birthDate — OData /Date(ticks)/ ou string ISO; null e valores inválidos logados com warning
  • createdAt — mapeado de data_cadastro (data real de cadastro no ERP); fallback new Date()
  • transIdBigInt(trans_id) com tratamento de erro para trans_id inválidos
  • personType'PJ' se pf_pj === 'PJ', senão 'PF'
  • dataQualityIssues — objeto com flags: phoneIncomplete: true (telefone < 10 dígitos), invalidBirthDate: true (ano < 1900 ou > anoAtual - 14)

Tabelas lidas: CustomerTabelas escritas: Customer


extractPhone(raw, addresses): string | null (privado)

Mesma lógica de 5 níveis de prioridade descrita no SyncService. Adicionalmente:

  • Nível 5 (último recurso): se o número tem menos de 10 dígitos, salva mesmo assim mas emite logger.warn indicando ausência de DDD.

CustomersService

Arquivo: backend/src/modules/erp/customers/customers.service.ts

Gerencia a listagem, visualização e opt-out de clientes no CRM. Usa tenantContext (AsyncLocalStorage) para filtrar automaticamente pela organização do usuário autenticado. A listagem usa uma estratégia de paginação otimizada: quando o sort é por campo da tabela Customer, pagina primeiro na tabela de clientes e depois faz JOIN apenas na página de resultados (evita JOIN pesado em toda a base).

Conversão de score RFM

A função rfmLabelToScore(label) converte labels RFM (tanto legados quanto os 10 segmentos atuais) para score numérico de 0 a 100:

LabelScore
Champions100
Loyal Customers85
Potential Loyalist75
Promising65
Recent Customers60
Need Attention45
About To Sleep35
At Risk25
Can't Lose Them20
Hibernating5

Métodos

findAll(params): Promise<{ data, pagination }>

Lista clientes com suporte a busca textual, filtros por RFM, segmento, loja e status, e ordenação.

Parâmetros:

  • page, limit — paginação (padrão: página 1, 10 por página)
  • search — busca por nome, e-mail, externalId ou CPF (hash automático se 11 ou 14 dígitos)
  • sortByname | createdAt | rfmScore | lastPurchase | totalTransactions | ltv
  • sortDirasc | desc
  • rfmSegments — array de labels RFM para filtrar
  • segmentId — ID de um segmento salvo (resolvido dinamicamente)
  • storeId — ID de loja específica
  • statusFilter — array de lead | inactive | warning | active

Resolução de segmento: Quando segmentId é fornecido, carrega as regras do segmento e executa queryBuilder.buildAsync() para obter IDs de clientes correspondentes (máx. 100.000). Esses IDs são passados como filtro c.id = ANY(...).

Estratégia de query — Paginate-First (otimização):

Ativada quando: sortBy é um dos campos da tabela Customer (name, createdAt, rfmScore, lastPurchase) E não há statusFilter.

sql
-- Paginate-first:
WITH TotalCount AS (SELECT COUNT(*) FROM "Customer" c WHERE ...),
PaginatedIds AS (SELECT c.id FROM "Customer" c WHERE ... ORDER BY {campo} LIMIT ? OFFSET ?),
CustomerStats AS (
  SELECT c.id, c.name, ..., SUM(t."totalValue") AS ltv, COUNT(t) AS "totalTransactions",
         MAX(t.date) AS "lastPurchaseDate", EXTRACT(EPOCH ...) AS "daysSinceLastBuy"
  FROM PaginatedIds pi JOIN "Customer" c ON c.id = pi.id LEFT JOIN "Transaction" t ON c.id = t."customerId"
  GROUP BY c.id, ...
)
SELECT * FROM CustomerStats ORDER BY {campo} {dir} NULLS LAST

Estratégia de query — Full (com status filter):

sql
WITH CustomerStats AS (
  SELECT c.id, ..., SUM(t."totalValue") AS ltv, COUNT(t) AS "totalTransactions", MAX(t.date), ...
  FROM "Customer" c LEFT JOIN "Transaction" t ON c.id = t."customerId"
  WHERE c."organizationId" = $1 AND (c.name ILIKE $2 OR ...) {segmentClause} {storeClause}
  GROUP BY c.id, c."rfmStatus"
),
FilteredCustomers AS (
  SELECT *, COUNT(*) OVER() AS "fullCount"
  FROM CustomerStats
  WHERE ($4::boolean = false OR "rfmStatus" = ANY($5::text[]))
  {statusClause}
)
SELECT * FROM FilteredCustomers ORDER BY ... LIMIT ? OFFSET ?

Classificação de status (em memória):

StatusCritério
leaddaysSinceLastBuy IS NULL OR totalTransactions = 0
inactivedaysSinceLastBuy > 120 AND totalTransactions > 0
warningdaysSinceLastBuy > 60 AND <= 120 AND totalTransactions > 0
activedaysSinceLastBuy IS NOT NULL AND <= 60 AND totalTransactions > 0

Tabelas lidas: Customer, Transaction, Segment


findById(id: string): Promise<object>

Retorna perfil completo de um cliente com histórico de transações.

Flow:

  1. Busca cliente com customer.findUnique where id, incluindo todas as transactions (com store e salesperson), ordenadas por date desc.
  2. Lança NotFoundException se não encontrado.
  3. Busca em paralelo: preferredStore (se preferredStoreId presente) e preferredSeller (se preferredSellerId presente, campo adicionado via migration raw).
  4. Calcula métricas em memória:
    • ltv — soma de totalValue de todas as transações (inclui devoluções com valor negativo para LTV líquido correto).
    • totalTransactions — conta apenas transações não devolvidas com totalValue >= 0.
    • lastPurchaseDate — data da transação mais recente (excluindo devoluções).
    • daysSinceLastBuydifferenceInDays(today, lastPurchaseDate).
    • agefloor(differenceInDays(today, birthDate) / 365) quando disponível.
  5. Monta campo history com tipo 'purchase' ou 'return' baseado em totalValue < 0 || status === 'REFUNDED'.

Tabelas lidas: Customer, Transaction, Store, Seller


previewImportOptout(dto: ImportOptoutDto): Promise<object>

Verifica quais clientes de uma lista de CPFs seriam afetados pelo opt-out, sem aplicar.

Flow:

  1. Normaliza CPFs (remove pontos e traços) e gera hashes SHA-256.
  2. Busca clientes na organização com cpfHash: { in: cpfHashes }.
  3. Retorna { total, found, notFoundCount, notFound, customers } com nome e CPF mascarado de cada encontrado.

Tabelas lidas: Customer


applyImportOptout(dto: ImportOptoutDto): Promise<{ updated, total }>

Aplica opt-out em massa por canal para uma lista de CPFs.

Parâmetros do DTO:

  • cpfs — array de strings de CPF (com ou sem formatação)
  • channel'whatsapp' | 'email' | 'sms'

Flow:

  1. Normaliza CPFs e gera hashes.
  2. Executa UPDATE "Customer" SET {channel}OptOut = true, {channel}OptOutAt = {now} WHERE organizationId = ? AND cpfHash = ANY(?) AND {channel}OptOut = false.
  3. Retorna { updated: rows_affected, total: cpfs_count }.

Tabelas escritas: Customer


updatePreferences(id, dto): Promise<object>

Atualiza preferências de opt-out de um cliente individual.

Parâmetros do DTO (todos opcionais):

  • whatsappOptOut — boolean
  • emailOptOut — boolean
  • smsOptOut — boolean

Flow:

  1. Valida que o cliente existe na organização do contexto.
  2. Para cada campo presente no DTO: define o campo de opt-out e a respectiva data (whatsappOptOutAt, emailOptOutAt, smsOptOutAt) — null quando opt-out é false.
  3. Atualiza e retorna os campos de opt-out atualizados.

Tabelas lidas: CustomerTabelas escritas: Customer


SalesService

Arquivo: backend/src/modules/erp/sales/sales.service.ts

Fornece todos os analytics de vendas: dashboard do dia em tempo real, comparação de lojas, histórico diário/mensal, métricas de varejo com comparativo de período anterior e YoY, dashboard de canais/campanhas, ranking de vendedores e lista de clientes influenciados. Opera em fuso de Brasília (UTC-3) com datas UTC armazenadas no banco.

Tratamento de lojas filiais

getFilialCustomerIds() — cacheia IDs de clientes cujo CPF hash corresponde ao CNPJ de uma loja filial (lojas com primaryStoreId: not null). Esses clientes representam transferências inter-filiais e são excluídos de métricas de CRM. O cache é de processo (válido enquanto o servidor está rodando, atualizado apenas no restart).

Convenção de datas

Todas as datas de transação são armazenadas no banco como UTC sem informação de fuso. Para cálculos diários e horários em BRT, o serviço usa:

  • brtDayBoundsUtc() — retorna { start, end } do dia atual em BRT como UTC.
  • nowBRT() — data/hora atual em BRT.
  • Queries SQL usam - INTERVAL '3 hours' para agrupar por dia/hora BRT.

Métodos

processSale(data: CreateSaleDto): Promise<object>

Registra uma transação manualmente (usado por webhooks internos, não pelo sync do ERP).

Flow:

  1. Busca a loja pelo data.storeId para obter organizationId.
  2. Tenta encontrar cliente por e-mail (customer.findFirst where email). Se existir, atualiza o nome.
  3. Se não existir, cria novo cliente com email, nome e CPF.
  4. Cria Transaction com status: 'PAID'.

Tabelas lidas: Store, CustomerTabelas escritas: Customer, Transaction


getDailyTotal(): Promise<object>

Retorna KPIs de vendas do dia atual (BRT) em tempo real.

Queries executadas em paralelo:

  1. Agregação do dia atual: SUM(totalValue), COUNT(id) — transações não canceladas.
  2. Agregação de ontem no mesmo intervalo de horas: SUM(totalValue) — para comparativo.
  3. Agregação do mês atual: SUM(totalValue), COUNT(id).
  4. Receita influenciada do mês (com exclusão de filiais).
  5. Taxa de recompra do mês: query SQL com subquery correlacionada que conta clientes únicos do mês e verifica se tinham compra anterior a monthStart.
  6. Contagem das últimas 4h (PAID apenas).
  7. Última transação do dia (para indicar frescor do sync).

Retorno: { total, count, yesterdayTotal, monthTotal, monthCount, influencedRevenue, repurchaseRate, recentCount, lastSyncAt }

Tabelas lidas: Transaction


getSalesHistory(): Promise<object[]>

Retorna comparativo dia-a-dia entre mês atual e mês anterior.

Flow:

  1. Calcula limites de mês atual (dia 1 até hoje) e mês anterior (completo).
  2. Executa duas queries SQL em paralelo: uma por período, agrupando por EXTRACT(DAY FROM date - INTERVAL '3 hours').
  3. Constrói array com uma entrada por dia do mês atual, com mesAtual e mesAnterior.

Tabelas lidas: Transaction


getRecentSales(): Promise<object[]>

Retorna as 15 transações PAID mais recentes do dia atual (BRT), com dados do cliente e loja.

Tabelas lidas: Transaction, Customer, Store


getStoreComparison(): Promise<object[]>

Retorna comparativo de vendas por loja: hoje vs ontem no mesmo intervalo de horas, e acumulado do ano.

Flow:

  1. Em paralelo: agrupa transações por storeId para hoje, ontem-mesmo-horário, última venda e YTD.
  2. Busca lojas primárias (excluindo aliases e lojas DESAT).
  3. Constrói mapa storeId → primaryStoreId (incluindo aliases) para consolidar dados.
  4. Para cada loja primária calcula variação percentual (hoje - ontem) / ontem * 100.
  5. Filtra lojas sem nenhuma venda no ano.
  6. Ordena por today DESC.

Tabelas lidas: Transaction, Store


getIntradayHistory(): Promise<object[]>

Retorna distribuição horária de vendas: hoje vs ontem, hora a hora em BRT.

Flow:

  1. Executa duas queries SQL: uma para hoje e uma para ontem, agrupando por EXTRACT(HOUR FROM date - INTERVAL '3 hours').
  2. Determina dinamicamente o range de horas com vendas em qualquer dos dois dias.
  3. Sempre inclui até a hora atual BRT no range.
  4. Retorna array com flag isFuture: true para horas ainda não ocorridas hoje.

Tabelas lidas: Transaction


triggerManualSync(): Promise<{ queued, start, end }>

Dispara sync manual de vendas do dia atual para todas as lojas ativas (excluindo lojas alias 006, 012, 019).

Tabelas lidas: Store


getTopSellers(): Promise<object[]>

Retorna os 10 vendedores com maior receita no mês atual.

Flow:

  1. Agrupa transações PAID do mês atual por salespersonId, soma totalValue e conta pedidos, ordena por soma DESC, limita a 10.
  2. Busca dados dos vendedores incluindo loja.
  3. Retorna com rank de 1 a 10.

Tabelas lidas: Transaction, Seller


getSellerRfm(params): Promise<object[]>

Aplica análise RFM aos vendedores: Recência, Frequência e Monetário calculados como percentis.

Parâmetros: start, end (padrão: últimos 12 meses), storeId (opcional).

Flow:

  1. Busca todos os vendedores ativos da organização (filtrados por loja se fornecido).
  2. Agrega transações por salespersonId no período: SUM(totalValue), COUNT(id), MAX(date).
  3. Para vendedores com pelo menos 1 venda, calcula percentis de receita, frequência e recência usando arrays sorted + rank/length.
  4. Transforma percentil (0–1) em score 1–5.
  5. Classifica em labels:
LabelCondição
CampeãorScore ≥ 4, fScore ≥ 4, mScore ≥ 4
ConsistentefScore ≥ 4, mScore ≥ 3
PromissorrScore ≥ 4, fScore ≥ 3
Em QuedamScore ≥ 4, rScore ≤ 2
Em DesenvolvimentorScore ≥ 3, fScore ≥ 2
InativorScore ≤ 2, mScore ≤ 2
Regulardemais casos
  1. Calcula revenueShare de cada vendedor em relação ao total.

Tabelas lidas: Seller, Transaction


getRetailMetrics(start, end, storeIds?): Promise<object>

Dashboard completo de varejo: KPIs, histórico, canais e performance por loja.

Flow:

  1. Em paralelo: expande storeIds via aliases e obtém IDs de clientes filiais.
  2. Em paralelo: busca transações do período (com select de campos necessários), conta transações históricas antes do período para cada cliente ativo no período (subquery para determinar status novo/recorrente), e busca lista de lojas.
  3. Chama aggregateTransactions() para processar em memória:
    • Agrupa por dia (períodos ≤ 60 dias) ou mês (períodos > 60 dias).
    • Classifica cada cliente como new (sem compra prévia), recurrent (orgânico com compra prévia) ou recovered (influenciado com compra prévia).
    • A classificação é determinada na primeira compra do cliente no período e usada em todas as compras subsequentes do mesmo cliente.
    • Calcula itemsPerTicket (PA) usando itemCount do ERP.
  4. Chama fetchPrevYearData() para buscar dados do mesmo período no ano anterior (com shift de +1 ano nos labels para alinhar com o período atual).
  5. Chama computePrevPeriodKpis() para calcular métricas do período imediatamente anterior (mesmo comprimento do período atual).
  6. Formata histórico com formatHistoryEntries(), canais com formatChannels(), lojas com formatStores().
  7. Calcula variações percentuais KPI vs período anterior.

Retorno: { kpis: { revenue, transactions, ticketAverage, repurchaseRate, *Variation }, history, channels, stores, granularity }

Tabelas lidas: Transaction, Store


getChannelDashboard(startDate, endDate, filters?): Promise<object>

Dashboard de campanhas por canal com receita influenciada, conversões e métricas de envio.

Flow:

  1. Busca campanhas do período com métricas.
  2. Para campanhas WhatsApp: busca contagem de cliques em CampaignRedirectToken (cliques em botões de redirecionamento).
  3. Chama normalizeCampaignMetrics() para padronizar campos por canal:
    • WhatsApp: sent/delivered/opens/clicks vêm de CampaignMetric (não de Campaign).
    • E-mail: usa CampaignMetric com fallback para campos da Campaign.
  4. Agrega transações por dia e por loja em duas queries SQL paralelas usando SUM/COUNT com flag isInfluenced.
  5. Constrói chartData (por dia), dispatchesList (por disparos de campanha), storesList (por loja).
  6. Atribuição de receita por campanha: se a campanha tem métricas diretas (WhatsApp), usa; senão distribui proporcionalmente por share de cliques.

Tabelas lidas: Campaign, CampaignMetric, CampaignRedirectToken, Transaction, Store


getScheduleMetrics(startDate, endDate, filters?): Promise<object>

Dashboard de agenda de campanhas: KPIs de alcance, histórico diário, breakdown por campanha, loja e vendedor.

Flow:

  1. Busca transações influenciadas no período para descobrir campaignIds vinculados (campanhas de períodos anteriores que ainda geraram conversões).
  2. Busca campanhas do período OU cujos IDs aparecem nas transações.
  3. Para campanhas WhatsApp: agrupa WhatsappMessage por (campaignId, status) para obter { total, reached, read }.
  4. Calcula KPIs totais: disponibilizados, realizados, confirmados, descadastros, receitaInfluenciada, conversoes, clientesUnicos, frequencia.
  5. Constrói tabelas: daily (por dia), campaigns (por campanha), stores (por loja), sellers (por vendedor).

Tabelas lidas: Transaction, Campaign, WhatsappMessage, Store, Seller


getInfluencedCustomers(start, end, storeIds?): Promise<object[]>

Lista clientes que fizeram compra influenciada por campanha no período, agrupados por cliente.

Flow:

  1. Busca transações com isInfluenced: true, status não cancelado, no período e lojas fornecidas.
  2. Resolve nomes de campanhas em bulk.
  3. Agrupa em memória por customerId: acumula purchases, totalValue, firstPurchase, lastPurchase, e conjuntos de campanhas e lojas.
  4. Retorna ordenado por totalValue DESC.

Tabelas lidas: Transaction, Campaign, Customer, Store


Fluxo de sincronização completo (ordem de execução diária)

3h AM  — [CustomerSyncService] Cron: enfileira sync de clientes (processado em seguida)
         └─ milleniumService.streamCustomersKeyset(0, ...) — full sync
         └─ processBatch() em chunks de 50, concorrência de 8
         └─ Dedup por CPF hash (prioritário) e externalId (fallback)

5h AM  — [SyncService] Cron noturno: enfileira sync de vendas do dia anterior por loja
         └─ Stagger: loja 1 sem delay, loja 2 em 2min, loja 3 em 4min... (+ jitter 0-30s)
         └─ Para cada loja: processInvoice() para cada nota
            ├─ upsertCustomer() — cria/atualiza cliente com dados da nota
            ├─ transaction.upsert() — dedup por romaneio+sufixo
            ├─ resolveSeller() — upsert on-the-fly
            └─ scheduleFollowUps() fire-and-forget
               ├─ attributeTransaction() — atribuição a campanhas WhatsApp
               └─ postSaleSurvey.scheduleForTransaction() — pesquisa pós-venda

7h AM  — [RfmService] Cron: recomputa RFM de todos os clientes (após dados atualizados)
         └─ UPDATE "Customer" SET rfmStatus = ... FROM classified cl ...

Cron horário — [SyncService]: sync das últimas 1h por loja (mantém dados atualizados durante o dia)

3h AM  — [IntelligenceService] Cron: snapshot diário de todos os segmentos ativos
         └─ customer.count() por segmento → SegmentHistory + Segment.lastCount

Normalização de dados

Telefone

EtapaDescrição
ExtraçãoPercorre ddd_celular+cel, ddd+fone, endereços, número bare (≥10 dígitos)
DDDConcatenado quando explícito; não duplicado se já embutido no número
ValidaçãoNúmero < 10 dígitos → phoneIncomplete: true em dataQualityIssues
ClassificaçãoclassifyPhone() determina MOBILE ou LANDLINE

CPF / CNPJ

CampoArmazenamento
cpfHashSHA-256 do CPF normalizado (sem pontos/traços) — chave de dedup
cpfEncryptedCriptografia reversível (AES) — para consulta interna autorizada
cpfMasked***.***.*XX-** — para display
cpfValidBoolean resultado de validação dos dígitos verificadores
CPF em texto puroNunca armazenado

Data de nascimento

  • Formato OData /Date(ticks)/ ou string ISO.
  • Valores com ano < 1900 ou > anoAtual - 14 → invalidBirthDate: true em dataQualityIssues.

Variáveis de ambiente

VariávelPadrãoUso
DISABLE_SYNC_CRON'false'Desabilita todos os crons de sync (útil em ambiente de dev)
ERP_SYNC_LOCK_TTL_SECONDS7200TTL do lock distribuído Redis (reservado para sync com lock)
SYNC_START_DATEData inicial para backfill manual
SYNC_END_DATEData final para backfill manual
SYNC_FILIALCódigo de filial para sync focado
SYNC_CHUNK_MONTHS1Meses por batch (para backfills históricos)

Tabelas do banco de dados

TabelaOperações
CustomerUpsert por sync; leitura para listagem e perfil; opt-out
TransactionUpsert por sync; leitura para analytics
SellerUpsert on-the-fly durante sync de vendas; leitura para analytics
StoreLeitura para resolução de nota e filtros
OrganizationLeitura pelo cron de clientes
Campaign, CampaignMetricLeitura para dashboards de canal; escrita de conversions/revenue pela atribuição
CampaignRedirectTokenLeitura para contagem de cliques WhatsApp
WhatsappMessageLeitura para atribuição e métricas de agenda
SegmentLeitura para resolução de filtro de segmento

Documentação interna — Galdix CRM