Tema
Módulo ERP / Sincronização
O módulo ERP é a camada de integração entre o ERP Millennium e o banco de dados CRM. Ele é responsável por toda a sincronização de dados (vendas, clientes, vendedores, lojas e produtos), pela normalização e deduplicação de cadastros, pela atribuição de transações a campanhas de marketing e pelo fornecimento de analytics de vendas. A sincronização acontece via filas BullMQ com agendamentos cron independentes para cada tipo de dado.
Arquitetura de filas
O módulo usa duas filas BullMQ:
| Fila | Propósito |
|---|---|
erp-sync-queue | Jobs de sincronização de vendas por loja (disparados por cron horário/noturno e manual) |
erp-customer-sync-queue | Jobs de sincronização global de clientes (disparado por cron diário às 5h) |
Services
SyncService
Arquivo: backend/src/modules/erp/sync/sync.service.ts
Orquestra a sincronização de transações de venda do ERP Millennium para o CRM. Responsável por dois crons (noturno e horário), enfileiramento de jobs por loja, processamento de notas fiscais (upsert de clientes, resolução de vendedores, deduplicação de transações) e atribuição de transações a campanhas de WhatsApp.
Eventos permitidos do ERP
S - VENDA E-COMMERCE
S - VENDA VAREJO NFC-E (LOJAS)
S - VENDA VAREJO CFE (LOJAS)
S - VENDA VAREJO CFE ( ESTOQUE NEGAT
S - VENDA VAREJO NFE
E - DEVOLUCAO DE VENDA VAREJO
E - DEVOLUCAO DE VENDA E-COMMERCEMapa de redirecionamento de lojas
Algumas filiais no ERP usam códigos antigos que devem ser remapeados para os códigos corretos:
| Código ERP | Redireciona para |
|---|---|
006 | 007 |
012 | 028 |
019 | 029 |
Métodos
handleNightlySync(): Promise<void> (CRON)
Schedule: 0 6 * * * — executa às 6h AM todos os dias.
Sincroniza as vendas do dia anterior de todas as lojas ativas.
Flow:
- Verifica
DISABLE_SYNC_CRON === 'true'e retorna sem fazer nada se verdadeiro. - Calcula
yesterday = subDays(new Date(), 1). - Define
start = startOfDay(yesterday)eend = endOfDay(yesterday)como strings ISO. - Chama
queueStoreSyncs(start, end, 'nightly').
handleHourlySync(): Promise<void> (CRON)
Schedule: 0 * * * * — executa a cada hora cheia.
Sincroniza as vendas da última hora de todas as lojas ativas.
Flow:
- Verifica
DISABLE_SYNC_CRON === 'true'e retorna sem fazer nada se verdadeiro. - Define
start = subHours(now, 1)eend = now. - Chama
queueStoreSyncs(start, end, 'hourly').
queueStoreSyncs(start, end, frequency): Promise<void> (privado)
Enfileira um job de sincronização para cada loja ativa, com jitter para evitar picos de carga.
Flow:
- Busca todas as lojas ativas (
store.findMany where isActive: true). - Para cada loja
i: a. Calculadelay = i * 120_000 ms (2 minutos) + random(0..30_000 ms)— escalonamento com jitter. b. GerajobId = sync-sales-{store.code}-{frequency}-{timeKey}, ondetimeKeyéYYYY-MM-DDpara noturno ouYYYY-MM-DDTHHpara horário (garante idempotência — o mesmo job não é adicionado duas vezes no mesmo ciclo). c. Adiciona job'sync-sales'na filaerp-sync-queuecom:attemptsdoCRM_CONFIG,backoff: exponential 5 min,removeOnComplete: 25h,removeOnFail: 7 dias.
Tabelas lidas: Store
syncSales(start: Date, end: Date): Promise<object>
Processa todas as notas fiscais do ERP no período. Chamado pelo processador de fila (não diretamente pelo cron).
Flow:
- Chama
milleniumService.getFaturamentos(start, end)para buscar notas do ERP. - Para cada nota (
sale), chamaprocessInvoice(sale). Em caso de erro por nota, loga e continua. - Retorna
{ status, processed, new, updated }.
Tabelas lidas/escritas: via processInvoice
processInvoice(sale: any): Promise<boolean> (privado)
Processa uma nota fiscal individual do ERP. Retorna true se nova transação, false se atualização.
Flow:
- Determina o código da loja (
cod_emissor || cod_filial) e aplicaSTORE_REDIRECT_MAP. - Busca a loja no banco pelo código. Se não encontrada, pula a nota (skips non-configured stores).
- Resolve dados do cliente embutidos na nota (
sale.cliente[0]). Pula notas semcod_cliente. - Chama
resolveAddress(customerRaw)para selecionar o melhor endereço disponível. - Chama
extractPhone(customerRaw, allAddresses)para extrair telefone com DDD. - Chama
buildCustomerData(...)para construir o payload de upsert do cliente. - Chama
upsertCustomer(externalId, email, cpfCnpj, customerData)para criar/atualizar o cliente. - Chama
enrichSaleItems(sale)para normalizar produtos e calcularitemCount(Peças por Atendimento). - Chama
computeTransactionStatusAndValue(sale)para determinar status (PAID,CANCELLED,REFUNDED) e valor. - Chama
parseTransactionDate(sale)para converter o campo de data do ERP. - Chama
computeExternalTxId(sale)para gerar a chave de deduplicação. - Chama
resolveSeller(sale, organizationId)para obter o ID do vendedor (upsert on-the-fly). - Verifica se a transação já existe (
transaction.findUnique where externalId). - Executa
transaction.upsertcom todos os dados. O campochannelnão é sobrescrito em updates — preserva atribuição existente de WhatsApp. - Se transação nova, chama
scheduleFollowUps(...)de forma fire-and-forget. - Retorna
!existing.
Tabelas lidas: Store, Customer, TransactionTabelas escritas: Customer, Transaction, Seller
scheduleFollowUps(txId, status, customer, storeId, organizationId, storeDisplayName, vendorName): void (privado)
Dispara operações não-bloqueantes após criação de nova transação.
Operações (fire-and-forget, falhas apenas logadas):
attributeTransaction(txId)— tenta atribuir a transação a uma campanha de WhatsApp.- Se
status === 'PAID'ePostSaleSurveyServicedisponível:postSaleSurvey.scheduleForTransaction(...)— agenda pesquisa pós-venda.
attributeTransaction(transactionId): Promise<void> (privado)
Atribui uma transação a uma campanha de WhatsApp se houver mensagem recente dentro da janela de atribuição.
Flow:
- Busca a transação pelo ID (
transaction.findUnique, select:id, customerId, date, totalValue, campaignId). - Retorna se já tiver
campaignId, se não tivercustomerId, ou se não existir. - Busca o telefone do cliente (
customer.findUnique). Retorna se sem telefone ou com menos de 10 dígitos. - Busca a mensagem de WhatsApp enviada mais recente para o telefone (últimos 10 dígitos), direção
OUTBOUND, comcampaignId: { not: null }, status !=FAILED. - Busca a campanha para obter
attributionWindowDays. - Calcula
diffMs = tx.date - message.sentAt. Retorna se fora da janela ou se a mensagem foi enviada depois da transação. - Atualiza a transação:
{ campaignId, isInfluenced: true, channel: 'WHATSAPP' }. - Faz upsert em
CampaignMetric: incrementaconversions(só setotalValue > 0) erevenue.
Tabelas lidas: Transaction, Customer, WhatsappMessage, CampaignTabelas escritas: Transaction, CampaignMetric
upsertCustomer(externalId, email, cpfCnpj, customerData): Promise<Customer> (privado)
Localiza ou cria um cliente com estratégia de deduplicação em cascata.
Estratégia de resolução:
| Caso | Ação |
|---|---|
Encontrado por externalId | Atualiza o registro existente. Preserva o telefone se já tiver ≥ 10 dígitos (sync de vendas não é fonte autoritativa de contato). |
Não encontrado por externalId, mas CPF hash bate | Faz merge: atualiza o registro existente com o novo externalId (link) e mescla telefone (preserva existente se válido). |
| Não encontrado por nenhuma chave | Cria novo registro. |
resolveSeller(sale, organizationId): Promise<{ sellerId, vendorCode, vendorName }> (privado)
Resolve o vendedor a partir dos dados da nota. Faz upsert on-the-fly para vendedores novos.
- Retorna
{ sellerId: null, vendorCode: null, vendorName: null }quandocod_vendedoré ausente, vazio ou zero. - Faz
seller.upsert where externalId: vendorCode— cria comstatus: 'ACTIVE'se novo.
enrichSaleItems(sale): { enrichedItems, itemCount } (privado)
Normaliza itens de produto da nota ERP para o formato interno.
- Lê
sale.produtos || sale.produto(ambos os nomes de campo do ERP são suportados). - Mapeia cada item para
{ id, name, quantity, price, total }usando campos ERP:cod_produto,desc_produto || descricao,quantidade,preco ?? preco_venda,valor_total_liquido ?? total_item. - Calcula
itemCount = SUM(quantity)para PA (Peças por Atendimento).
computeTransactionStatusAndValue(sale): { status, totalValue } (privado)
Determina status e valor líquido da transação.
| Condição ERP | Status | Valor |
|---|---|---|
cancelado/cancelada === true/'T'/'True' | CANCELLED | valor original |
tipo_operacao === 'E' (entrada/devolução) | REFUNDED | -Math.abs(valor) |
| demais | PAID | valor_final ?? total |
computeExternalTxId(sale): string (privado)
Gera chave única de deduplicação para transações.
Formato: {romaneio || cod_pedidov || pedidov || trans_id}{sufixo}
Sufixo: -S para vendas, -E para devoluções. Evita colisão entre venda e sua devolução (que têm o mesmo romaneio no ERP, mas são registros distintos no CRM).
Motivo: trans_id é granular demais — Millennium gera um trans_id por forma de pagamento/parcela, gerando N registros idênticos por venda.
extractPhone(raw, addresses): string | null (privado)
Extrai o melhor telefone disponível do payload do ERP, percorrendo todas as fontes com ordem de prioridade.
Prioridade:
- Top-level com DDD explícito:
ddd_celular + cel - Top-level com DDD explícito:
ddd + fone - Endereços (percorre todos):
ddd_cel + cel - Endereços (percorre todos):
ddd + fone - Top-level sem campo DDD, mas número ≥ 10 dígitos (DDD embutido):
cel,fone - Endereços sem campo DDD, mas número ≥ 10 dígitos:
cel,fone
Regra de concatenação: Se o número já começa pelo DDD e tem ≥ 10 dígitos, não duplica o DDD.
parseTransactionDate(sale): Date (privado)
Converte campos de data do ERP para Date UTC.
- Formato OData v3
/Date(ticks±offset)/— usa apenas os ticks (milliseconds Unix), ignora o offset de fuso horário (o banco armazena em UTC). - String simples — strings do Millennium são em horário de Brasília (sem info de fuso). Adiciona 3h para converter BRT → UTC.
- Fallback:
new Date()(data atual).
parseBirthDate(birthDateStr): Date | null (privado)
Converte data de aniversário do ERP. Suporta formato OData e ISO plain. Retorna null para strings inválidas.
resolveAddress(customerRaw): { allAddresses, addressData } (privado)
Consolida os campos de endereço do payload ERP (enderecos, endereco, endereco_entrega) e seleciona o melhor: primeiro que tenha logradouro, cidade ou bairro; caso contrário o primeiro da lista; caso contrário {}.
buildCustomerData(...): any (privado)
Constrói o payload completo para upsert de cliente a partir dos dados brutos do ERP:
| Campo CRM | Origem ERP |
|---|---|
name | customerRaw.nome.trim() |
email | customerRaw.e_mail |
phone | extractPhone() |
phoneType | classifyPhone(phone) |
city, state | endereço resolvido |
address | logradouro + numero |
neighborhood, zipCode | campos de endereço |
cpfHash | hashCpf(cpfCnpj) — SHA-256 |
cpfEncrypted | encryption.encrypt(normalizeCpf(cpfCnpj)) — criptografia reversível |
cpfMasked | maskCpfDigits(cpfCnpj) — ex: ***.***.*XX-** |
cpfValid | isValidCPF(cpfCnpj) |
birthDate | parseBirthDate(data_aniversario) |
transId | BigInt(trans_id) |
CustomerSyncService
Arquivo: backend/src/modules/erp/sync/customer-sync.service.ts
Sincroniza o cadastro completo de clientes do ERP Millennium para o CRM. Opera via streaming paginado (keyset pagination) para lidar com bases grandes sem carregar tudo em memória. Usa transações em batch com fallback sequencial para isolamento de falhas.
Métodos
handleDailySync(): Promise<void> (CRON)
Schedule: 0 5 * * * — executa às 5h AM todos os dias (após seller sync às 3h, antes do sync de vendas às 6h).
Flow:
- Verifica
DISABLE_SYNC_CRON === 'true'. - Busca a primeira organização do banco.
- Gera
jobId = sync-customers-global-{YYYY-MM-DD}para idempotência diária. - Adiciona job
'sync-customers'na filaerp-customer-sync-queue.attemptsdoCRM_CONFIGbackoff: exponential 1 minremoveOnComplete: 24h, count: 100removeOnFail: 7 dias, count: 500
Tabelas lidas: Organization
syncCustomers(): Promise<void>
Executa a sincronização completa de todos os clientes. Chamado pelo processador de fila.
Flow:
- Busca a primeira loja ativa (ordem por
code ASC) para obterstoreIdeorganizationIdpadrão. - Chama
milleniumService.streamCustomersKeyset(0, callback)— streaming keyset pagination a partir detrans_id=0(full sync). - O callback processa cada batch recebido em sub-chunks de 50 registros:
- Chama
processBatch(chunk, defaultStoreId, defaultOrgId). - Incrementa
updatedCount. - Loga progresso a cada 5.000 clientes.
- Chama
- Ao final, loga
Customer sync complete. Total: {updatedCount}.
Tabelas lidas: Store
processBatch(batch, defaultStoreId, defaultOrgId): Promise<void> (privado)
Processa um batch de registros brutos do ERP, fazendo upsert de clientes com deduplicação.
Flow:
- Deduplicação de batch: Remove duplicatas por
cod_clientedentro do mesmo batch (o ERP pode enviar o mesmo cliente mais de uma vez). Loga se houve redução. - Extração de chaves:
externalIds— array deString(cod_cliente)do batch.cpfHashes— array dehashCpf(cpf || cnpj)do batch (filtra nulos).
- Bulk lookup paralelo:
customer.findMany where externalId: { in: externalIds }→ MapexternalId → customer.customer.findMany where cpfHash: { in: cpfHashes }→ MapcpfHash → customer.
- Processamento em chunks de 8 (abaixo do pool padrão do Prisma de 9 conexões):
- Para cada cliente bruto, chama
getQuery(raw)que monta a query de upsert. - Executa todas as queries do chunk como
prisma.$transaction([...queries]). - Se a transação falhar: fallback sequencial — executa cada query individualmente e loga erros por cliente.
- Para cada cliente bruto, chama
Lógica de resolução por cliente (dentro de getQuery):
| Caso | Ação |
|---|---|
| CPF hash encontrado | CASE 1: customer.update where id = cpfLookup.id — atualiza todos os campos, inclusive sincronizando externalId se mudou no ERP |
| Sem CPF, externalId encontrado | CASE 2: customer.update where id = externalIdLookup.id — atualiza todos os campos |
| Nenhuma chave encontrada | CASE 3: customer.create — novo cliente |
Normalização de dados por cliente:
email— convertido para lowercasephone—extractPhone(raw, allAddresses)com prioridade (DDD explícito > DDD embutido > bare number)cpfHash— SHA-256 do CPF/CNPJ normalizadocpfEncrypted— criptografia reversível para consulta internacpfMasked— display mascaradobirthDate— OData/Date(ticks)/ou string ISO;nulle valores inválidos logados com warningcreatedAt— mapeado dedata_cadastro(data real de cadastro no ERP); fallbacknew Date()transId—BigInt(trans_id)com tratamento de erro paratrans_idinválidospersonType—'PJ'sepf_pj === 'PJ', senão'PF'dataQualityIssues— objeto com flags:phoneIncomplete: true(telefone < 10 dígitos),invalidBirthDate: true(ano < 1900 ou > anoAtual - 14)
Tabelas lidas: CustomerTabelas escritas: Customer
extractPhone(raw, addresses): string | null (privado)
Mesma lógica de 5 níveis de prioridade descrita no SyncService. Adicionalmente:
- Nível 5 (último recurso): se o número tem menos de 10 dígitos, salva mesmo assim mas emite
logger.warnindicando ausência de DDD.
CustomersService
Arquivo: backend/src/modules/erp/customers/customers.service.ts
Gerencia a listagem, visualização e opt-out de clientes no CRM. Usa tenantContext (AsyncLocalStorage) para filtrar automaticamente pela organização do usuário autenticado. A listagem usa uma estratégia de paginação otimizada: quando o sort é por campo da tabela Customer, pagina primeiro na tabela de clientes e depois faz JOIN apenas na página de resultados (evita JOIN pesado em toda a base).
Conversão de score RFM
A função rfmLabelToScore(label) converte labels RFM (tanto legados quanto os 10 segmentos atuais) para score numérico de 0 a 100:
| Label | Score |
|---|---|
| Champions | 100 |
| Loyal Customers | 85 |
| Potential Loyalist | 75 |
| Promising | 65 |
| Recent Customers | 60 |
| Need Attention | 45 |
| About To Sleep | 35 |
| At Risk | 25 |
| Can't Lose Them | 20 |
| Hibernating | 5 |
Métodos
findAll(params): Promise<{ data, pagination }>
Lista clientes com suporte a busca textual, filtros por RFM, segmento, loja e status, e ordenação.
Parâmetros:
page,limit— paginação (padrão: página 1, 10 por página)search— busca por nome, e-mail, externalId ou CPF (hash automático se 11 ou 14 dígitos)sortBy—name | createdAt | rfmScore | lastPurchase | totalTransactions | ltvsortDir—asc | descrfmSegments— array de labels RFM para filtrarsegmentId— ID de um segmento salvo (resolvido dinamicamente)storeId— ID de loja específicastatusFilter— array delead | inactive | warning | active
Resolução de segmento: Quando segmentId é fornecido, carrega as regras do segmento e executa queryBuilder.buildAsync() para obter IDs de clientes correspondentes (máx. 100.000). Esses IDs são passados como filtro c.id = ANY(...).
Estratégia de query — Paginate-First (otimização):
Ativada quando: sortBy é um dos campos da tabela Customer (name, createdAt, rfmScore, lastPurchase) E não há statusFilter.
sql
-- Paginate-first:
WITH TotalCount AS (SELECT COUNT(*) FROM "Customer" c WHERE ...),
PaginatedIds AS (SELECT c.id FROM "Customer" c WHERE ... ORDER BY {campo} LIMIT ? OFFSET ?),
CustomerStats AS (
SELECT c.id, c.name, ..., SUM(t."totalValue") AS ltv, COUNT(t) AS "totalTransactions",
MAX(t.date) AS "lastPurchaseDate", EXTRACT(EPOCH ...) AS "daysSinceLastBuy"
FROM PaginatedIds pi JOIN "Customer" c ON c.id = pi.id LEFT JOIN "Transaction" t ON c.id = t."customerId"
GROUP BY c.id, ...
)
SELECT * FROM CustomerStats ORDER BY {campo} {dir} NULLS LASTEstratégia de query — Full (com status filter):
sql
WITH CustomerStats AS (
SELECT c.id, ..., SUM(t."totalValue") AS ltv, COUNT(t) AS "totalTransactions", MAX(t.date), ...
FROM "Customer" c LEFT JOIN "Transaction" t ON c.id = t."customerId"
WHERE c."organizationId" = $1 AND (c.name ILIKE $2 OR ...) {segmentClause} {storeClause}
GROUP BY c.id, c."rfmStatus"
),
FilteredCustomers AS (
SELECT *, COUNT(*) OVER() AS "fullCount"
FROM CustomerStats
WHERE ($4::boolean = false OR "rfmStatus" = ANY($5::text[]))
{statusClause}
)
SELECT * FROM FilteredCustomers ORDER BY ... LIMIT ? OFFSET ?Classificação de status (em memória):
| Status | Critério |
|---|---|
lead | daysSinceLastBuy IS NULL OR totalTransactions = 0 |
inactive | daysSinceLastBuy > 120 AND totalTransactions > 0 |
warning | daysSinceLastBuy > 60 AND <= 120 AND totalTransactions > 0 |
active | daysSinceLastBuy IS NOT NULL AND <= 60 AND totalTransactions > 0 |
Tabelas lidas: Customer, Transaction, Segment
findById(id: string): Promise<object>
Retorna perfil completo de um cliente com histórico de transações.
Flow:
- Busca cliente com
customer.findUnique where id, incluindo todas astransactions(comstoreesalesperson), ordenadas pordate desc. - Lança
NotFoundExceptionse não encontrado. - Busca em paralelo:
preferredStore(sepreferredStoreIdpresente) epreferredSeller(sepreferredSellerIdpresente, campo adicionado via migration raw). - Calcula métricas em memória:
ltv— soma detotalValuede todas as transações (inclui devoluções com valor negativo para LTV líquido correto).totalTransactions— conta apenas transações não devolvidas comtotalValue >= 0.lastPurchaseDate— data da transação mais recente (excluindo devoluções).daysSinceLastBuy—differenceInDays(today, lastPurchaseDate).age—floor(differenceInDays(today, birthDate) / 365)quando disponível.
- Monta campo
historycom tipo'purchase'ou'return'baseado emtotalValue < 0 || status === 'REFUNDED'.
Tabelas lidas: Customer, Transaction, Store, Seller
previewImportOptout(dto: ImportOptoutDto): Promise<object>
Verifica quais clientes de uma lista de CPFs seriam afetados pelo opt-out, sem aplicar.
Flow:
- Normaliza CPFs (remove pontos e traços) e gera hashes SHA-256.
- Busca clientes na organização com
cpfHash: { in: cpfHashes }. - Retorna
{ total, found, notFoundCount, notFound, customers }com nome e CPF mascarado de cada encontrado.
Tabelas lidas: Customer
applyImportOptout(dto: ImportOptoutDto): Promise<{ updated, total }>
Aplica opt-out em massa por canal para uma lista de CPFs.
Parâmetros do DTO:
cpfs— array de strings de CPF (com ou sem formatação)channel—'whatsapp' | 'email' | 'sms'
Flow:
- Normaliza CPFs e gera hashes.
- Executa
UPDATE "Customer" SET {channel}OptOut = true, {channel}OptOutAt = {now} WHERE organizationId = ? AND cpfHash = ANY(?) AND {channel}OptOut = false. - Retorna
{ updated: rows_affected, total: cpfs_count }.
Tabelas escritas: Customer
updatePreferences(id, dto): Promise<object>
Atualiza preferências de opt-out de um cliente individual.
Parâmetros do DTO (todos opcionais):
whatsappOptOut— booleanemailOptOut— booleansmsOptOut— boolean
Flow:
- Valida que o cliente existe na organização do contexto.
- Para cada campo presente no DTO: define o campo de opt-out e a respectiva data (
whatsappOptOutAt,emailOptOutAt,smsOptOutAt) —nullquando opt-out éfalse. - Atualiza e retorna os campos de opt-out atualizados.
Tabelas lidas: CustomerTabelas escritas: Customer
SalesService
Arquivo: backend/src/modules/erp/sales/sales.service.ts
Fornece todos os analytics de vendas: dashboard do dia em tempo real, comparação de lojas, histórico diário/mensal, métricas de varejo com comparativo de período anterior e YoY, dashboard de canais/campanhas, ranking de vendedores e lista de clientes influenciados. Opera em fuso de Brasília (UTC-3) com datas UTC armazenadas no banco.
Tratamento de lojas filiais
getFilialCustomerIds() — cacheia IDs de clientes cujo CPF hash corresponde ao CNPJ de uma loja filial (lojas com primaryStoreId: not null). Esses clientes representam transferências inter-filiais e são excluídos de métricas de CRM. O cache é de processo (válido enquanto o servidor está rodando, atualizado apenas no restart).
Convenção de datas
Todas as datas de transação são armazenadas no banco como UTC sem informação de fuso. Para cálculos diários e horários em BRT, o serviço usa:
brtDayBoundsUtc()— retorna{ start, end }do dia atual em BRT como UTC.nowBRT()— data/hora atual em BRT.- Queries SQL usam
- INTERVAL '3 hours'para agrupar por dia/hora BRT.
Métodos
processSale(data: CreateSaleDto): Promise<object>
Registra uma transação manualmente (usado por webhooks internos, não pelo sync do ERP).
Flow:
- Busca a loja pelo
data.storeIdpara obterorganizationId. - Tenta encontrar cliente por e-mail (
customer.findFirst where email). Se existir, atualiza o nome. - Se não existir, cria novo cliente com email, nome e CPF.
- Cria
Transactioncomstatus: 'PAID'.
Tabelas lidas: Store, CustomerTabelas escritas: Customer, Transaction
getDailyTotal(): Promise<object>
Retorna KPIs de vendas do dia atual (BRT) em tempo real.
Queries executadas em paralelo:
- Agregação do dia atual:
SUM(totalValue),COUNT(id)— transações não canceladas. - Agregação de ontem no mesmo intervalo de horas:
SUM(totalValue)— para comparativo. - Agregação do mês atual:
SUM(totalValue),COUNT(id). - Receita influenciada do mês (com exclusão de filiais).
- Taxa de recompra do mês: query SQL com subquery correlacionada que conta clientes únicos do mês e verifica se tinham compra anterior a
monthStart. - Contagem das últimas 4h (
PAIDapenas). - Última transação do dia (para indicar frescor do sync).
Retorno: { total, count, yesterdayTotal, monthTotal, monthCount, influencedRevenue, repurchaseRate, recentCount, lastSyncAt }
Tabelas lidas: Transaction
getSalesHistory(): Promise<object[]>
Retorna comparativo dia-a-dia entre mês atual e mês anterior.
Flow:
- Calcula limites de mês atual (dia 1 até hoje) e mês anterior (completo).
- Executa duas queries SQL em paralelo: uma por período, agrupando por
EXTRACT(DAY FROM date - INTERVAL '3 hours'). - Constrói array com uma entrada por dia do mês atual, com
mesAtualemesAnterior.
Tabelas lidas: Transaction
getRecentSales(): Promise<object[]>
Retorna as 15 transações PAID mais recentes do dia atual (BRT), com dados do cliente e loja.
Tabelas lidas: Transaction, Customer, Store
getStoreComparison(): Promise<object[]>
Retorna comparativo de vendas por loja: hoje vs ontem no mesmo intervalo de horas, e acumulado do ano.
Flow:
- Em paralelo: agrupa transações por
storeIdpara hoje, ontem-mesmo-horário, última venda e YTD. - Busca lojas primárias (excluindo aliases e lojas DESAT).
- Constrói mapa
storeId → primaryStoreId(incluindo aliases) para consolidar dados. - Para cada loja primária calcula variação percentual
(hoje - ontem) / ontem * 100. - Filtra lojas sem nenhuma venda no ano.
- Ordena por
today DESC.
Tabelas lidas: Transaction, Store
getIntradayHistory(): Promise<object[]>
Retorna distribuição horária de vendas: hoje vs ontem, hora a hora em BRT.
Flow:
- Executa duas queries SQL: uma para hoje e uma para ontem, agrupando por
EXTRACT(HOUR FROM date - INTERVAL '3 hours'). - Determina dinamicamente o range de horas com vendas em qualquer dos dois dias.
- Sempre inclui até a hora atual BRT no range.
- Retorna array com flag
isFuture: truepara horas ainda não ocorridas hoje.
Tabelas lidas: Transaction
triggerManualSync(): Promise<{ queued, start, end }>
Dispara sync manual de vendas do dia atual para todas as lojas ativas (excluindo lojas alias 006, 012, 019).
Tabelas lidas: Store
getTopSellers(): Promise<object[]>
Retorna os 10 vendedores com maior receita no mês atual.
Flow:
- Agrupa transações
PAIDdo mês atual porsalespersonId, somatotalValuee conta pedidos, ordena por soma DESC, limita a 10. - Busca dados dos vendedores incluindo loja.
- Retorna com
rankde 1 a 10.
Tabelas lidas: Transaction, Seller
getSellerRfm(params): Promise<object[]>
Aplica análise RFM aos vendedores: Recência, Frequência e Monetário calculados como percentis.
Parâmetros: start, end (padrão: últimos 12 meses), storeId (opcional).
Flow:
- Busca todos os vendedores ativos da organização (filtrados por loja se fornecido).
- Agrega transações por
salespersonIdno período:SUM(totalValue),COUNT(id),MAX(date). - Para vendedores com pelo menos 1 venda, calcula percentis de receita, frequência e recência usando arrays sorted + rank/length.
- Transforma percentil (0–1) em score 1–5.
- Classifica em labels:
| Label | Condição |
|---|---|
| Campeão | rScore ≥ 4, fScore ≥ 4, mScore ≥ 4 |
| Consistente | fScore ≥ 4, mScore ≥ 3 |
| Promissor | rScore ≥ 4, fScore ≥ 3 |
| Em Queda | mScore ≥ 4, rScore ≤ 2 |
| Em Desenvolvimento | rScore ≥ 3, fScore ≥ 2 |
| Inativo | rScore ≤ 2, mScore ≤ 2 |
| Regular | demais casos |
- Calcula
revenueSharede cada vendedor em relação ao total.
Tabelas lidas: Seller, Transaction
getRetailMetrics(start, end, storeIds?): Promise<object>
Dashboard completo de varejo: KPIs, histórico, canais e performance por loja.
Flow:
- Em paralelo: expande
storeIdsvia aliases e obtém IDs de clientes filiais. - Em paralelo: busca transações do período (com select de campos necessários), conta transações históricas antes do período para cada cliente ativo no período (subquery para determinar status novo/recorrente), e busca lista de lojas.
- Chama
aggregateTransactions()para processar em memória:- Agrupa por dia (períodos ≤ 60 dias) ou mês (períodos > 60 dias).
- Classifica cada cliente como
new(sem compra prévia),recurrent(orgânico com compra prévia) ourecovered(influenciado com compra prévia). - A classificação é determinada na primeira compra do cliente no período e usada em todas as compras subsequentes do mesmo cliente.
- Calcula
itemsPerTicket(PA) usandoitemCountdo ERP.
- Chama
fetchPrevYearData()para buscar dados do mesmo período no ano anterior (com shift de +1 ano nos labels para alinhar com o período atual). - Chama
computePrevPeriodKpis()para calcular métricas do período imediatamente anterior (mesmo comprimento do período atual). - Formata histórico com
formatHistoryEntries(), canais comformatChannels(), lojas comformatStores(). - Calcula variações percentuais KPI vs período anterior.
Retorno: { kpis: { revenue, transactions, ticketAverage, repurchaseRate, *Variation }, history, channels, stores, granularity }
Tabelas lidas: Transaction, Store
getChannelDashboard(startDate, endDate, filters?): Promise<object>
Dashboard de campanhas por canal com receita influenciada, conversões e métricas de envio.
Flow:
- Busca campanhas do período com métricas.
- Para campanhas WhatsApp: busca contagem de cliques em
CampaignRedirectToken(cliques em botões de redirecionamento). - Chama
normalizeCampaignMetrics()para padronizar campos por canal:- WhatsApp:
sent/delivered/opens/clicksvêm deCampaignMetric(não deCampaign). - E-mail: usa
CampaignMetriccom fallback para campos daCampaign.
- WhatsApp:
- Agrega transações por dia e por loja em duas queries SQL paralelas usando
SUM/COUNTcom flagisInfluenced. - Constrói
chartData(por dia),dispatchesList(por disparos de campanha),storesList(por loja). - Atribuição de receita por campanha: se a campanha tem métricas diretas (WhatsApp), usa; senão distribui proporcionalmente por share de cliques.
Tabelas lidas: Campaign, CampaignMetric, CampaignRedirectToken, Transaction, Store
getScheduleMetrics(startDate, endDate, filters?): Promise<object>
Dashboard de agenda de campanhas: KPIs de alcance, histórico diário, breakdown por campanha, loja e vendedor.
Flow:
- Busca transações influenciadas no período para descobrir
campaignIdsvinculados (campanhas de períodos anteriores que ainda geraram conversões). - Busca campanhas do período OU cujos IDs aparecem nas transações.
- Para campanhas WhatsApp: agrupa
WhatsappMessagepor(campaignId, status)para obter{ total, reached, read }. - Calcula KPIs totais:
disponibilizados,realizados,confirmados,descadastros,receitaInfluenciada,conversoes,clientesUnicos,frequencia. - Constrói tabelas:
daily(por dia),campaigns(por campanha),stores(por loja),sellers(por vendedor).
Tabelas lidas: Transaction, Campaign, WhatsappMessage, Store, Seller
getInfluencedCustomers(start, end, storeIds?): Promise<object[]>
Lista clientes que fizeram compra influenciada por campanha no período, agrupados por cliente.
Flow:
- Busca transações com
isInfluenced: true, status não cancelado, no período e lojas fornecidas. - Resolve nomes de campanhas em bulk.
- Agrupa em memória por
customerId: acumulapurchases,totalValue,firstPurchase,lastPurchase, e conjuntos de campanhas e lojas. - Retorna ordenado por
totalValue DESC.
Tabelas lidas: Transaction, Campaign, Customer, Store
Fluxo de sincronização completo (ordem de execução diária)
3h AM — [CustomerSyncService] Cron: enfileira sync de clientes (processado em seguida)
└─ milleniumService.streamCustomersKeyset(0, ...) — full sync
└─ processBatch() em chunks de 50, concorrência de 8
└─ Dedup por CPF hash (prioritário) e externalId (fallback)
5h AM — [SyncService] Cron noturno: enfileira sync de vendas do dia anterior por loja
└─ Stagger: loja 1 sem delay, loja 2 em 2min, loja 3 em 4min... (+ jitter 0-30s)
└─ Para cada loja: processInvoice() para cada nota
├─ upsertCustomer() — cria/atualiza cliente com dados da nota
├─ transaction.upsert() — dedup por romaneio+sufixo
├─ resolveSeller() — upsert on-the-fly
└─ scheduleFollowUps() fire-and-forget
├─ attributeTransaction() — atribuição a campanhas WhatsApp
└─ postSaleSurvey.scheduleForTransaction() — pesquisa pós-venda
7h AM — [RfmService] Cron: recomputa RFM de todos os clientes (após dados atualizados)
└─ UPDATE "Customer" SET rfmStatus = ... FROM classified cl ...
Cron horário — [SyncService]: sync das últimas 1h por loja (mantém dados atualizados durante o dia)
3h AM — [IntelligenceService] Cron: snapshot diário de todos os segmentos ativos
└─ customer.count() por segmento → SegmentHistory + Segment.lastCountNormalização de dados
Telefone
| Etapa | Descrição |
|---|---|
| Extração | Percorre ddd_celular+cel, ddd+fone, endereços, número bare (≥10 dígitos) |
| DDD | Concatenado quando explícito; não duplicado se já embutido no número |
| Validação | Número < 10 dígitos → phoneIncomplete: true em dataQualityIssues |
| Classificação | classifyPhone() determina MOBILE ou LANDLINE |
CPF / CNPJ
| Campo | Armazenamento |
|---|---|
cpfHash | SHA-256 do CPF normalizado (sem pontos/traços) — chave de dedup |
cpfEncrypted | Criptografia reversível (AES) — para consulta interna autorizada |
cpfMasked | ***.***.*XX-** — para display |
cpfValid | Boolean resultado de validação dos dígitos verificadores |
| CPF em texto puro | Nunca armazenado |
Data de nascimento
- Formato OData
/Date(ticks)/ou string ISO. - Valores com ano < 1900 ou > anoAtual - 14 →
invalidBirthDate: trueemdataQualityIssues.
Variáveis de ambiente
| Variável | Padrão | Uso |
|---|---|---|
DISABLE_SYNC_CRON | 'false' | Desabilita todos os crons de sync (útil em ambiente de dev) |
ERP_SYNC_LOCK_TTL_SECONDS | 7200 | TTL do lock distribuído Redis (reservado para sync com lock) |
SYNC_START_DATE | — | Data inicial para backfill manual |
SYNC_END_DATE | — | Data final para backfill manual |
SYNC_FILIAL | — | Código de filial para sync focado |
SYNC_CHUNK_MONTHS | 1 | Meses por batch (para backfills históricos) |
Tabelas do banco de dados
| Tabela | Operações |
|---|---|
Customer | Upsert por sync; leitura para listagem e perfil; opt-out |
Transaction | Upsert por sync; leitura para analytics |
Seller | Upsert on-the-fly durante sync de vendas; leitura para analytics |
Store | Leitura para resolução de nota e filtros |
Organization | Leitura pelo cron de clientes |
Campaign, CampaignMetric | Leitura para dashboards de canal; escrita de conversions/revenue pela atribuição |
CampaignRedirectToken | Leitura para contagem de cliques WhatsApp |
WhatsappMessage | Leitura para atribuição e métricas de agenda |
Segment | Leitura para resolução de filtro de segmento |