1

Fundamentos DevOps e SRE — Onde a IA Entra

DevOps é uma cultura e conjunto de práticas que une desenvolvimento e operações para entregar software com mais velocidade e confiabilidade. O SRE (Site Reliability Engineering), criado pelo Google, aplica princípios de engenharia de software aos problemas de operações — usando automação, métricas e budgets de erro para equilibrar velocidade de entrega com estabilidade do sistema.

Os Pilares DevOps que a IA Potencializa

🔄
Integração Contínua

IA sugere configurações de pipeline, analisa falhas de build com linguagem natural e prioriza flaky tests com base em histórico de execuções.

🚀
Entrega Contínua

LLMs geram changelogs automáticos a partir de commits, avaliam risco de deploy com base em métricas históricas e recomendam rollback estratégico.

📉
Redução de MTTR

IA reduz Mean Time to Recovery ao correlacionar alertas, sugerir runbooks relevantes e gerar resumos de incidente para o on-call em segundos.

🔍
Observabilidade

Consultas em linguagem natural para Prometheus/Grafana, detecção de anomalias sem threshold manual, insights automáticos de logs.

🛡️
Segurança Shift-Left

LLMs analisam código em busca de vulnerabilidades durante o PR review, antes do merge — não após o deploy em produção.

📚
Gestão do Conhecimento

Postmortems gerados automaticamente, runbooks mantidos atualizados por IA e busca semântica em documentação de incidentes históricos.

O Papel do SRE no Mundo com IA

O SRE moderno precisa dominar não apenas as práticas clássicas (SLOs, error budgets, toil reduction), mas também entender como integrar IA ao seu toolkit operacional. A IA não substitui o SRE — ela elimina o toil cognitivo: a análise repetitiva de logs, a redação de postmortems, a tradução de alertas em ações. O SRE passa a ser um engenheiro de sistemas de IA operacional, responsável por garantir que os sistemas autônomos tomem decisões corretas e seguras.

ℹ️
Onde GenAI atua no ciclo DevOps hoje

Maturidade alta (produção): Geração de pipelines CI/CD, changelogs, documentação de runbooks, análise de logs em linguagem natural. Maturidade média (adoção crescente): SAST com LLM, sugestão de mitigação de incidentes, dashboards auto-gerados. Maturidade emergente (experimental): Agentes autônomos que executam rollback, escalam serviços e abrem PRs de correção automaticamente.

40%
Redução em MTTR com IA
3x
Velocidade de triagem
65%
Toil reduzido por agentes
85%
Precisão em detecção de anomalias
2

Automação de Pipelines CI/CD com IA

A geração e otimização de pipelines CI/CD com IA vai além de "escrever YAML por você". O modelo analisa o repositório, detecta a stack tecnológica, as dependências e o histórico de builds para gerar pipelines otimizados — e continua aprendendo com cada execução.

GitHub Actions Gerado por IA: Pipeline Completo

O exemplo abaixo é um pipeline GitHub Actions robusto para uma aplicação Python com FastAPI, cobrindo linting, testes, análise de segurança, build de container e deploy. Pipelines como este podem ser gerados automaticamente por IA ao analisar o pyproject.toml e estrutura do projeto.

YAML
.github/workflows/ci-cd.yml
# Pipeline CI/CD para aplicação FastAPI
# Gerado e otimizado com assistência de IA (Claude Sonnet)
# Inclui: lint, test, segurança, build, deploy com aprovação manual

name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  workflow_dispatch:
    inputs:
      environment:
        description: 'Ambiente de deploy'
        required: true
        default: 'staging'
        type: choice
        options: [staging, production]

env:
  PYTHON_VERSION: "3.12"
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

# Cancela runs anteriores para PRs (economiza minutos de CI)
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}

jobs:
  # ─────────────────────────────────────────────────────────────────
  # JOB 1: Análise de código e lint
  # ─────────────────────────────────────────────────────────────────
  lint-and-type-check:
    name: "🔍 Lint & Type Check"
    runs-on: ubuntu-latest
    timeout-minutes: 10

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          cache: pip

      - name: Install dev dependencies
        run: pip install ruff mypy

      - name: Lint with ruff
        run: |
          ruff check . --output-format=github
          ruff format --check .

      - name: Type check with mypy
        run: mypy src/ --ignore-missing-imports --strict

  # ─────────────────────────────────────────────────────────────────
  # JOB 2: Testes unitários e de integração
  # ─────────────────────────────────────────────────────────────────
  test:
    name: "🧪 Tests (Python ${{ matrix.python-version }})"
    runs-on: ubuntu-latest
    timeout-minutes: 20
    needs: lint-and-type-check

    strategy:
      matrix:
        python-version: ["3.11", "3.12"]

    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: testdb
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

      redis:
        image: redis:7-alpine
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
        ports:
          - 6379:6379

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          cache: pip

      - name: Install dependencies
        run: pip install -e ".[test]"

      - name: Run tests with coverage
        env:
          DATABASE_URL: postgresql://test:test@localhost:5432/testdb
          REDIS_URL: redis://localhost:6379
          ENVIRONMENT: test
        run: |
          pytest tests/ \
            --cov=src \
            --cov-report=xml \
            --cov-report=term-missing \
            --cov-fail-under=80 \
            -v \
            --tb=short \
            --junitxml=test-results.xml

      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4
        with:
          files: ./coverage.xml
          flags: python-${{ matrix.python-version }}

      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results-${{ matrix.python-version }}
          path: test-results.xml

  # ─────────────────────────────────────────────────────────────────
  # JOB 3: Análise de segurança (SAST + dependências)
  # ─────────────────────────────────────────────────────────────────
  security-scan:
    name: "🛡️ Security Scan"
    runs-on: ubuntu-latest
    timeout-minutes: 15
    needs: lint-and-type-check
    permissions:
      security-events: write  # para upload SARIF

    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install security tools
        run: pip install bandit safety semgrep

      - name: SAST with Bandit
        run: |
          bandit -r src/ \
            -f sarif \
            -o bandit-results.sarif \
            --severity-level medium \
            --confidence-level medium || true

      - name: Upload Bandit SARIF
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: bandit-results.sarif
          category: bandit

      - name: Check vulnerable dependencies
        run: safety check --full-report --policy-file .safety-policy.yml

      - name: Semgrep SAST
        env:
          SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }}
        run: semgrep ci --config=auto --sarif > semgrep-results.sarif 2>/dev/null || true

      - name: Upload Semgrep SARIF
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: semgrep-results.sarif
          category: semgrep

  # ─────────────────────────────────────────────────────────────────
  # JOB 4: Build e push da imagem Docker
  # ─────────────────────────────────────────────────────────────────
  build-and-push:
    name: "🐳 Build & Push Image"
    runs-on: ubuntu-latest
    timeout-minutes: 20
    needs: [test, security-scan]
    if: github.event_name != 'pull_request'
    permissions:
      contents: read
      packages: write

    outputs:
      image-digest: ${{ steps.build.outputs.digest }}
      image-tag: ${{ steps.meta.outputs.tags }}

    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to GHCR
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=ref,event=branch
            type=semver,pattern={{version}}
            type=sha,prefix=,suffix=,format=short
            type=raw,value=latest,enable={{is_default_branch}}

      - name: Build and push
        id: build
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          build-args: |
            BUILD_DATE=${{ github.event.head_commit.timestamp }}
            VCS_REF=${{ github.sha }}

  # ─────────────────────────────────────────────────────────────────
  # JOB 5: Deploy para staging
  # ─────────────────────────────────────────────────────────────────
  deploy-staging:
    name: "🚀 Deploy → Staging"
    runs-on: ubuntu-latest
    timeout-minutes: 15
    needs: build-and-push
    if: github.ref == 'refs/heads/develop'
    environment: staging

    steps:
      - uses: actions/checkout@v4

      - name: Deploy to Kubernetes (staging)
        env:
          KUBECONFIG_DATA: ${{ secrets.KUBECONFIG_STAGING }}
          IMAGE_DIGEST: ${{ needs.build-and-push.outputs.image-digest }}
        run: |
          echo "$KUBECONFIG_DATA" | base64 -d > kubeconfig.yml
          export KUBECONFIG=kubeconfig.yml
          kubectl set image deployment/api \
            api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${IMAGE_DIGEST} \
            --namespace=staging
          kubectl rollout status deployment/api --namespace=staging --timeout=300s

      - name: Run smoke tests
        run: |
          sleep 15
          curl -f https://api-staging.empresa.com/health || exit 1

  # ─────────────────────────────────────────────────────────────────
  # JOB 6: Deploy para produção (com aprovação)
  # ─────────────────────────────────────────────────────────────────
  deploy-production:
    name: "🎯 Deploy → Production"
    runs-on: ubuntu-latest
    timeout-minutes: 20
    needs: build-and-push
    if: github.ref == 'refs/heads/main'
    environment: production   # Requer aprovação manual no GitHub

    steps:
      - uses: actions/checkout@v4

      - name: Deploy to Kubernetes (production)
        env:
          KUBECONFIG_DATA: ${{ secrets.KUBECONFIG_PRODUCTION }}
          IMAGE_DIGEST: ${{ needs.build-and-push.outputs.image-digest }}
        run: |
          echo "$KUBECONFIG_DATA" | base64 -d > kubeconfig.yml
          export KUBECONFIG=kubeconfig.yml
          # Canary deploy: 10% do tráfego primeiro
          kubectl set image deployment/api-canary \
            api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${IMAGE_DIGEST} \
            --namespace=production
          kubectl rollout status deployment/api-canary --namespace=production --timeout=300s
          echo "Canary estável. Promovendo para 100%..."
          kubectl set image deployment/api \
            api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}@${IMAGE_DIGEST} \
            --namespace=production
          kubectl rollout status deployment/api --namespace=production --timeout=300s

      - name: Notify Slack on success
        if: success()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "✅ Deploy em produção concluído",
              "blocks": [{"type":"section","text":{"type":"mrkdwn","text":"*Deploy bem-sucedido*\nCommit: `${{ github.sha }}`\nAuthor: ${{ github.actor }}"}}]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

      - name: Rollback on failure
        if: failure()
        run: |
          export KUBECONFIG=kubeconfig.yml
          kubectl rollout undo deployment/api --namespace=production
          echo "⚠️ Deploy falhou — rollback executado automaticamente"

IA na Geração de Changelogs

Um dos usos mais imediatos de IA em pipelines é a geração automática de changelogs. O pipeline pode chamar um LLM com todos os commits desde a última tag e gerar um changelog legível para humanos, categorizado por tipo de mudança.

💡
Otimização de pipelines com histórico

LLMs treinados com histórico de builds podem identificar: quais jobs mais frequentemente falham por flakiness, qual ordem de execução minimiza o tempo total de pipeline, quais testes têm maior retorno por tempo de execução. Ferramentas como Trunk.io e Buildkite já incorporam análise de IA para essas otimizações.

3

DevSecOps com IA — Segurança no Pipeline

O DevSecOps move a análise de segurança para o início do ciclo de desenvolvimento — "shift left". Com IA, esse processo vai além das regras estáticas (SAST clássico) para análise semântica que entende contexto, fluxo de dados e intenção do código.

Análise de Vulnerabilidades com LLM

PYTHON
llm_security_scanner.py
"""
Scanner de segurança usando LLM para análise semântica de vulnerabilidades.
Complementa ferramentas estáticas (Bandit, Semgrep) com raciocínio contextual.

Instalação: pip install openai gitpython click rich

Uso:
    # Analisa um arquivo específico
    python llm_security_scanner.py scan --file src/auth.py

    # Analisa um PR (diff do git)
    python llm_security_scanner.py scan-pr --base main --head feature/auth

    # Relatório JSON para integração com CI
    python llm_security_scanner.py scan --file src/auth.py --output json
"""
import json
import sys
import subprocess
from pathlib import Path
from dataclasses import dataclass, field
from typing import Literal
from openai import OpenAI
from rich.console import Console
from rich.table import Table
from rich.panel import Panel

client = OpenAI()
console = Console()

SECURITY_SYSTEM_PROMPT = """
Você é um especialista em segurança de aplicações com profundo conhecimento em:
- OWASP Top 10 (2021 e 2023)
- CWE (Common Weakness Enumeration)
- OWASP ASVS (Application Security Verification Standard)
- Boas práticas de segurança por linguagem (Python, JavaScript, Go, Java)

Ao analisar código, você deve:
1. Identificar vulnerabilidades REAIS com contexto — não falsos positivos de ferramentas estáticas
2. Explicar o impacto potencial de cada vulnerabilidade
3. Fornecer código de correção concreto
4. Priorizar por severidade (Critical > High > Medium > Low > Info)

Você responde EXCLUSIVAMENTE em JSON válido, sem texto adicional.
"""

SECURITY_ANALYSIS_SCHEMA = {
    "type": "json_schema",
    "json_schema": {
        "name": "security_analysis",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "summary": {"type": "string"},
                "risk_score": {"type": "integer", "minimum": 0, "maximum": 100},
                "vulnerabilities": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "string"},
                            "severity": {"type": "string", "enum": ["critical", "high", "medium", "low", "info"]},
                            "cwe": {"type": "string"},
                            "title": {"type": "string"},
                            "description": {"type": "string"},
                            "line_hint": {"type": "string"},
                            "impact": {"type": "string"},
                            "recommendation": {"type": "string"},
                            "fix_code": {"type": "string"}
                        },
                        "required": ["id", "severity", "title", "description", "recommendation"],
                        "additionalProperties": False
                    }
                }
            },
            "required": ["summary", "risk_score", "vulnerabilities"],
            "additionalProperties": False
        }
    }
}


@dataclass
class ScanResult:
    file_path: str
    summary: str
    risk_score: int
    vulnerabilities: list[dict]

    @property
    def critical_count(self) -> int:
        return sum(1 for v in self.vulnerabilities if v["severity"] == "critical")

    @property
    def high_count(self) -> int:
        return sum(1 for v in self.vulnerabilities if v["severity"] == "high")

    @property
    def has_blocking_issues(self) -> bool:
        """Retorna True se há issues que deveriam bloquear o merge."""
        return self.critical_count > 0 or self.high_count > 0


def analyze_file_security(file_path: str) -> ScanResult:
    """
    Analisa um arquivo de código em busca de vulnerabilidades usando LLM.
    """
    code = Path(file_path).read_text(encoding="utf-8")
    extension = Path(file_path).suffix.lstrip(".")

    # Limita o tamanho para respeitar context window (em produção, use chunking)
    max_chars = 12000
    if len(code) > max_chars:
        console.print(f"[yellow]⚠ Arquivo grande: analisando primeiros {max_chars} caracteres[/yellow]")
        code = code[:max_chars] + "\n\n# [... arquivo truncado para análise ...]"

    user_message = f"""
Analise o seguinte código {extension.upper()} em busca de vulnerabilidades de segurança.

Arquivo: {file_path}

```{extension}
{code}
```

Forneça uma análise completa incluindo:
- Vulnerabilidades encontradas com severidade, CWE, impacto e código de correção
- Risk score de 0 (sem riscos) a 100 (múltiplas vulnerabilidades críticas)
- Resumo executivo da análise
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SECURITY_SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ],
        response_format=SECURITY_ANALYSIS_SCHEMA,
        temperature=0,   # Determinístico para análise de segurança
        max_tokens=4000
    )

    result = json.loads(response.choices[0].message.content)
    return ScanResult(
        file_path=file_path,
        summary=result["summary"],
        risk_score=result["risk_score"],
        vulnerabilities=result["vulnerabilities"]
    )


def analyze_git_diff(base_branch: str = "main", head_branch: str = "HEAD") -> ScanResult:
    """
    Analisa apenas o diff do git (ideal para análise de PRs no CI).
    Mais eficiente que escanear arquivos inteiros.
    """
    diff = subprocess.check_output(
        ["git", "diff", f"{base_branch}...{head_branch}", "--", "*.py", "*.js", "*.ts", "*.go"],
        text=True
    )

    if not diff.strip():
        return ScanResult(
            file_path="",
            summary="Nenhuma mudança detectada no diff.",
            risk_score=0,
            vulnerabilities=[]
        )

    user_message = f"""
Analise o seguinte git diff em busca de vulnerabilidades de segurança INTRODUZIDAS pelas mudanças.

Foque apenas em código ADICIONADO (linhas com +), não em código removido.

```diff
{diff[:10000]}
```
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SECURITY_SYSTEM_PROMPT},
            {"role": "user", "content": user_message}
        ],
        response_format=SECURITY_ANALYSIS_SCHEMA,
        temperature=0
    )

    result = json.loads(response.choices[0].message.content)
    return ScanResult(
        file_path=f"git diff {base_branch}...{head_branch}",
        summary=result["summary"],
        risk_score=result["risk_score"],
        vulnerabilities=result["vulnerabilities"]
    )


def print_report(result: ScanResult) -> None:
    """Imprime relatório formatado com Rich."""
    # Header
    risk_color = "red" if result.risk_score >= 70 else "yellow" if result.risk_score >= 40 else "green"
    console.print(Panel(
        f"[bold]Arquivo:[/bold] {result.file_path}\n"
        f"[bold]Risk Score:[/bold] [{risk_color}]{result.risk_score}/100[/{risk_color}]\n"
        f"[bold]Resumo:[/bold] {result.summary}",
        title="🛡️ Relatório de Segurança — LLM Scanner",
        border_style=risk_color
    ))

    if not result.vulnerabilities:
        console.print("[green]✅ Nenhuma vulnerabilidade encontrada.[/green]")
        return

    # Tabela de vulnerabilidades
    table = Table(show_header=True, header_style="bold cyan")
    table.add_column("Severidade", width=12)
    table.add_column("CWE", width=12)
    table.add_column("Título", width=40)
    table.add_column("Linha", width=20)

    severity_colors = {
        "critical": "bold red", "high": "red",
        "medium": "yellow", "low": "cyan", "info": "white"
    }

    for vuln in sorted(result.vulnerabilities,
                       key=lambda v: ["critical","high","medium","low","info"].index(v["severity"])):
        color = severity_colors.get(vuln["severity"], "white")
        table.add_row(
            f"[{color}]{vuln['severity'].upper()}[/{color}]",
            vuln.get("cwe", "N/A"),
            vuln["title"],
            vuln.get("line_hint", "N/A")
        )

    console.print(table)

    # Detalhes de cada vulnerabilidade crítica/alta
    for vuln in result.vulnerabilities:
        if vuln["severity"] in ("critical", "high"):
            console.print(f"\n[bold red]{'='*60}[/bold red]")
            console.print(f"[bold]🚨 {vuln['severity'].upper()}: {vuln['title']}[/bold]")
            console.print(f"[dim]{vuln.get('cwe', '')}[/dim]")
            console.print(f"\n{vuln['description']}")
            console.print(f"\n[bold]Impacto:[/bold] {vuln.get('impact', 'N/A')}")
            console.print(f"\n[bold]Recomendação:[/bold] {vuln['recommendation']}")
            if vuln.get("fix_code"):
                console.print(f"\n[bold]Código corrigido:[/bold]")
                console.print(f"[green]{vuln['fix_code']}[/green]")


# ── Integração com CI: exit code 1 se há issues críticas ──────────────
def run_ci_scan(file_path: str) -> int:
    """Retorna exit code para uso em pipelines CI."""
    result = analyze_file_security(file_path)
    print_report(result)

    if result.has_blocking_issues:
        console.print(f"\n[bold red]❌ FALHA DE SEGURANÇA: {result.critical_count} crítico(s), {result.high_count} alto(s)[/bold red]")
        console.print("[red]Pipeline bloqueado. Corrija as vulnerabilidades antes de continuar.[/red]")
        return 1  # exit code 1 = falha no CI

    console.print(f"\n[green]✅ Análise de segurança aprovada (Risk Score: {result.risk_score}/100)[/green]")
    return 0


if __name__ == "__main__":
    import sys
    file_to_scan = sys.argv[1] if len(sys.argv) > 1 else "src/auth.py"
    exit_code = run_ci_scan(file_to_scan)
    sys.exit(exit_code)
▶ Integrando o scanner ao GitHub Actions
  1. Adicione o script ao repositório em scripts/security_scan.py
  2. Adicione OPENAI_API_KEY como GitHub Secret no repositório
  3. No workflow, adicione um step: python scripts/security_scan.py --file $CHANGED_FILES
  4. Use continue-on-error: false para que o pipeline quebre em vulnerabilidades críticas
  5. Configure o scanner para analisar apenas o diff do PR (mais rápido e focado): python scripts/security_scan.py --mode diff --base main
4

Monitoramento e Observabilidade Inteligente

A observabilidade tradicional depende de thresholds manuais: "alerte quando CPU > 80%". Com IA, o sistema aprende o comportamento normal e detecta anomalias — desvios estatisticamente significativos do baseline, independente de um threshold fixo. O resultado: menos alertas falsos, mais detecções relevantes.

Os Três Pilares da Observabilidade com IA

📝
Logs Inteligentes
LLMs analisam logs em linguagem natural, agrupam erros semanticamente, identificam padrões de falha e geram resumos acionáveis sem regex manual.
IA Aplicada
📊
Métricas com Detecção de Anomalias
Modelos de séries temporais (Isolation Forest, Prophet) detectam desvios do baseline. IA correlaciona métricas de diferentes sistemas para root cause analysis.
IA Aplicada
🔍
Traces Contextualizados
IA analisa traces de latência e identifica gargalos: "o P99 aumentou porque esta query específica passou a fazer full table scan após o deploy das 14h".
IA Aplicada

Detector de Anomalias com IA

PYTHON
anomaly_detector.py
"""
Detector de anomalias em métricas operacionais usando IA.
Combina análise estatística (Z-score) com interpretação de LLM.

Instalação: pip install openai numpy scipy prometheus-client

Uso:
    # Analisa métricas do Prometheus
    python anomaly_detector.py --prometheus http://localhost:9090 --window 1h

    # Analisa métricas de um arquivo CSV (para testes)
    python anomaly_detector.py --input metrics.csv
"""
import json
import statistics
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
import httpx
from openai import OpenAI

client = OpenAI()


@dataclass
class MetricSample:
    timestamp: datetime
    value: float
    labels: dict = field(default_factory=dict)


@dataclass
class Anomaly:
    metric_name: str
    timestamp: datetime
    value: float
    expected_range: tuple[float, float]
    z_score: float
    severity: str   # "low" | "medium" | "high" | "critical"
    context: dict = field(default_factory=dict)


class StatisticalAnomalyDetector:
    """
    Detector de anomalias baseado em Z-score e desvio padrão.
    Aprende o baseline das últimas N horas e detecta desvios.
    """
    def __init__(self, z_threshold: float = 3.0, window_size: int = 60):
        self.z_threshold = z_threshold
        self.window_size = window_size  # número de amostras para baseline

    def detect(self, samples: list[MetricSample]) -> list[Anomaly]:
        """Detecta anomalias em uma série temporal de amostras."""
        if len(samples) < self.window_size:
            return []  # Dados insuficientes para baseline

        anomalies = []
        values = [s.value for s in samples]

        # Calcula baseline com janela deslizante
        for i in range(self.window_size, len(samples)):
            window = values[i - self.window_size:i]
            mean = statistics.mean(window)
            std = statistics.stdev(window) if len(window) > 1 else 0

            if std == 0:
                continue  # Sem variabilidade, não há anomalia

            current_value = values[i]
            z_score = abs((current_value - mean) / std)

            if z_score > self.z_threshold:
                # Classifica severidade por magnitude do z-score
                if z_score > 6:
                    severity = "critical"
                elif z_score > 4.5:
                    severity = "high"
                elif z_score > 3.5:
                    severity = "medium"
                else:
                    severity = "low"

                # Range esperado (95% confidence interval ≈ ±2σ)
                expected_low = mean - 2 * std
                expected_high = mean + 2 * std

                anomalies.append(Anomaly(
                    metric_name=samples[i].labels.get("__name__", "unknown"),
                    timestamp=samples[i].timestamp,
                    value=current_value,
                    expected_range=(expected_low, expected_high),
                    z_score=z_score,
                    severity=severity,
                    context={"mean": mean, "std": std, "window_size": self.window_size}
                ))

        return anomalies


def fetch_prometheus_metrics(
    prometheus_url: str,
    queries: dict[str, str],
    start: datetime,
    end: datetime,
    step: str = "1m"
) -> dict[str, list[MetricSample]]:
    """
    Busca métricas do Prometheus via API HTTP.
    Retorna um dict de {metric_name: [MetricSample]}
    """
    results = {}

    for metric_name, promql in queries.items():
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step
        }
        response = httpx.get(
            f"{prometheus_url}/api/v1/query_range",
            params=params,
            timeout=30.0
        )
        response.raise_for_status()
        data = response.json()

        samples = []
        for result in data["data"]["result"]:
            labels = result["metric"]
            for ts, val in result["values"]:
                samples.append(MetricSample(
                    timestamp=datetime.fromtimestamp(float(ts)),
                    value=float(val),
                    labels={"__name__": metric_name, **labels}
                ))

        if samples:
            results[metric_name] = samples

    return results


def analyze_anomalies_with_llm(
    anomalies: list[Anomaly],
    recent_deploys: list[dict] = None,
    service_name: str = "api-service"
) -> dict:
    """
    Usa LLM para interpretar anomalias detectadas estatisticamente.
    Correlaciona com deploys, horários e padrões conhecidos.
    """
    if not anomalies:
        return {"assessment": "Nenhuma anomalia detectada. Sistema operando normalmente.", "actions": []}

    # Prepara contexto para o LLM
    anomaly_summary = []
    for a in anomalies:
        anomaly_summary.append({
            "metric": a.metric_name,
            "time": a.timestamp.isoformat(),
            "value": round(a.value, 4),
            "expected_range": [round(a.expected_range[0], 4), round(a.expected_range[1], 4)],
            "z_score": round(a.z_score, 2),
            "severity": a.severity
        })

    context = {
        "service": service_name,
        "anomalies_detected": len(anomalies),
        "critical_count": sum(1 for a in anomalies if a.severity == "critical"),
        "high_count": sum(1 for a in anomalies if a.severity == "high"),
        "anomalies": anomaly_summary,
        "recent_deploys": recent_deploys or [],
        "analysis_timestamp": datetime.utcnow().isoformat()
    }

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": """Você é um SRE sênior especializado em análise de anomalias em sistemas distribuídos.

Ao analisar anomalias de métricas, você:
1. Identifica possíveis causas raiz (deploys recentes, padrões sazonais, falhas em dependências)
2. Avalia se as anomalias são correlacionadas (mesmo incidente) ou independentes
3. Prioriza ações com base em impacto ao usuário
4. Sugere runbooks e próximos passos concretos

Seja conciso e acionável. Responda em JSON."""
            },
            {
                "role": "user",
                "content": f"""Analise as seguintes anomalias detectadas no serviço {service_name}:

{json.dumps(context, ensure_ascii=False, indent=2)}

Responda com:
1. Avaliação geral da situação
2. Possível causa raiz (se identificável)
3. Correlação entre anomalias
4. Ações recomendadas (ordenadas por prioridade)
5. Se é necessário pager o on-call agora

Formato JSON: {{"assessment": "...", "root_cause": "...", "correlation": "...", "actions": [...], "page_oncall": true/false, "urgency": "low|medium|high|critical"}}"""
            }
        ],
        response_format={"type": "json_object"},
        temperature=0
    )

    return json.loads(response.choices[0].message.content)


# ── Pipeline de análise completo ───────────────────────────────────────
def run_anomaly_pipeline(
    prometheus_url: str = "http://localhost:9090",
    service: str = "api-service"
):
    """Executa o pipeline completo de detecção e análise de anomalias."""

    print(f"🔍 Analisando métricas de {service}...")

    end = datetime.utcnow()
    start = end - timedelta(hours=2)

    # Queries PromQL relevantes para a maioria dos serviços
    queries = {
        "request_rate":    f'rate(http_requests_total{{service="{service}"}}[5m])',
        "error_rate":      f'rate(http_requests_total{{service="{service}",status=~"5.."}}[5m])',
        "latency_p99":     f'histogram_quantile(0.99, rate(http_duration_seconds_bucket{{service="{service}"}}[5m]))',
        "cpu_usage":       f'rate(container_cpu_usage_seconds_total{{pod=~"{service}-.*"}}[5m])',
        "memory_usage":    f'container_memory_usage_bytes{{pod=~"{service}-.*"}}',
    }

    try:
        metrics = fetch_prometheus_metrics(prometheus_url, queries, start, end)
    except Exception as e:
        print(f"⚠️  Usando dados simulados (Prometheus indisponível: {e})")
        # Dados simulados para demonstração
        import random
        now = datetime.utcnow()
        metrics = {
            "error_rate": [
                MetricSample(now - timedelta(minutes=i), random.gauss(0.01, 0.002), {"__name__": "error_rate"})
                for i in range(120, 0, -1)
            ] + [
                # Spike de erro nos últimos 10 minutos
                MetricSample(now - timedelta(minutes=i), random.gauss(0.15, 0.01), {"__name__": "error_rate"})
                for i in range(10, 0, -1)
            ]
        }

    # Detecta anomalias estatisticamente
    detector = StatisticalAnomalyDetector(z_threshold=3.0, window_size=30)
    all_anomalies = []

    for metric_name, samples in metrics.items():
        anomalies = detector.detect(samples)
        all_anomalies.extend(anomalies)
        if anomalies:
            print(f"  ⚠️  {len(anomalies)} anomalia(s) em {metric_name}")

    print(f"\n📊 Total de anomalias detectadas: {len(all_anomalies)}")

    # Simula histórico de deploys recentes (em produção, busque do deployment tracker)
    recent_deploys = [
        {"time": (datetime.utcnow() - timedelta(minutes=25)).isoformat(),
         "version": "v2.4.1", "author": "ci-bot", "status": "success"}
    ]

    # Análise com LLM
    print("\n🤖 Analisando com IA...")
    analysis = analyze_anomalies_with_llm(all_anomalies, recent_deploys, service)

    # Resultado
    print("\n" + "="*60)
    print(f"ANÁLISE DE ANOMALIAS — {service}")
    print("="*60)
    print(json.dumps(analysis, ensure_ascii=False, indent=2))

    if analysis.get("page_oncall") and analysis.get("urgency") in ("high", "critical"):
        print("\n🚨 AÇÃO NECESSÁRIA: Notificar on-call imediatamente!")
        # Em produção: enviar para PagerDuty, Opsgenie, etc.

    return analysis


if __name__ == "__main__":
    run_anomaly_pipeline()
5

Resposta a Incidentes Assistida por IA

Incidentes são situações de alto stress onde decisões precisam ser tomadas rapidamente com informação incompleta. A IA atua como o "segundo cérebro" do on-call: processa grandes volumes de logs, correlaciona eventos e sugere ações enquanto o engenheiro foca nas decisões estratégicas.

Fluxo de Resposta Assistida por IA

🚨
Alerta Acionado

PagerDuty/Opsgenie aciona o on-call. Simultaneamente, o agente de IA começa a coletar contexto: logs dos últimos 30min, métricas afetadas, deploys recentes, incidents similares no histórico.

📋
Resumo Automático Gerado

Antes mesmo do engenheiro abrir o laptop, o agente posta no canal de incidentes um resumo: "Degradação de latência no serviço de checkout. Error rate subiu de 0.3% para 12% às 03:47. Último deploy: v3.2 às 03:30. Possivelmente relacionado ao índice de DB removido."

🔧
Sugestões de Runbook

O agente recupera runbooks relevantes do histórico de incidentes similares e os adapta ao contexto atual. Sugere passos concretos com comandos prontos para executar.

💬
ChatOps Interativo

O on-call interage com o agente via Slack: "o que mais mudou nas últimas 2 horas?" / "mostra os top 10 erros nos logs" / "qual o impacto estimado em usuários?". O agente executa as consultas e responde em segundos.

📝
Postmortem Automático

Após resolução, o agente gera um rascunho de postmortem com timeline, causa raiz identificada, impacto calculado e action items sugeridos — prontos para revisão humana.

Agente de Resposta a Incidentes com Tool Use

PYTHON
incident_response_agent.py
"""
Agente de resposta a incidentes com tool use.
O agente usa ferramentas reais para coletar contexto e sugerir ações.

Instalação: pip install openai httpx

Ferramentas disponíveis:
- query_logs: Busca logs no Elasticsearch/Loki
- get_metrics: Consulta métricas do Prometheus
- list_recent_deploys: Lista deploys recentes do deployment tracker
- search_runbooks: Busca runbooks similares no Confluence/Notion
- notify_slack: Envia mensagens para canais do Slack
- generate_postmortem: Gera rascunho de postmortem
"""
import json
from openai import OpenAI
from datetime import datetime, timedelta

client = OpenAI()

# ── Definição das tools do agente ──────────────────────────────────────
INCIDENT_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "query_logs",
            "description": "Busca logs de um serviço em um período de tempo. Retorna as N entradas mais relevantes com contagem de erros.",
            "parameters": {
                "type": "object",
                "properties": {
                    "service": {"type": "string", "description": "Nome do serviço"},
                    "level": {"type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG"], "description": "Nível mínimo de log"},
                    "minutes_ago": {"type": "integer", "description": "Quantos minutos atrás buscar (default: 30)", "default": 30},
                    "limit": {"type": "integer", "description": "Número máximo de entradas", "default": 20}
                },
                "required": ["service"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_metrics_summary",
            "description": "Retorna um resumo das principais métricas de um serviço (error rate, latência, throughput) para os últimos N minutos.",
            "parameters": {
                "type": "object",
                "properties": {
                    "service": {"type": "string"},
                    "minutes_ago": {"type": "integer", "default": 60}
                },
                "required": ["service"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "list_recent_deploys",
            "description": "Lista os deploys mais recentes de um serviço ou de todos os serviços.",
            "parameters": {
                "type": "object",
                "properties": {
                    "service": {"type": "string", "description": "Nome do serviço (opcional — se omitido, retorna todos)"},
                    "limit": {"type": "integer", "default": 5}
                }
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_runbooks",
            "description": "Busca runbooks de resposta a incidentes similares com base em palavras-chave.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Palavras-chave do problema, ex: 'latência alta checkout banco'"},
                    "limit": {"type": "integer", "default": 3}
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "generate_postmortem_draft",
            "description": "Gera um rascunho de postmortem baseado nos dados coletados do incidente.",
            "parameters": {
                "type": "object",
                "properties": {
                    "incident_summary": {"type": "string"},
                    "timeline": {"type": "array", "items": {"type": "string"}},
                    "root_cause": {"type": "string"},
                    "impact_description": {"type": "string"},
                    "action_items": {"type": "array", "items": {"type": "string"}}
                },
                "required": ["incident_summary", "root_cause"]
            }
        }
    }
]

# ── Implementações simuladas das tools ─────────────────────────────────
def query_logs(service: str, level: str = "ERROR", minutes_ago: int = 30, limit: int = 20) -> dict:
    """Simula query de logs (em produção: Elasticsearch/Loki/CloudWatch)."""
    return {
        "service": service,
        "period": f"últimos {minutes_ago} minutos",
        "total_errors": 847,
        "sample_logs": [
            {"time": "03:52:14", "level": "ERROR", "message": "Database connection timeout after 30s", "count": 312},
            {"time": "03:52:15", "level": "ERROR", "message": "Failed to acquire connection from pool (pool_size=10, waiting=45)", "count": 289},
            {"time": "03:52:17", "level": "WARN",  "message": "Circuit breaker opened for db-primary", "count": 1},
            {"time": "03:53:01", "level": "ERROR", "message": "Transaction rollback: connection lost", "count": 246},
        ]
    }

def get_metrics_summary(service: str, minutes_ago: int = 60) -> dict:
    return {
        "service": service,
        "period_minutes": minutes_ago,
        "current": {"error_rate": "11.8%", "p99_latency_ms": 8420, "throughput_rps": 847, "availability": "88.2%"},
        "baseline_30d": {"error_rate": "0.3%", "p99_latency_ms": 180, "throughput_rps": 920, "availability": "99.97%"},
        "trend": "Degradação contínua desde 03:47. SLO de disponibilidade (99.9%) violado."
    }

def list_recent_deploys(service: str = None, limit: int = 5) -> dict:
    return {
        "deploys": [
            {"time": "03:30:00", "service": "checkout-api", "version": "v3.2.0", "author": "alice", "status": "success",
             "changes": ["Atualiza pool de conexões do banco", "Remove índice idx_orders_legacy"]},
            {"time": "02:15:00", "service": "payment-service", "version": "v1.8.3", "author": "bob", "status": "success",
             "changes": ["Fix: timeout em retry de pagamento"]},
            {"time": "01:00:00", "service": "user-service", "version": "v2.1.0", "author": "carol", "status": "success",
             "changes": ["Adiciona campo `last_seen` na tabela users"]}
        ]
    }

def search_runbooks(query: str, limit: int = 3) -> dict:
    return {
        "runbooks": [
            {"title": "Database Connection Pool Exhaustion", "url": "https://wiki.empresa.com/runbooks/db-pool",
             "relevance": 0.95, "steps": ["Verificar pool size atual", "Aumentar pool temporariamente", "Identificar queries longas"]},
            {"title": "High Latency Checkout Service", "url": "https://wiki.empresa.com/runbooks/checkout-latency",
             "relevance": 0.87, "steps": ["Verificar dependências externas", "Analisar slow queries", "Avaliar rollback"]},
        ]
    }

def generate_postmortem_draft(incident_summary: str, root_cause: str, timeline: list = None,
                               impact_description: str = None, action_items: list = None) -> dict:
    postmortem = {
        "title": f"Postmortem — {incident_summary[:60]}",
        "date": datetime.utcnow().strftime("%Y-%m-%d"),
        "severity": "SEV-2",
        "status": "Draft",
        "summary": incident_summary,
        "root_cause": root_cause,
        "timeline": timeline or [],
        "impact": impact_description or "A definir",
        "action_items": action_items or [],
        "document_url": "https://notion.empresa.com/postmortems/auto-draft-001"
    }
    return postmortem

TOOL_FUNCTIONS = {
    "query_logs": query_logs,
    "get_metrics_summary": get_metrics_summary,
    "list_recent_deploys": list_recent_deploys,
    "search_runbooks": search_runbooks,
    "generate_postmortem_draft": generate_postmortem_draft,
}

# ── Agente principal ───────────────────────────────────────────────────
def run_incident_agent(alert: dict) -> str:
    """
    Agente de resposta a incidentes que usa ferramentas para investigar
    e gerar um plano de ação estruturado.
    """
    print(f"\n🚨 Incidente recebido: {alert['title']}")
    print(f"   Serviço: {alert['service']} | Severidade: {alert['severity']}\n")

    messages = [
        {
            "role": "system",
            "content": """Você é um SRE sênior especializado em resposta a incidentes.
Ao receber um alerta, sua tarefa é:
1. Coletar contexto completo (logs, métricas, deploys recentes)
2. Identificar a causa raiz provável
3. Buscar runbooks relevantes
4. Recomendar ações imediatas ordenadas por prioridade
5. Gerar um rascunho de postmortem

Seja metódico, conciso e acionável. Use as ferramentas disponíveis para coletar dados reais."""
        },
        {
            "role": "user",
            "content": f"""Incidente ativo:
Título: {alert['title']}
Serviço: {alert['service']}
Severidade: {alert['severity']}
Alerta disparado: {alert['alert_time']}
Descrição: {alert['description']}

Investigue o incidente e forneça um plano de ação completo."""
        }
    ]

    # Loop agentico
    max_iterations = 8
    for i in range(max_iterations):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=INCIDENT_TOOLS,
            tool_choice="auto"
        )

        message = response.choices[0].message

        if not message.tool_calls:
            # Agente terminou — retorna análise final
            return message.content

        messages.append(message)

        # Executa tool calls
        for tc in message.tool_calls:
            func_name = tc.function.name
            args = json.loads(tc.function.arguments)
            print(f"  🔧 [{i+1}] {func_name}({', '.join(f'{k}={v}' for k,v in args.items())})")

            func = TOOL_FUNCTIONS.get(func_name)
            result = func(**args) if func else {"error": f"Tool {func_name} não encontrada"}

            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": json.dumps(result, ensure_ascii=False)
            })

    return "Limite de iterações atingido."


# ── Exemplo de uso ─────────────────────────────────────────────────────
if __name__ == "__main__":
    alert = {
        "title": "CRÍTICO: Alta taxa de erro no checkout-api",
        "service": "checkout-api",
        "severity": "SEV-2",
        "alert_time": "2025-04-22 03:47:00 UTC",
        "description": "Error rate subiu de 0.3% para 11.8%. SLO de disponibilidade violado. ~2.100 usuários afetados."
    }

    analysis = run_incident_agent(alert)
    print("\n" + "="*60)
    print("ANÁLISE DO AGENTE:")
    print("="*60)
    print(analysis)

Geração Automática de Postmortem

PYTHON
postmortem_generator.py
"""
Gerador automático de postmortem a partir de dados do incidente.
Integra com Slack, PagerDuty e sistemas de ticket para coletar dados.

Uso: python postmortem_generator.py --incident-id INC-2025-0422
"""
import json
from openai import OpenAI
from dataclasses import dataclass, field
from datetime import datetime

client = OpenAI()

@dataclass
class IncidentData:
    """Dados coletados durante o incidente para geração do postmortem."""
    incident_id: str
    title: str
    service: str
    severity: str
    started_at: str
    resolved_at: str
    duration_minutes: int
    affected_users: int
    slack_thread: list[dict]      # Mensagens do canal de incidente
    alert_timeline: list[dict]    # Timeline de alertas disparados
    actions_taken: list[str]      # Comandos/ações executadas
    root_cause: str               # Causa raiz identificada pelo on-call
    resolution: str               # Como foi resolvido
    metrics_during_incident: dict

POSTMORTEM_PROMPT = """
Você é um especialista em SRE que escreve postmortems de alta qualidade.

Baseado nos dados do incidente fornecidos, gere um postmortem profissional e blameless
seguindo o formato Google SRE.

O postmortem deve incluir:

## Resumo Executivo
- O que aconteceu em 2-3 frases
- Impacto nos usuários e no negócio
- Duração total

## Causa Raiz
- Causa raiz técnica (o "why" mais profundo)
- Fatores contribuintes

## Timeline Detalhado
- Reconstituição cronológica dos eventos
- Com horários precisos
- Incluindo sinais que foram ignorados

## Impacto
- Métricas afetadas e magnitude
- Estimativa de usuários impactados
- Impacto financeiro (se aplicável)

## O que deu certo
- Detecção e resposta eficaz
- Sistemas de proteção que funcionaram

## O que poderia ser melhor
- Blameless — foca em processos, não pessoas
- Gaps de monitoramento
- Dificuldades na investigação

## Action Items
- Itens preventivos com owner e prazo
- Itens de detecção (melhorar alertas)
- Itens de resposta (melhorar runbooks)
- Formato: [TIPO] Descrição (Owner: X, Prazo: YYYY-MM-DD)

Seja específico, técnico e construtivo. Evite linguagem vaga.
"""

def generate_postmortem(incident: IncidentData) -> str:
    """Gera postmortem completo a partir dos dados do incidente."""

    # Formata o contexto do incidente
    context = f"""
DADOS DO INCIDENTE {incident.incident_id}:

Título: {incident.title}
Serviço: {incident.service}
Severidade: {incident.severity}
Início: {incident.started_at}
Resolução: {incident.resolved_at}
Duração: {incident.duration_minutes} minutos
Usuários afetados: {incident.affected_users:,}

CAUSA RAIZ (identificada pelo on-call):
{incident.root_cause}

RESOLUÇÃO:
{incident.resolution}

TIMELINE DE ALERTAS:
{json.dumps(incident.alert_timeline, ensure_ascii=False, indent=2)}

AÇÕES TOMADAS:
{chr(10).join(f"- {action}" for action in incident.actions_taken)}

MÉTRICAS DURANTE O INCIDENTE:
{json.dumps(incident.metrics_during_incident, ensure_ascii=False, indent=2)}

RESUMO DO CANAL DE INCIDENTE (Slack):
{json.dumps(incident.slack_thread[:10], ensure_ascii=False, indent=2)}
"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": POSTMORTEM_PROMPT},
            {"role": "user", "content": context}
        ],
        temperature=0.3,
        max_tokens=3000
    )

    return response.choices[0].message.content


# ── Exemplo de execução ────────────────────────────────────────────────
if __name__ == "__main__":
    incident = IncidentData(
        incident_id="INC-2025-0422",
        title="Degradação do checkout-api por esgotamento de connection pool",
        service="checkout-api",
        severity="SEV-2",
        started_at="2025-04-22T03:47:00Z",
        resolved_at="2025-04-22T04:23:00Z",
        duration_minutes=36,
        affected_users=2140,
        root_cause="Remoção do índice idx_orders_legacy no deploy v3.2.0 causou full table scan em query de verificação de pedidos. Query passou de 8ms para 28s, esgotando o connection pool (size=10) em 4 minutos.",
        resolution="Rollback para v3.1.9 às 04:20. Connection pool recuperou em 3 minutos. Índice será recriado em janela de manutenção.",
        slack_thread=[
            {"user": "pagerduty-bot", "time": "03:47", "text": "🚨 ALERTA: error_rate > 5% em checkout-api"},
            {"user": "alice", "time": "03:49", "text": "Acknowledging. Investigando."},
            {"user": "alice", "time": "03:52", "text": "Logs mostrando DB connection timeouts. Pool esgotado."},
            {"user": "alice", "time": "03:58", "text": "Encontrado: query lenta após deploy v3.2. Removemos idx_orders_legacy."},
            {"user": "alice", "time": "04:15", "text": "Iniciando rollback para v3.1.9"},
            {"user": "alice", "time": "04:23", "text": "✅ Resolvido. Métricas normalizadas."},
        ],
        alert_timeline=[
            {"time": "03:47:00", "alert": "HighErrorRate", "threshold": "> 5%", "value": "11.8%"},
            {"time": "03:48:00", "alert": "HighP99Latency", "threshold": "> 2000ms", "value": "8420ms"},
            {"time": "03:49:00", "alert": "SLOBreach", "threshold": "availability < 99.9%", "value": "88.2%"},
        ],
        actions_taken=[
            "kubectl logs deployment/checkout-api --since=30m | grep ERROR",
            "Verificação no Grafana: spike de latência correlacionado com deploy das 03:30",
            "Análise de slow queries no PostgreSQL: EXPLAIN ANALYZE na query de verificação",
            "kubectl rollout undo deployment/checkout-api",
            "kubectl rollout status deployment/checkout-api --watch",
        ],
        metrics_during_incident={
            "error_rate_peak": "11.8%",
            "p99_latency_peak": "8420ms",
            "availability_min": "88.2%",
            "db_connections_peak": "10/10 (100%)",
            "connection_wait_time_peak": "28000ms"
        }
    )

    postmortem = generate_postmortem(incident)
    print(postmortem)

    # Em produção: salvar no Notion, Confluence ou GitHub
    with open(f"postmortem-{incident.incident_id}.md", "w", encoding="utf-8") as f:
        f.write(f"# Postmortem: {incident.title}\n\n")
        f.write(postmortem)
    print(f"\n✅ Postmortem salvo em postmortem-{incident.incident_id}.md")
6

Agentes Operacionais — Do Copiloto ao Autônomo

A distinção entre copiloto e agente é fundamental para entender onde estamos e para onde vamos na automação de operações com IA.

Copiloto vs Agente Operacional

Aspecto 🧑‍💼 Copiloto 🤖 Agente Autônomo
Iniciativa Reativo — responde quando perguntado Proativo — monitora e age sem ser solicitado
Execução Sugere ações, humano executa Executa ações diretamente (com guardrails)
Escopo Tarefa única no momento presente Objetivos multi-etapa ao longo do tempo
Exemplos ChatOps, assistente de logs, gerador de runbooks Auto-scaling proativo, auto-remediation, auto-rollback
Risco Baixo — humano valida cada ação Médio/Alto — requer blast radius limitado e circuit breakers
Adoção atual Matura — amplamente adotado Emergente — casos reais isolados, crescendo rapidamente

Casos Reais de Agentes Operacionais

📈
Auto-scaling Inteligente

Agente analisa padrões de tráfego histórico + dados de negócio (campanhas, feriados) e ajusta HPA/KEDA antes do pico, em vez de reativamente após ele.

🔄
Auto-remediation

Para problemas com playbook bem definido (pod crashloop, disco cheio, cert expirado), o agente executa a correção autonomamente e notifica o time depois.

🔙
Auto-rollback

Após deploy, o agente monitora métricas por N minutos. Se detecta degradação estatisticamente significativa, executa rollback automaticamente e aciona on-call.

🔐
Rotação de Secrets

Agente gerencia ciclo de vida de credenciais: detecta expiração iminente, solicita nova credencial, atualiza Vault/Secret Manager e reinicia pods afetados.

Estratégia de Adoção Gradual

A adoção de agentes operacionais autônomos deve seguir um caminho de incremento de autonomia com validação em cada etapa:

1️⃣
Observação (Semana 1-2)

O agente monitora e gera relatórios, mas não age. Valide se as análises são corretas antes de dar autonomia de execução.

2️⃣
Sugestão com Aprovação (Semana 3-4)

O agente propõe ações via Slack com botão "Aprovar" / "Rejeitar". O engenheiro decide. Colete dados de quantas sugestões foram aprovadas.

3️⃣
Automação de Baixo Risco (Mês 2)

Execute automaticamente apenas ações seguras e reversíveis: reiniciar pods, limpar caches, rodar queries de diagnóstico, enviar notificações.

4️⃣
Autonomia Expandida (Mês 3+)

Com histórico de decisões corretas validadas, expanda para ações de médio impacto: rollback, scaling, failover — sempre com circuit breakers e audit log.

⚠️
Princípio do Blast Radius Mínimo

Todo agente operacional deve ter blast radius explicitamente limitado: defina quais ações ele PODE executar (allowlist), não apenas o que ele não deve fazer. Use IAM/RBAC para garantir que o agente fisicamente não consiga ir além dos limites definidos — nunca confie apenas na instrução do prompt.