Engenharia de Dados
Fundações de dados para a era da IA: Modern Data Stack, lakehouses com Iceberg/Delta, arquiteturas event-driven, qualidade e governança. Engenharia de Dados deixou de ser bastidor — virou pré-requisito para qualquer LLM, agente ou pipeline RAG que pretenda funcionar em produção.
Fundamentos da Engenharia de Dados na era da IA
Até 2022, engenharia de dados era uma disciplina focada em alimentar dashboards de BI e modelos de ML clássico. De 2023 em diante, com a explosão de LLMs, agentes e RAG, o data engineer passou a ser responsável também por contextualizar IA — preparar embeddings, manter feature stores em tempo real, garantir frescor de dados em vector databases e operar pipelines híbridos (batch + streaming + vetor) que alimentam aplicações generativas.
O que mudou na prática: o pipeline não termina mais em uma tabela final no warehouse. Ele termina em um contexto entregue a um modelo. Isso muda contratos de SLA (segundos, não horas), muda tipos de dados (texto, vetores, JSON aninhado) e muda o critério de qualidade (não basta o número bater — a semântica precisa estar correta).
A Pirâmide de Maslow dos Dados (Monica Rogati)
Em 2017, Monica Rogati popularizou uma analogia que envelheceu bem: assim como Maslow ordena necessidades humanas, há uma hierarquia em projetos de dados. Pular degraus é a causa #1 de fracasso em iniciativas de IA.
Instrumentação, logs, eventos, sensores, integrações com fornecedores. Sem dado coletado, nada existe.
ETL/ELT, message buses, data lakes confiáveis. Dados precisam chegar em algum lugar acessível e durável.
Limpeza, anomaly detection, criação de features, modelagem semântica. dbt mora aqui.
Métricas, KPIs, training data rotulada para ML supervisionado, datasets de fine-tuning.
Topo da pirâmide: ML, deep learning, LLMs, agentes. Só funciona se as 4 camadas abaixo estiverem sólidas.
Data Engineering vs Data Science vs Analytics Engineering
Os três papéis frequentemente são confundidos, especialmente em organizações pequenas onde uma única pessoa faz tudo. Em equipes maduras, eles são distintos e complementares.
| Dimensão | Data Engineer | Analytics Engineer | Data Scientist |
|---|---|---|---|
| Foco principal | Infraestrutura, pipelines, confiabilidade | Modelagem semântica, transformações, métricas | Estatística, ML, descoberta de insights |
| Stack típico | Airflow, Kafka, Spark, Terraform, K8s | dbt, SQL, BigQuery/Snowflake, Looker | Python, scikit-learn, PyTorch, notebooks |
| Pergunta que responde | "Como garanto que o dado chega, certo, no prazo?" | "Como modelo isso para que analistas confiem?" | "O que esse dado está me dizendo?" |
| Output | Pipelines, schemas, SLAs, observabilidade | Tabelas dim/fact, métricas governadas, docs | Modelos, experimentos, recomendações |
| Em era de IA | Adiciona vector stores, embeddings pipeline, streaming RAG | Define contratos semânticos consumidos por LLMs (Text-to-SQL) | Trabalha com LLMs, RAG, agentes, fine-tuning |
Pesquisas da Gartner (2024) e MIT Sloan (2025) convergem: a causa dominante de projetos de IA que não chegam à produção é qualidade de dados, governança, frescor e linhagem — não escolha de modelo, não engenharia de prompts, não infraestrutura de inferência. O melhor LLM com lixo de input gera saídas inúteis. "Garbage in, garbage out" é literalmente a lei de gravidade da IA aplicada.
Modern Data Stack 2026
O termo Modern Data Stack (MDS) consolidou-se entre 2020 e 2023 e descreve uma arquitetura cloud-native, modular e baseada em ferramentas best-of-breed conectadas por SQL e APIs. Em 2026, ela continua sendo o padrão da indústria, agora estendida com camadas de IA (vector stores, feature stores em tempo real, copilots).
Os 5 Pilares
Mover dados de fontes (SaaS, DBs, APIs, eventos) para o storage central. Padrão atual: ELT (extract-load-transform), não mais ETL clássico. Ferramentas: Fivetran, Airbyte, Meltano, Stitch.
Armazenamento unificado, separação de storage e compute. Snowflake, BigQuery, Databricks e Redshift dominam. Em 2026, todos suportam tabelas Iceberg externas.
Transformar dados brutos em modelos de negócio dentro do warehouse. dbt virou padrão de fato. SQLMesh ganha tração para CDC complexo. Coalesce e dbt Cloud lideram experiência gerencial.
Coordenar dependências, schedules, retries. Airflow permanece dominante por inércia, mas Dagster e Prefect lideram em ergonomia. Argo Workflows em K8s para casos cloud-native.
Entregar dados a sistemas de consumo: BI (Looker, Tableau, Metabase), reverse ETL (Hightouch, Census) que devolve dados modelados a SaaS operacionais, e APIs/feature stores para ML/IA.
Vector DBs (Pinecone, Weaviate, pgvector, Turbopuffer), feature stores em tempo real (Tecton, Feast), embedding pipelines como cidadãos de primeira classe no orquestrador.
Fluxo End-to-End
A arquitetura típica de uma empresa de médio/grande porte em 2026, em formato textual:
┌─────────────────────────────────────────────────────────────────┐
│ FONTES DE DADOS │
│ Postgres │ Salesforce │ Stripe │ Eventos (Kafka) │ APIs SaaS │
└────────────────────────────┬────────────────────────────────────┘
│
┌──────────────▼──────────────┐
│ INGESTION (Fivetran/ │
│ Airbyte/Kafka Connect) │ ← ELT, CDC, streaming
└──────────────┬──────────────┘
│
┌──────────────▼──────────────┐
│ RAW LAYER (Bronze) │ ← snapshot bruto
│ Snowflake / BigQuery / │ immutable, append-only
│ Databricks / S3 + Iceberg │
└──────────────┬──────────────┘
│
┌──────────────▼──────────────┐
│ TRANSFORMATION (dbt) │ ← Silver: limpo, tipado
│ - staging │ Gold: dim/fact, metrics
│ - intermediate │
│ - marts (gold) │
└──────────────┬──────────────┘
│
┌──────────────────────┼──────────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌─────────────┐ ┌────────────────┐
│ BI │ │ Reverse ETL │ │ AI / ML Layer │
│ Looker │ │ Hightouch │ │ Vector DB │
│ Tableau │ │ → Salesforce│ │ Feature Store │
│ Metabase │ │ → HubSpot │ │ LLM (RAG) │
└──────────┘ └─────────────┘ └────────────────┘
▲ ▲
│ │
ORQUESTRAÇÃO (Airflow / Dagster / Prefect) ───────────┘
OBSERVABILIDADE (Monte Carlo / Soda / dbt tests)
GOVERNANÇA (Atlan / Collibra / DataHub)
Convenção popularizada por Databricks: Bronze guarda dados brutos como vieram (auditoria, replay); Silver tem dados limpos, tipados e deduplicados (joins quase prontos); Gold contém modelos dimensionais e métricas finais consumíveis por BI/ML/IA. Não pule camadas — Bronze permite reprocessar tudo se uma transformação tiver bug.
Lakehouse e Open Table Formats
O lakehouse é a maior mudança arquitetural em dados desde a invenção do data warehouse — e a razão pela qual 2024–2026 viraram um divisor de águas. Para entender por que, precisamos olhar a evolução.
Evolução: Warehouse → Lake → Lakehouse
Estruturado, ACID, rápido para SQL analítico. Caro, fechado, ruim para dados não-estruturados (texto, imagem, log). Schema-on-write rigoroso. Exemplos: Teradata, Oracle DW, SQL Server.
Storage barato (S3), qualquer formato (Parquet, JSON, imagem). Schema-on-read. Mas: sem ACID, sem updates fáceis, virou pântano (data swamp) sem governança. Exemplos: Hadoop, S3 + Hive.
Storage barato do lake + transações ACID, schema enforcement, time travel e performance do warehouse. Open table formats (Iceberg, Delta, Hudi) tornam isso possível diretamente sobre arquivos Parquet em S3.
Iceberg vs Delta Lake vs Hudi
Open table formats são a "camada de inteligência" sobre arquivos Parquet em object storage. Eles guardam metadados (manifestos, snapshots, schemas) que permitem operações antes exclusivas de DBs. Os três grandes são:
| Característica | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| Origem | Netflix (2018), Apache TLP | Databricks (2019), Linux Foundation | Uber (2017), Apache TLP |
| ACID | Sim (snapshot isolation) | Sim (serializable) | Sim (atomic commit) |
| Time travel | Sim, por snapshot ID ou timestamp | Sim, por versão ou timestamp | Sim, por commit timeline |
| Schema evolution | Add/drop/rename/reorder colunas, change tipos | Add/drop colunas, evolução restrita | Add/drop, evolução com restrições |
| Partition evolution | Sim (hidden partitioning) — único | Não nativo (precisa rewrite) | Limitado |
| Engines suportados | Spark, Flink, Trino, Snowflake, BigQuery, Athena, Dremio | Spark (1ª classe), Trino, Flink, recém Snowflake/BQ via UniForm | Spark, Flink, Hive, Trino |
| Sweet spot | Multi-engine, neutro de fornecedor, padrão emergente | Ecossistema Databricks, ML/AI tightly coupled | Ingestion incremental pesada (CDC, upserts) |
| Adoção em 2026 | 🚀 Vencendo: Snowflake e Databricks anunciaram Iceberg como padrão aberto | Forte em Databricks; UniForm dá interop com Iceberg | Nichos com upsert-heavy |
Por que isso é uma revolução em 2026
Em junho de 2024 a Snowflake anunciou suporte first-class ao Iceberg. Em julho do mesmo ano, a Databricks comprou a Tabular (criadores do Iceberg) por ~US$1B. O sinal é claro: o format wars acabou — Iceberg ganhou como o "Parquet dos metadados". A consequência prática é enorme: dados ficam em S3, em formato aberto, e qualquer engine pode lê-los (Snowflake hoje, Trino amanhã, um LLM com Text-to-SQL depois). Lock-in de fornecedor cai drasticamente. Para arquiteturas de IA, isso significa que os mesmos arquivos podem alimentar BI, treinamento de ML e RAG sem cópias.
Lakehouse não é bala de prata. Para cargas OLTP (transações operacionais com latência sub-100ms), continue com Postgres/MySQL. Para datasets pequenos (<100GB) sem necessidade de multi-engine, um warehouse puro (Snowflake nativo, BigQuery nativo) é mais simples. Lakehouse brilha quando você tem TB-PB, múltiplos consumidores (BI + ML + IA) e quer evitar lock-in.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
# ─── Spark configurado para Iceberg em S3 ─────────────────────────────────────
# Em produção, essas configs ficam em spark-defaults.conf ou no cluster manager
spark = (
SparkSession.builder
.appName("iceberg-demo")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.lakehouse.type", "glue") # ou "hive", "rest", "nessie"
.config("spark.sql.catalog.lakehouse.warehouse", "s3://meu-lakehouse/warehouse/")
.getOrCreate()
)
# ─── 1. Criando uma tabela Iceberg com partition evolution ────────────────────
spark.sql("""
CREATE TABLE IF NOT EXISTS lakehouse.vendas.pedidos (
pedido_id BIGINT,
cliente_id BIGINT,
valor DECIMAL(10,2),
status STRING,
criado_em TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(criado_em)) -- hidden partitioning: única do Iceberg
TBLPROPERTIES (
'format-version' = '2', -- v2 suporta row-level deletes (MERGE)
'write.target-file-size-bytes' = '134217728' -- 128MB sweet spot
)
""")
# ─── 2. Insert + MERGE (upsert) ───────────────────────────────────────────────
novos_pedidos = spark.read.parquet("s3://landing/pedidos-2026-04-22/")
# MERGE = padrão de upsert ACID que antes só existia em DBs
spark.sql("""
MERGE INTO lakehouse.vendas.pedidos AS alvo
USING (SELECT * FROM novos_pedidos_view) AS origem
ON alvo.pedido_id = origem.pedido_id
WHEN MATCHED AND origem.status != alvo.status THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# ─── 3. Time Travel: lendo o estado da tabela em qualquer ponto do passado ───
# Por timestamp (auditoria, debugging de dados, "ontem o número estava certo")
df_ontem = (
spark.read
.option("as-of-timestamp", "2026-04-21 23:59:59")
.table("lakehouse.vendas.pedidos")
)
print(f"Pedidos ontem: {df_ontem.count():,}")
# Por snapshot_id (reprodutibilidade exata de um experimento de ML)
snapshots = spark.sql("SELECT * FROM lakehouse.vendas.pedidos.snapshots").collect()
snapshot_treino = snapshots[0].snapshot_id
df_treino_v1 = (
spark.read
.option("snapshot-id", snapshot_treino)
.table("lakehouse.vendas.pedidos")
)
# ─── 4. Schema evolution sem rewrite ──────────────────────────────────────────
# Isso é o que faz Iceberg brilhar: você adiciona coluna em PB de dados
# em milissegundos (só metadado muda, arquivos Parquet ficam intactos).
spark.sql("""
ALTER TABLE lakehouse.vendas.pedidos
ADD COLUMN canal STRING COMMENT 'Origem do pedido (web/app/loja)'
""")
# Renomear coluna sem reescrever nada
spark.sql("ALTER TABLE lakehouse.vendas.pedidos RENAME COLUMN valor TO valor_brl")
# ─── 5. Manutenção: compaction de small files ─────────────────────────────────
# Streaming gera muitos arquivos pequenos. Compaction agrupa em arquivos do
# tamanho ideal para query performance.
spark.sql("""
CALL lakehouse.system.rewrite_data_files(
table => 'vendas.pedidos',
options => map('target-file-size-bytes', '134217728')
)
""")
# Expira snapshots antigos (>7 dias) para liberar storage
spark.sql("""
CALL lakehouse.system.expire_snapshots(
table => 'vendas.pedidos',
older_than => TIMESTAMP '2026-04-15 00:00:00'
)
""")
Streaming vs Batch — Event-Driven Architectures
A pergunta "batch ou streaming?" é uma das mais mal respondidas em engenharia de dados. A resposta sincera é: depende do SLA de frescor que o consumidor de dados precisa. Streaming é mais caro, mais complexo de operar e tem uma curva de aprendizado íngreme. Não use só porque é moderno.
Quando usar cada um
| Caso de uso | Frescor exigido | Recomendação | Por quê |
|---|---|---|---|
| Relatório financeiro D-1, contábil mensal | Horas a dias | Batch | Custo << streaming, complexidade trivial |
| Treinamento de ML / fine-tuning de LLM | Dias | Batch | Datasets grandes, reprodutibilidade importa mais que frescor |
| Dashboard executivo de operação | Minutos | Micro-batch (a cada 5–15 min) | Streaming verdadeiro é overkill; compromisso ótimo |
| Detecção de fraude em pagamento | < 1 segundo | Streaming (Flink/Kafka Streams) | Cada milissegundo de latência custa dinheiro |
| Personalização em tempo real (recsys) | < 100ms | Streaming + feature store online | UX exige resposta imediata |
| RAG sobre eventos recentes (ex: notícias) | Segundos | Streaming → embedding pipeline → vector DB | Dados envelhecem rápido demais para batch |
Apache Kafka, Flink e Spark Streaming
Backbone de eventos. Não processa — apenas armazena e distribui logs particionados. Praticamente padrão de fato. Confluent Cloud e AWS MSK eliminam dor operacional. Em 2026, KRaft (sem ZooKeeper) é o padrão.
Processador de stream verdadeiro: latência sub-segundo, exactly-once semantics, state management robusto. Rei para event-driven puro. Curva de aprendizado íngreme, mas insubstituível em fraude e CEP.
Micro-batch (lotes pequenos) com API unificada com Spark batch. Perfeito quando o time já conhece Spark e o SLA aceita segundos a poucos minutos. Continuous processing existe mas é menos maduro.
Para aplicações JVM-native onde processamento mora no próprio serviço. Sem cluster separado. Bom para microserviços que produzem e consomem eventos do mesmo Kafka.
Lambda vs Kappa Architecture
Duas abordagens clássicas para combinar (ou não) batch e streaming:
Lambda Architecture (Nathan Marz, 2011): mantém dois pipelines paralelos. Um batch layer recomputa tudo a cada N horas garantindo correção; um speed layer entrega resultados aproximados em tempo real; uma serving layer combina os dois. Vantagem: tolerante a bugs no streaming. Desvantagem: código duplicado — toda lógica precisa ser implementada duas vezes, em frameworks diferentes.
Kappa Architecture (Jay Kreps, 2014): só existe um pipeline — o streaming. Para reprocessar histórico, simplesmente "rewind" o tópico Kafka e o mesmo job consome desde o início. Vantagem: uma só base de código. Desvantagem: exige Kafka com retenção longa e jobs streaming idempotentes/determinísticos.
Em 2026, Kappa ganhou — graças a Iceberg + Flink (que permite reler tabelas históricas como streams) e à maturidade de Kafka com tiered storage. Lambda persiste em ambientes legados.
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import time
from datetime import datetime
# ─── PRODUCER: emite eventos de pedidos para o tópico "pedidos.criados" ──────
producer = KafkaProducer(
bootstrap_servers=["broker-1:9092", "broker-2:9092", "broker-3:9092"],
# Serialização: dict Python → JSON UTF-8
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
# Chave usada para particionamento: pedidos do mesmo cliente vão na mesma
# partição, garantindo ordenação por cliente.
key_serializer=lambda k: str(k).encode("utf-8"),
acks="all", # espera confirmação de todas as réplicas (durabilidade)
enable_idempotence=True, # evita duplicatas em caso de retry
retries=10,
linger_ms=5, # buffer 5ms para batching mais eficiente
compression_type="snappy" # compressão reduz custo de rede ~70%
)
def publicar_pedido(pedido: dict):
"""Publica evento de pedido no tópico, particionado por cliente_id."""
try:
future = producer.send(
topic="pedidos.criados",
key=pedido["cliente_id"],
value={**pedido, "publicado_em": datetime.utcnow().isoformat()}
)
# .get() bloqueia até confirmação — em prod, prefira callback assíncrono
metadata = future.get(timeout=10)
print(f"✓ partição={metadata.partition} offset={metadata.offset}")
except KafkaError as e:
print(f"✗ Falha ao publicar: {e}")
# Exemplo de uso
publicar_pedido({"pedido_id": 12345, "cliente_id": 789, "valor_brl": 199.90})
producer.flush() # garante que tudo foi enviado antes de encerrar
# ─── CONSUMER: detecta fraude em tempo real ──────────────────────────────────
consumer = KafkaConsumer(
"pedidos.criados",
bootstrap_servers=["broker-1:9092"],
group_id="deteccao-fraude-v1", # consumer group: paralelismo + offset tracking
auto_offset_reset="earliest", # do início se não houver offset salvo
enable_auto_commit=False, # commit manual = at-least-once explícito
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
max_poll_records=100, # batch size por poll
session_timeout_ms=30000
)
def avaliar_fraude(pedido: dict) -> bool:
"""Heurística simplificada — em produção seria modelo ML em feature store."""
return pedido["valor_brl"] > 10_000 or pedido.get("ip_pais") == "XX"
print("🎧 Aguardando eventos...")
for mensagem in consumer:
pedido = mensagem.value
if avaliar_fraude(pedido):
# Emite evento de alerta em outro tópico — composição de streams
producer.send("alertas.fraude", key=pedido["cliente_id"], value=pedido)
print(f"🚨 Fraude suspeita: pedido {pedido['pedido_id']}")
# Commit manual só após processamento bem-sucedido (at-least-once correto)
consumer.commit()
Operar um cluster Kafka + Flink saudável em produção exige expertise de SRE de dados (monitoramento de lag, rebalances, state size, exactly-once garantido). Custo total (infra + pessoas) costuma ser 5–10× maior que pipeline batch equivalente. Justifique a partir do SLA de negócio, não do hype.
Data Quality, Observability e DataGovOps
Pipeline rodando não é pipeline funcionando. Em 2026, data quality e observability deixaram de ser opcionais — viraram pré-requisito para qualquer sistema que alimente decisões automatizadas (BI, ML, LLMs). Um valor errado em uma tabela Gold pode contaminar relatórios, modelos e respostas de chatbots por semanas até alguém perceber.
As ferramentas que importam
Open-source. Define "expectations" declarativas em Python/YAML (ex: "coluna X nunca é nula", "valor entre 0 e 100"). Gera Data Docs HTML automaticamente. Padrão para validação em pipelines.
Concorrente direto. Linguagem SodaCL declarativa, mais leve que Great Expectations. Soda Cloud (SaaS) integra com Slack/Jira para alertas. Boa escolha para times que querem menos boilerplate.
Data observability: aprendem perfis estatísticos das tabelas e disparam alertas de anomalia automaticamente (volume sumiu, distribuição mudou, frescor parou). Não substituem testes — complementam.
Rastrear origem de cada coluna do warehouse até a fonte. dbt fornece automaticamente. OpenLineage padroniza eventos entre ferramentas. DataHub (LinkedIn) e Atlan oferecem catálogos com IA.
Data Contracts (a tendência que sobe em 2026)
Um data contract é o equivalente de OpenAPI para dados: um schema versionado, formal, acordado entre quem produz o dado (time de aplicação que emite eventos) e quem consome (time de dados). Ele declara: nomes de campos, tipos, semântica, SLAs de frescor, regras de qualidade, políticas de PII e processo de breaking change.
Por que isso virou crítico: à medida que LLMs consomem dados via Text-to-SQL ou agentes, qualquer mudança silenciosa em schemas quebra fluxos sem barulho — o LLM simplesmente alucina. Contratos previnem isso movendo a responsabilidade de "garantir que o dado existe e está correto" para upstream, antes do warehouse.
DataGovOps: governança como código
DataGovOps é a aplicação de princípios DevOps a governança: políticas em Git, revisão por PR, deploy automatizado, testes contínuos. Em vez de um documento Word descrevendo "campos de PII devem ser mascarados", você tem regras YAML versionadas que ferramentas (dbt, Snowflake masking policies, Open Policy Agent) aplicam automaticamente. Rotulagem de dados sensíveis vira parte do CI; mudança de política passa por code review.
1. Acurácia: o dado reflete a realidade?
2. Completude: faltam registros ou colunas?
3. Consistência: mesma entidade tem o mesmo valor em sistemas diferentes?
4. Frescor: o dado está atualizado dentro do SLA?
Toda dimensão precisa ser monitorada com alertas — não é suficiente verificar uma vez por semana manualmente.
import great_expectations as gx
import pandas as pd
from datetime import datetime
# ─── Setup do contexto GE (em prod, persistido em S3/Git) ────────────────────
context = gx.get_context(mode="ephemeral") # ephemeral = só na memória, p/ demo
# ─── Carregando dados que queremos validar ───────────────────────────────────
df = pd.read_parquet("s3://lakehouse/silver/clientes/dt=2026-04-22/")
# Conecta o DataFrame ao contexto GE
data_source = context.data_sources.add_pandas("clientes_silver")
data_asset = data_source.add_dataframe_asset(name="clientes_diario")
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch_full")
batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
# ─── Definindo as expectations (o "contrato" da tabela) ──────────────────────
suite = context.suites.add(gx.ExpectationSuite(name="clientes_silver_v1"))
# 1. Schema esperado: colunas obrigatórias
suite.add_expectation(gx.expectations.ExpectTableColumnsToMatchSet(
column_set=["cliente_id", "email", "cpf", "criado_em", "status", "score_credito"],
exact_match=False # permite colunas extras, exige as listadas
))
# 2. Chave primária: cliente_id deve ser único e nunca nulo
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="cliente_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="cliente_id"))
# 3. Email: formato válido (regex). Campos de PII como email JAMAIS podem ter
# valores quebrados que indiquem corrupção upstream.
suite.add_expectation(gx.expectations.ExpectColumnValuesToMatchRegex(
column="email",
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
mostly=0.99 # tolera 1% de inválidos (legado), alerta se passar disso
))
# 4. CPF: comprimento e formato (não validamos dígito aqui, mas poderíamos)
suite.add_expectation(gx.expectations.ExpectColumnValueLengthsToEqual(
column="cpf", value=11
))
# 5. Score de crédito: regra de negócio — sempre entre 0 e 1000
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
column="score_credito", min_value=0, max_value=1000
))
# 6. Frescor: criado_em deve estar dentro das últimas 24h (SLA do contrato)
suite.add_expectation(gx.expectations.ExpectColumnMaxToBeBetween(
column="criado_em",
min_value=(datetime.utcnow() - pd.Timedelta(hours=24)).isoformat(),
max_value=datetime.utcnow().isoformat()
))
# 7. Status: enum fechado — qualquer valor fora disso quebra contrato
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["ATIVO", "INATIVO", "BLOQUEADO", "PENDENTE"]
))
# ─── Executando validação ────────────────────────────────────────────────────
validation_definition = context.validation_definitions.add(
gx.ValidationDefinition(
name="clientes_diario_validacao",
data=batch_definition,
suite=suite
)
)
result = validation_definition.run(batch_parameters={"dataframe": df})
# ─── Reagindo ao resultado ───────────────────────────────────────────────────
if not result.success:
falhas = [r for r in result.results if not r.success]
print(f"❌ {len(falhas)} expectations falharam — pipeline ABORTADO")
for f in falhas:
print(f" - {f.expectation_config.type}: {f.result.get('unexpected_count', 'N/A')} valores ruins")
# Em produção: chamar Airflow/Dagster para sair com erro, alertar Slack,
# bloquear promoção de Silver → Gold.
raise RuntimeError("Quality gate failed")
else:
print(f"✅ {len(result.results)} expectations passaram — promovendo para Gold")
AI nos pipelines de dados
A meta-tendência mais forte de 2025–2026 é a inversão: IA não é mais só consumidora de dados — virou parte ativa do pipeline. LLMs geram SQL, populam catálogos, sugerem transformações, escrevem testes, detectam anomalias e até propõem modelagens. O data engineer moderno não compete com IA; orquestra IA em cada etapa.
Text-to-SQL: a interface mais valiosa do warehouse
Permitir que usuários de negócio façam perguntas em português ("quanto vendemos para clientes do Sul no Q1?") e obter SQL correto + resposta é o santo graal do BI self-service. As últimas duas gerações de modelos (GPT-4 Turbo, Claude 3.5 Sonnet, Gemini 2.0) atingiram acurácia >85% em benchmarks como Spider e BIRD — bom o bastante para produção quando combinado com schema contextualizado e validações.
Copilots dentro dos warehouses
LLMs, embeddings e vector search nativos no warehouse. Funções CORTEX.COMPLETE(), CORTEX.EMBED_TEXT() e VECTOR_COSINE_SIMILARITY em SQL. Cortex Analyst faz Text-to-SQL com semantic models.
Conversational data analyst. Engenheiros de dados curam "rooms" com tabelas certificadas; usuários conversam, Genie gera SQL, executa, gera viz. AI/BI Dashboards integram tudo.
Gera modelos dbt a partir de descrição em linguagem natural, sugere testes, completa documentação YAML automaticamente, explica linhagem complexa. Integrado ao dbt Cloud IDE.
Atlan, Alation e DataHub usam LLMs para gerar descrições de tabelas/colunas, classificar PII automaticamente, sugerir tags, ranquear datasets por relevância à pergunta do usuário.
Riscos práticos de IA em pipelines
1. Read-only obrigatório: o usuário SQL técnico do LLM nunca deve ter permissão de UPDATE/DELETE/DROP — limit blast radius.
2. Allowlist de tabelas: não exponha tudo. Apenas tabelas Gold curadas.
3. Validação semântica: passe o SQL gerado por um guardrail (parser, EXPLAIN dry-run, limit forçado).
4. Auditoria: logue toda pergunta + SQL + resultado. Use isso para fine-tuning futuro.
5. Cite a fonte: a resposta deve mostrar quais tabelas foram usadas — sem isso, vira oráculo cego.
import openai
import os
import sqlglot # parser SQL multi-dialeto, p/ validação
from sqlglot import expressions as exp
client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# ─── 1. Schema curado: o "contexto" que o LLM recebe ─────────────────────────
# Em produção, isso vem do catálogo (DataHub/Atlan) ou de uma semantic layer.
SCHEMA_DOC = """
-- Tabela: analytics.gold.fct_pedidos
-- Granularidade: 1 linha por pedido. Atualizada a cada 15 min.
CREATE TABLE analytics.gold.fct_pedidos (
pedido_id BIGINT PRIMARY KEY,
cliente_id BIGINT REFERENCES dim_clientes,
produto_id BIGINT REFERENCES dim_produtos,
valor_brl DECIMAL(10,2), -- Valor líquido em BRL
canal STRING, -- 'web' | 'app' | 'loja'
criado_em TIMESTAMP, -- UTC
status STRING -- 'PAGO' | 'CANCELADO' | 'PENDENTE'
);
-- Tabela: analytics.gold.dim_clientes
-- Granularidade: 1 linha por cliente. SCD tipo 2.
CREATE TABLE analytics.gold.dim_clientes (
cliente_id BIGINT,
nome STRING,
estado_uf STRING, -- 'SP', 'RJ', 'RS', etc.
regiao STRING, -- 'Sul' | 'Sudeste' | 'Norte' | ...
segmento STRING -- 'B2B' | 'B2C'
);
"""
SYSTEM_PROMPT = f"""Você é um analista SQL sênior em Snowflake.
Gere APENAS uma query SQL válida em Snowflake SQL, nada mais.
Regras OBRIGATÓRIAS:
- Use apenas as tabelas e colunas listadas no schema abaixo.
- SEMPRE inclua LIMIT 1000 a menos que o usuário peça agregação.
- Use joins explícitos (INNER/LEFT JOIN), nunca cross join implícito.
- NUNCA gere DDL, DML (UPDATE/DELETE/INSERT), nem queries em outras tabelas.
- Para datas relativas, use CURRENT_DATE() / DATEADD().
Schema disponível:
{SCHEMA_DOC}
"""
def texto_para_sql(pergunta: str) -> str:
"""Converte pergunta em PT para SQL Snowflake."""
resp = client.chat.completions.create(
model="gpt-4o-mini", # bom o bastante p/ Text-to-SQL com schema
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": pergunta}
],
temperature=0.0, # determinístico — SQL não é criativo
)
sql = resp.choices[0].message.content.strip()
# Remove ```sql blocos se o modelo embrulhar
sql = sql.removeprefix("```sql").removeprefix("```").removesuffix("```").strip()
return sql
def validar_sql_seguro(sql: str) -> tuple[bool, str]:
"""Guardrail: parseia o SQL e rejeita operações perigosas."""
try:
parsed = sqlglot.parse_one(sql, read="snowflake")
except Exception as e:
return False, f"SQL inválido: {e}"
# Bloqueia qualquer DDL/DML — LLM só pode SELECT
proibidos = (exp.Insert, exp.Update, exp.Delete, exp.Drop, exp.AlterTable, exp.Create)
if any(parsed.find(p) for p in proibidos):
return False, "Operação proibida detectada (DDL/DML)"
# Tabelas referenciadas devem estar na allowlist
ALLOWLIST = {"analytics.gold.fct_pedidos", "analytics.gold.dim_clientes"}
tabelas = {t.name.lower() for t in parsed.find_all(exp.Table)}
if not tabelas.issubset({n.split(".")[-1] for n in ALLOWLIST}):
return False, f"Tabela não permitida: {tabelas - ALLOWLIST}"
return True, "OK"
# ─── Uso ──────────────────────────────────────────────────────────────────────
pergunta = "Quanto vendemos para clientes do Sul no último trimestre, por canal?"
sql_gerado = texto_para_sql(pergunta)
print(f"📝 SQL gerado:\n{sql_gerado}\n")
ok, msg = validar_sql_seguro(sql_gerado)
if ok:
print("✅ Guardrail aprovou — pode executar no warehouse")
# cursor.execute(sql_gerado) ...
else:
print(f"🚫 Bloqueado: {msg}")
O papel não está sendo substituído — está mudando de escritor de pipelines para curador de contratos, semantic models e guardrails que IA usa. Quem aprender a desenhar bons semantic layers (dbt + descrições ricas + métricas governadas), bons guardrails para Text-to-SQL e bons pipelines de embedding/RAG vai liderar a próxima década. Quem só souber escrever Airflow DAGs vai ser substituído por dbt Copilot.
Recapitulação: o que levar dessa disciplina
Sem coleta confiável e armazenamento sólido, IA no topo é teatro. Invista de baixo para cima.
Em 2026, Apache Iceberg é o "Parquet dos metadados". Apostar nele reduz lock-in dramaticamente.
Custo é 5–10× batch. Justifique pelo SLA do consumidor, não pela modernidade aparente.
Data contracts versionados em Git previnem o caos silencioso que LLMs amplificam.
Read-only, allowlist, parsing de validação, auditoria. Não negocie nenhum desses.
A IA escreve SQL e DAGs. Você desenha semantic models, contratos e governança.