1

Tool Calling vs Protocolos Formais

Antes de entrar em protocolos como MCP e A2A, é fundamental compreender o mecanismo mais básico de extensão de capacidade dos LLMs: o Tool Calling (também chamado de Function Calling). É a fundação sobre a qual todos os protocolos mais sofisticados são construídos.

O que é Tool Calling

Tool calling é a capacidade nativa dos LLMs modernos (GPT-4, Claude, Gemini) de sinalizar, dentro da sua resposta, que desejam invocar uma função externa. O modelo não executa código — ele apenas descreve qual função chamar e com quais argumentos, em formato estruturado (normalmente JSON). A aplicação cliente é responsável por executar a função e devolver o resultado ao modelo para que ele continue o raciocínio.

📤
Schema Declaration

O desenvolvedor define um JSON Schema descrevendo nome, parâmetros e tipos de cada função disponível. Esse schema é enviado ao modelo junto com o prompt.

🤔
Model Decision

O LLM raciocina sobre se precisa de informação externa e, se sim, qual tool usar. Ele retorna uma mensagem do tipo tool_call com os argumentos preenchidos.

⚙️
Client Execution

A aplicação intercepta a resposta tool_call, executa a função localmente (ou via API), e devolve o resultado ao modelo como mensagem tool.

🔄
Continuation

Com o resultado da função em contexto, o modelo continua sua resposta, podendo encadear múltiplas tool calls antes de gerar a resposta final para o usuário.

Exemplo Completo: Tool Calling com OpenAI

PYTHON
tool_calling_openai.py
"""
Exemplo completo de Tool Calling com OpenAI Function Calling.
Inclui definição de schema, loop de execução e tratamento de resultados.
"""
import json
import httpx
from openai import OpenAI

client = OpenAI()  # usa OPENAI_API_KEY do ambiente

# ── Definição das Tools (JSON Schema) ──────────────────────────────────
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Retorna a temperatura atual e condições climáticas de uma cidade.",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "Nome da cidade, ex: 'São Paulo, BR'"
                    },
                    "units": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "Unidade de temperatura (padrão: celsius)"
                    }
                },
                "required": ["city"],
                "additionalProperties": False
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_flights",
            "description": "Busca voos disponíveis entre duas cidades em uma data.",
            "parameters": {
                "type": "object",
                "properties": {
                    "origin": {"type": "string", "description": "Código IATA de origem, ex: GRU"},
                    "destination": {"type": "string", "description": "Código IATA de destino, ex: LIS"},
                    "date": {"type": "string", "description": "Data no formato YYYY-MM-DD"},
                    "passengers": {"type": "integer", "description": "Número de passageiros", "default": 1}
                },
                "required": ["origin", "destination", "date"]
            }
        }
    }
]

# ── Implementação das funções reais ────────────────────────────────────
def get_weather(city: str, units: str = "celsius") -> dict:
    """
    Em produção, chamaria uma API de clima real (OpenWeatherMap, etc.).
    Aqui simulamos a resposta para fins didáticos.
    """
    return {
        "city": city,
        "temperature": 24 if units == "celsius" else 75,
        "units": units,
        "condition": "parcialmente nublado",
        "humidity": "68%",
        "wind_speed": "15 km/h"
    }

def search_flights(origin: str, destination: str, date: str, passengers: int = 1) -> dict:
    """Simula busca de voos."""
    return {
        "flights": [
            {"airline": "LATAM", "flight": "LA8065", "departure": "08:30", "arrival": "14:45", "price": 1250.00},
            {"airline": "TAP",   "flight": "TP074",  "departure": "22:00", "arrival": "11:30", "price": 980.00},
        ],
        "origin": origin,
        "destination": destination,
        "date": date,
        "passengers": passengers
    }

# ── Dispatcher: executa a tool pelo nome ───────────────────────────────
TOOL_REGISTRY = {
    "get_weather": get_weather,
    "search_flights": search_flights,
}

def execute_tool(tool_call) -> str:
    """Executa uma tool call retornada pelo modelo e retorna o resultado como string JSON."""
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    if name not in TOOL_REGISTRY:
        return json.dumps({"error": f"Tool '{name}' não encontrada."})

    result = TOOL_REGISTRY[name](**args)
    return json.dumps(result, ensure_ascii=False, indent=2)

# ── Loop principal de conversação ──────────────────────────────────────
def chat_with_tools(user_message: str) -> str:
    """
    Implementa o loop completo de tool calling:
    1. Envia mensagem ao modelo com definição das tools
    2. Verifica se modelo quer chamar alguma tool
    3. Executa as tools e devolve resultados
    4. Repete até o modelo retornar resposta final
    """
    messages = [
        {
            "role": "system",
            "content": "Você é um assistente de viagens prestativo. Use as ferramentas disponíveis quando necessário."
        },
        {"role": "user", "content": user_message}
    ]

    MAX_ITERATIONS = 10  # evita loops infinitos
    for iteration in range(MAX_ITERATIONS):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=tools,
            tool_choice="auto"   # modelo decide se usa tools
        )

        message = response.choices[0].message

        # Se o modelo não quer usar tools, retorna a resposta
        if message.tool_calls is None:
            return message.content

        # Adiciona a mensagem do assistente (com as tool_calls) ao histórico
        messages.append(message)

        # Executa cada tool call e adiciona os resultados
        for tool_call in message.tool_calls:
            print(f"  → Chamando tool: {tool_call.function.name}({tool_call.function.arguments})")
            result = execute_tool(tool_call)

            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result
            })

    return "Limite de iterações atingido."

# ── Execução ───────────────────────────────────────────────────────────
if __name__ == "__main__":
    query = "Qual o clima em Lisboa agora? Estou pensando em viajar de São Paulo no dia 15/06/2025."
    print(f"Usuário: {query}\n")
    answer = chat_with_tools(query)
    print(f"Assistente: {answer}")

Tool Calling vs Protocolos Formais (MCP/A2A)

O tool calling resolve bem o caso de um único agente consumindo ferramentas definidas na mesma aplicação. Mas quando o sistema cresce — múltiplos agentes, ferramentas desenvolvidas por times diferentes, execução distribuída — surge a necessidade de protocolos formais.

Aspecto Tool Calling (nativo) MCP (Model Context Protocol) A2A (Agent-to-Agent)
Escopo Único modelo ↔ funções locais Qualquer cliente ↔ qualquer servidor de contexto Agente orquestrador ↔ agentes especializados
Transporte Interno à API do LLM stdio, SSE ou HTTP Streaming HTTP/JSON-RPC sobre HTTPS
Descoberta Manual (developer define) Listagem dinâmica de capabilities agent.json publicado por cada agente
Reutilização Baixa (código acoplado) Alta (qualquer host MCP-compatível) Alta (qualquer agente A2A-compatível)
Multi-agent Não (requer orquestração manual) Parcial (cliente único, múltiplos servidores) Sim (agentes delegam tarefas entre si)
Estado de tarefa Sem gerenciamento Sem (stateless) submitted → working → completed/failed
💡
Analogia prática

Tool calling é como um funcionário que usa ferramentas que estão na sua mesa. MCP é como um escritório com acesso padronizado a recursos compartilhados da empresa. A2A é como contratar uma empresa terceirizada especializada para uma tarefa específica — você passa o briefing, eles entregam o resultado.

2

MCP — Model Context Protocol

O Model Context Protocol (MCP) é um padrão aberto criado pela Anthropic em novembro de 2024 para padronizar como aplicações de IA fornecem contexto a modelos de linguagem. Em vez de cada aplicação inventar sua própria integração com bancos de dados, APIs e sistemas de arquivos, o MCP define um protocolo universal — análogo ao que o LSP (Language Server Protocol) fez pelo ecossistema de editores de código.

ℹ️
Por que o MCP surgiu

Antes do MCP, cada ferramenta de IA (Claude Desktop, Cursor, Cline, etc.) precisava construir integrações customizadas para cada fonte de dados. Com MCP, basta construir um servidor MCP uma vez e ele funciona com qualquer host compatível. O ecossistema cresceu para centenas de servidores MCP públicos em poucos meses.

Os Três Primitivos do MCP

O MCP organiza as capacidades de um servidor em três categorias distintas:

🔧
Tools (Ferramentas)

Funções executáveis que o modelo pode chamar. São model-controlled — o LLM decide quando e como usá-las. Ex: execute_query(sql), send_email(to, subject, body), create_pr(title, branch).

📁
Resources (Recursos)

Dados que o modelo pode ler como contexto. São application-controlled — a aplicação decide o que expor. Ex: conteúdo de arquivos, resultados de queries, páginas web, blobs de dados.

📝
Prompts (Templates)

Templates de prompt pré-definidos e reutilizáveis expostos pelo servidor. São user-controlled — o usuário seleciona qual template usar. Ex: templates de revisão de código, análise de logs, geração de relatórios.

3

Arquitetura Cliente-Servidor MCP

O MCP segue uma arquitetura cliente-servidor rigorosa onde cada componente tem uma responsabilidade bem definida.

🖥️
MCP Host

A aplicação que o usuário usa diretamente — Claude Desktop, Cursor, Cline, Continue, etc. O host gerencia múltiplas conexões com servidores MCP e injeta o contexto no LLM.

🔗
MCP Client

Componente interno do Host que mantém uma conexão 1:1 com um servidor MCP. Responsável pelo handshake de protocolo, listagem de capabilities e despacho de mensagens JSON-RPC 2.0.

⚙️
MCP Server

Processo leve que expõe Tools, Resources e Prompts via protocolo MCP. Pode ser local (subprocess stdio) ou remoto (HTTP com SSE). Acessa os sistemas reais: bancos de dados, APIs, sistema de arquivos.

🗄️
Data Sources

Os sistemas de dados reais: PostgreSQL, GitHub API, Slack, filesystem, S3, etc. O servidor MCP é o único que sabe como interagir com esses sistemas.

Transportes Suportados

O MCP define três mecanismos de transporte, cada um adequado para cenários diferentes:

  • stdio: O cliente inicia o servidor como subprocesso e se comunica via stdin/stdout. Ideal para servidores locais (seguro, sem rede). É o mais comum para desenvolvimento.
  • SSE (Server-Sent Events): O servidor fica escutando em uma porta HTTP e o cliente se conecta via SSE para receber eventos. Adequado para servidores remotos/cloud.
  • Streamable HTTP: Novo transporte introduzido na spec 2025-03-26 que unifica requisição/resposta e streaming em um único endpoint HTTP, eliminando a necessidade de SSE separado.
⚠️
Segurança em Servidores MCP Remotos

Servidores MCP remotos requerem autenticação OAuth 2.1 conforme a spec. Nunca exponha um servidor MCP sem autenticação em produção. Para desenvolvimento local, o transporte stdio é suficientemente seguro pois não há exposição de rede.

4

Servidores MCP Existentes e Docker MCP Toolkit

O ecossistema de servidores MCP cresceu rapidamente. A Anthropic mantém o repositório oficial modelcontextprotocol/servers no GitHub com servidores de referência, e a comunidade desenvolveu centenas de integrações adicionais.

Servidores Oficiais (Reference Servers)

💾
filesystem

Leitura e escrita de arquivos locais com controle granular de permissões. Expõe tools como read_file, write_file, list_directory.

🐙
github

Interação completa com GitHub: criar issues, abrir PRs, buscar repos, gerenciar branches, fazer code reviews.

🐘
postgres

Executa queries SQL em bancos PostgreSQL. O schema do banco é exposto como Resource para que o modelo entenda a estrutura de dados.

🌐
fetch

Faz requisições HTTP e retorna o conteúdo de páginas web. Essencial para agentes que precisam pesquisar na internet.

💬
slack

Envia mensagens, lê canais, busca histórico de conversas no Slack. Permite agentes de automação corporativa.

🗃️
sqlite

Gerencia bancos SQLite locais: criar tabelas, inserir dados, executar queries analíticas. Ótimo para prototipagem.

Docker MCP Toolkit

A Docker lançou o Docker MCP Toolkit — uma coleção curada de servidores MCP disponíveis como imagens Docker no Docker Hub (mcp/ namespace). Isso elimina a necessidade de instalar dependências Python/Node localmente para cada servidor.

YAML
claude_desktop_config.json (usando Docker)
{
  "mcpServers": {
    "filesystem": {
      "command": "docker",
      "args": [
        "run", "-i", "--rm",
        "-v", "/Users/bruno/projects:/projects",
        "mcp/filesystem",
        "/projects"
      ]
    },
    "github": {
      "command": "docker",
      "args": [
        "run", "-i", "--rm",
        "-e", "GITHUB_PERSONAL_ACCESS_TOKEN",
        "mcp/github"
      ],
      "env": {
        "GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_seu_token_aqui"
      }
    },
    "postgres": {
      "command": "docker",
      "args": [
        "run", "-i", "--rm",
        "-e", "POSTGRES_CONNECTION_STRING",
        "mcp/postgres"
      ],
      "env": {
        "POSTGRES_CONNECTION_STRING": "postgresql://user:pass@localhost:5432/mydb"
      }
    },
    "brave-search": {
      "command": "docker",
      "args": [
        "run", "-i", "--rm",
        "-e", "BRAVE_API_KEY",
        "mcp/brave-search"
      ],
      "env": {
        "BRAVE_API_KEY": "BSA_seu_api_key"
      }
    }
  }
}
▶ Configurando Claude Desktop com Docker MCP
  1. Instale o Docker Desktop e certifique-se de que está rodando
  2. Abra o arquivo de configuração: ~/Library/Application Support/Claude/claude_desktop_config.json (macOS) ou %APPDATA%\Claude\claude_desktop_config.json (Windows)
  3. Adicione os servidores MCP desejados no formato acima
  4. Reinicie o Claude Desktop — as imagens Docker serão baixadas automaticamente na primeira execução
  5. Verifique no menu de configurações do Claude Desktop que os servidores aparecem como conectados
5

Desenvolvimento de Servidores MCP com FastMCP

O FastMCP é a forma recomendada de desenvolver servidores MCP em Python. Ele fornece uma API declarativa com decoradores, similar ao FastAPI, eliminando o boilerplate do protocolo JSON-RPC e deixando você focado na lógica de negócio.

💡
FastMCP vs MCP SDK baixo nível

O SDK oficial mcp do Python é verboso e requer implementação manual do protocolo. O FastMCP (incluído como mcp.server.fastmcp a partir da versão 1.0) fornece a mesma experiência do FastAPI — decoradores, type hints, documentação automática. Use FastMCP para 99% dos casos.

Servidor MCP Completo: Gerenciador de Tarefas

PYTHON
task_manager_mcp.py
"""
Servidor MCP completo para gerenciamento de tarefas.
Expõe Tools, Resources e Prompts conforme a spec MCP.

Instalação:
    pip install "mcp[cli]" httpx

Execução (stdio, para desenvolvimento local):
    python task_manager_mcp.py

Execução (SSE, para acesso remoto):
    python task_manager_mcp.py --transport sse --port 8080
"""
import json
from datetime import datetime
from typing import Optional
from mcp.server.fastmcp import FastMCP

# ── Inicialização do servidor ──────────────────────────────────────────
mcp = FastMCP(
    name="task-manager",
    version="1.0.0",
    description="Servidor MCP para gerenciamento de tarefas e projetos"
)

# Armazenamento em memória (em produção: use banco de dados)
tasks_db: dict[int, dict] = {}
task_id_counter = 0

# ── TOOLS ─────────────────────────────────────────────────────────────
@mcp.tool()
def create_task(
    title: str,
    description: str,
    priority: str = "medium",
    due_date: Optional[str] = None,
    tags: list[str] = []
) -> dict:
    """
    Cria uma nova tarefa no sistema.

    Args:
        title: Título curto e descritivo da tarefa
        description: Descrição detalhada do que precisa ser feito
        priority: Prioridade — 'low', 'medium', 'high', 'critical'
        due_date: Data de vencimento no formato ISO 8601 (YYYY-MM-DD)
        tags: Lista de tags para categorização

    Returns:
        Dicionário com os dados da tarefa criada, incluindo o ID gerado
    """
    global task_id_counter
    task_id_counter += 1

    valid_priorities = {"low", "medium", "high", "critical"}
    if priority not in valid_priorities:
        return {"error": f"Prioridade inválida. Use: {valid_priorities}"}

    task = {
        "id": task_id_counter,
        "title": title,
        "description": description,
        "priority": priority,
        "status": "todo",
        "due_date": due_date,
        "tags": tags,
        "created_at": datetime.utcnow().isoformat(),
        "updated_at": datetime.utcnow().isoformat()
    }
    tasks_db[task_id_counter] = task
    return {"success": True, "task": task}


@mcp.tool()
def list_tasks(
    status: Optional[str] = None,
    priority: Optional[str] = None,
    tag: Optional[str] = None
) -> dict:
    """
    Lista tarefas com filtros opcionais.

    Args:
        status: Filtrar por status — 'todo', 'in_progress', 'done'
        priority: Filtrar por prioridade — 'low', 'medium', 'high', 'critical'
        tag: Filtrar por tag específica

    Returns:
        Lista de tarefas que correspondem aos filtros
    """
    tasks = list(tasks_db.values())

    if status:
        tasks = [t for t in tasks if t["status"] == status]
    if priority:
        tasks = [t for t in tasks if t["priority"] == priority]
    if tag:
        tasks = [t for t in tasks if tag in t["tags"]]

    # Ordena por prioridade (critical > high > medium > low)
    priority_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    tasks.sort(key=lambda t: priority_order.get(t["priority"], 99))

    return {
        "total": len(tasks),
        "tasks": tasks
    }


@mcp.tool()
def update_task_status(task_id: int, new_status: str, note: Optional[str] = None) -> dict:
    """
    Atualiza o status de uma tarefa existente.

    Args:
        task_id: ID numérico da tarefa a ser atualizada
        new_status: Novo status — 'todo', 'in_progress', 'blocked', 'done', 'cancelled'
        note: Nota opcional sobre a mudança de status

    Returns:
        Tarefa atualizada ou mensagem de erro
    """
    if task_id not in tasks_db:
        return {"error": f"Tarefa {task_id} não encontrada."}

    valid_statuses = {"todo", "in_progress", "blocked", "done", "cancelled"}
    if new_status not in valid_statuses:
        return {"error": f"Status inválido. Use: {valid_statuses}"}

    tasks_db[task_id]["status"] = new_status
    tasks_db[task_id]["updated_at"] = datetime.utcnow().isoformat()
    if note:
        tasks_db[task_id]["last_note"] = note

    return {"success": True, "task": tasks_db[task_id]}


@mcp.tool()
def get_project_summary() -> dict:
    """
    Gera um resumo analítico do estado atual do projeto.

    Returns:
        Métricas agregadas: total por status, por prioridade, tarefas atrasadas
    """
    all_tasks = list(tasks_db.values())
    today = datetime.utcnow().date().isoformat()

    summary = {
        "total_tasks": len(all_tasks),
        "by_status": {},
        "by_priority": {},
        "overdue": [],
        "critical_pending": []
    }

    for task in all_tasks:
        # Contagem por status
        status = task["status"]
        summary["by_status"][status] = summary["by_status"].get(status, 0) + 1

        # Contagem por prioridade
        prio = task["priority"]
        summary["by_priority"][prio] = summary["by_priority"].get(prio, 0) + 1

        # Tarefas atrasadas
        if task.get("due_date") and task["due_date"] < today and task["status"] not in ("done", "cancelled"):
            summary["overdue"].append({"id": task["id"], "title": task["title"], "due": task["due_date"]})

        # Críticas pendentes
        if task["priority"] == "critical" and task["status"] not in ("done", "cancelled"):
            summary["critical_pending"].append({"id": task["id"], "title": task["title"]})

    return summary


# ── RESOURCES ─────────────────────────────────────────────────────────
@mcp.resource("tasks://all")
def get_all_tasks_resource() -> str:
    """Retorna todas as tarefas como JSON (exposto como Resource para o modelo)."""
    return json.dumps(list(tasks_db.values()), ensure_ascii=False, indent=2)


@mcp.resource("tasks://{task_id}")
def get_task_resource(task_id: str) -> str:
    """Retorna uma tarefa específica pelo ID."""
    tid = int(task_id)
    if tid not in tasks_db:
        return json.dumps({"error": "Tarefa não encontrada"})
    return json.dumps(tasks_db[tid], ensure_ascii=False, indent=2)


# ── PROMPTS ───────────────────────────────────────────────────────────
@mcp.prompt()
def daily_standup_prompt(team_member: str) -> str:
    """
    Gera um prompt de standup diário baseado nas tarefas atuais.

    Args:
        team_member: Nome do membro da equipe para o standup
    """
    summary = get_project_summary()
    return f"""
Você é um assistente de gestão ágil. Gere um standup diário estruturado para {team_member}.

Estado atual do projeto:
{json.dumps(summary, ensure_ascii=False, indent=2)}

O standup deve incluir:
1. O que foi feito ontem (tarefas concluídas recentemente)
2. O que será feito hoje (tarefas in_progress e próximas da lista)
3. Impedimentos (tarefas bloqueadas ou atrasadas)

Seja conciso (máximo 5 bullets por seção) e focado em valor entregue.
"""


# ── Ponto de entrada ───────────────────────────────────────────────────
if __name__ == "__main__":
    mcp.run(transport="stdio")
    # Para SSE: mcp.run(transport="sse", host="0.0.0.0", port=8080)
▶ Testando o servidor MCP localmente
  1. Instale as dependências: pip install "mcp[cli]"
  2. Teste via CLI do MCP: mcp dev task_manager_mcp.py — abre o MCP Inspector no browser
  3. No MCP Inspector, verifique que as 4 tools, 2 resources e 1 prompt aparecem listados
  4. Teste cada tool manualmente com o formulário interativo do inspector
  5. Para integrar ao Claude Desktop, adicione ao claude_desktop_config.json: {"command": "python", "args": ["/caminho/task_manager_mcp.py"]}
6

Deploy e Distribuição com Docker

Para tornar seu servidor MCP disponível para toda a equipe ou como produto, o deploy com Docker é a abordagem mais robusta. O servidor roda em modo SSE para aceitar conexões remotas.

DOCKERFILE
Dockerfile
# ── Build stage ───────────────────────────────────────────────────────
FROM python:3.12-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# ── Runtime stage ─────────────────────────────────────────────────────
FROM python:3.12-slim

# Cria usuário sem privilégios
RUN useradd -m -u 1001 mcpuser
WORKDIR /app

# Copia dependências instaladas
COPY --from=builder /root/.local /home/mcpuser/.local
COPY --chown=mcpuser:mcpuser task_manager_mcp.py .

USER mcpuser
ENV PATH=/home/mcpuser/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

# Porta para SSE transport
EXPOSE 8080

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8080/health')"

CMD ["python", "task_manager_mcp.py", "--transport", "sse", "--port", "8080"]
YAML
docker-compose.yml
version: "3.9"

services:
  # ── MCP Server: Task Manager ─────────────────────────────────────────
  mcp-task-manager:
    build:
      context: .
      dockerfile: Dockerfile
    image: mcp-task-manager:latest
    container_name: mcp-task-manager
    restart: unless-stopped
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=postgresql://mcp:secret@postgres:5432/tasks
      - MCP_AUTH_TOKEN=${MCP_AUTH_TOKEN}   # Token para autenticação OAuth
    depends_on:
      postgres:
        condition: service_healthy
    networks:
      - mcp-net
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.mcp-tasks.rule=Host(`mcp-tasks.empresa.com`)"
      - "traefik.http.routers.mcp-tasks.tls.certresolver=letsencrypt"

  # ── MCP Server: GitHub Integration ──────────────────────────────────
  mcp-github:
    image: mcp/github:latest
    container_name: mcp-github
    restart: unless-stopped
    ports:
      - "8081:8080"
    environment:
      - GITHUB_PERSONAL_ACCESS_TOKEN=${GITHUB_TOKEN}
      - TRANSPORT=sse
    networks:
      - mcp-net

  # ── Banco de dados para persistência ────────────────────────────────
  postgres:
    image: postgres:16-alpine
    container_name: mcp-postgres
    restart: unless-stopped
    environment:
      POSTGRES_DB: tasks
      POSTGRES_USER: mcp
      POSTGRES_PASSWORD: secret
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mcp -d tasks"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - mcp-net

  # ── Nginx: Reverse proxy com TLS ────────────────────────────────────
  nginx:
    image: nginx:alpine
    container_name: mcp-nginx
    restart: unless-stopped
    ports:
      - "443:443"
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./certs:/etc/nginx/certs:ro
    depends_on:
      - mcp-task-manager
      - mcp-github
    networks:
      - mcp-net

volumes:
  postgres-data:

networks:
  mcp-net:
    driver: bridge
Boas práticas para servidores MCP em produção

1. Sempre use HTTPS/TLS para servidores remotos. 2. Implemente autenticação OAuth 2.1 conforme a spec MCP. 3. Use variáveis de ambiente para secrets (nunca hardcode). 4. Adicione rate limiting para evitar abuso. 5. Monitore com structured logging e métricas de latência por tool.

7

Google A2A — Agent-to-Agent Protocol

O Agent-to-Agent (A2A) é um protocolo aberto lançado pelo Google em abril de 2025 para padronizar como agentes de IA colaboram entre si. Enquanto o MCP conecta modelos a ferramentas, o A2A conecta agentes a agentes — permitindo que sistemas multi-agente heterogêneos operem em conjunto, independentemente do framework que cada agente usa internamente.

ℹ️
A2A complementa o MCP, não o substitui

O MCP resolve "como um agente acessa ferramentas e dados externos". O A2A resolve "como um agente delega trabalho para outro agente especializado". Um sistema robusto usa ambos: agentes individuais consomem ferramentas via MCP, e colaboram entre si via A2A.

🤝 Fluxo A2A: Cliente delega task para Remoto
🤖 CLIENT AGENT (ex: LangGraph, Claude app) 1. Descobre agent.json 2. Valida Signed Card 3. Submete Task 4. Consome Artifacts internamente: MCP tools 🤖 REMOTE AGENT (ex: CrewAI, framework Y) Publica agent.json Recebe Message/Parts Executa autônomo Emite Artifacts internamente: sub-agentes GET /.well-known/agent.json POST /tasks (submitted → working) stream de Messages + Artifacts (completed) Estados de Task: submitted → working → input-required? → completed | failed | canceled
Descoberta via agent.json + autenticação via Signed Card + Task com estados bem definidos. Comunicação é frame-agnóstica — cliente e remoto podem usar frameworks diferentes.

Casos de Uso do A2A

🏢
Enterprise Workflows

Agente de RH delega para agente financeiro quando precisa de dados salariais. Agente de vendas delega para agente jurídico análise de contratos.

🔬
Research Pipelines

Agente coordenador distribui análise de papers para agentes especializados em diferentes domínios (biomédica, estatística, revisão sistemática).

💻
Desenvolvimento de Software

Agente de product management delega para agente de arquitetura, que delega para agentes especializados em frontend, backend e testes.

🌐
Ecossistemas Abertos

Empresas diferentes publicam agentes especializados que qualquer sistema A2A-compatível pode descobrir e usar, como uma "API marketplace" para agentes.

8

Tasks, Messages, Parts, Agent Cards e Artifacts

O A2A define um vocabulário preciso de primitivos para comunicação entre agentes. Compreender esses conceitos é essencial para implementar e integrar agentes A2A.

Hierarquia dos Primitivos

📋
Task

A unidade fundamental de trabalho no A2A. Uma Task representa uma solicitação do cliente para o agente remoto. Tem um ID único, estado (lifecycle), e contém Messages de entrada e saída. É o container que gerencia o ciclo de vida completo de uma operação.

💬
Message

Uma unidade de comunicação dentro de uma Task. Messages têm role (user ou agent) e contêm uma lista de Parts. A conversa dentro de uma Task é representada como uma sequência de Messages — similar ao histórico de chat dos LLMs.

🧩
Part

O conteúdo atômico dentro de uma Message. Pode ser TextPart (texto simples), FilePart (arquivo com URI ou bytes base64) ou DataPart (JSON estruturado para dados arbitrários). Uma Message pode conter múltiplas Parts de tipos diferentes.

🃏
Agent Card

Metadados de descoberta publicados pelo agente no endpoint /.well-known/agent.json. Descreve as capacidades do agente, skills disponíveis, URL do endpoint, formatos suportados e requisitos de autenticação.

📦
Artifact

Saídas persistentes geradas pelo agente durante a execução de uma Task. Diferente das Messages (conversação), Artifacts são os resultados — documentos, arquivos, datasets, código gerado. São indexados e podem ser referenciados posteriormente.

9

Estados de uma Task e Fluxo de Comunicação

O ciclo de vida de uma Task A2A é bem definido com estados específicos que permitem ao cliente monitorar o progresso de forma padronizada.

Máquina de Estados

📤
submitted

A Task foi recebida pelo agente mas ainda não começou a ser processada. O agente fez o ACK da requisição.

⚙️
working

O agente está ativamente processando a Task. O cliente pode acompanhar o progresso via streaming de eventos (SSE).

⏸️
input-required

O agente precisa de informação adicional do cliente para continuar. Interação multi-turno — o cliente deve responder para retomar.

completed

A Task foi concluída com sucesso. O resultado final está disponível como Artifacts e a última Message do agente.

failed

A Task falhou por erro interno ou condição irrecuperável. O erro é descrito na última Message do agente.

🚫
cancelled

O cliente cancelou a Task via tasks/cancel. O agente para o processamento e limpa recursos.

Exemplo de Fluxo A2A Completo

PYTHON
a2a_client_example.py
"""
Cliente A2A completo demonstrando o fluxo de submissão de Tasks,
polling de status e leitura de Artifacts.

Instalação:
    pip install httpx python-a2a

Documentação do protocolo:
    https://google.github.io/A2A/
"""
import uuid
import time
import httpx
import json
from dataclasses import dataclass

# ── Modelos do protocolo A2A ───────────────────────────────────────────
@dataclass
class TextPart:
    text: str

    def to_dict(self) -> dict:
        return {"type": "text", "text": self.text}


@dataclass
class Message:
    role: str  # "user" ou "agent"
    parts: list

    def to_dict(self) -> dict:
        return {
            "role": self.role,
            "parts": [p.to_dict() if hasattr(p, "to_dict") else p for p in self.parts]
        }


class A2AClient:
    """
    Cliente A2A simplificado.
    Em produção, use o SDK oficial: pip install a2a-sdk
    """
    def __init__(self, agent_url: str, auth_token: str = None):
        self.base_url = agent_url.rstrip("/")
        self.headers = {"Content-Type": "application/json"}
        if auth_token:
            self.headers["Authorization"] = f"Bearer {auth_token}"

    def send_task(self, message: str, session_id: str = None) -> dict:
        """
        Envia uma nova Task para o agente.
        Retorna o objeto Task com ID e status inicial.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/send",
            "params": {
                "id": str(uuid.uuid4()),           # ID único da task
                "sessionId": session_id or str(uuid.uuid4()),
                "message": Message(
                    role="user",
                    parts=[TextPart(text=message)]
                ).to_dict()
            }
        }

        response = httpx.post(
            f"{self.base_url}/",
            json=payload,
            headers=self.headers,
            timeout=30.0
        )
        response.raise_for_status()
        result = response.json()

        if "error" in result:
            raise ValueError(f"Erro A2A: {result['error']}")

        return result["result"]

    def get_task(self, task_id: str) -> dict:
        """Consulta o estado atual de uma Task pelo ID."""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/get",
            "params": {"id": task_id}
        }
        response = httpx.post(
            f"{self.base_url}/",
            json=payload,
            headers=self.headers,
            timeout=15.0
        )
        response.raise_for_status()
        return response.json()["result"]

    def cancel_task(self, task_id: str) -> dict:
        """Cancela uma Task em andamento."""
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": "tasks/cancel",
            "params": {"id": task_id}
        }
        response = httpx.post(
            f"{self.base_url}/",
            json=payload,
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()["result"]

    def wait_for_completion(self, task_id: str, poll_interval: float = 1.0, timeout: float = 120.0) -> dict:
        """
        Aguarda a Task completar via polling.
        Em produção, prefira SSE para atualizações em tempo real.
        """
        terminal_states = {"completed", "failed", "cancelled"}
        start_time = time.time()

        while True:
            if time.time() - start_time > timeout:
                raise TimeoutError(f"Task {task_id} não completou em {timeout}s")

            task = self.get_task(task_id)
            state = task.get("status", {}).get("state")

            print(f"  Estado: {state}")

            if state in terminal_states:
                return task

            time.sleep(poll_interval)


# ── Exemplo de uso ─────────────────────────────────────────────────────
def main():
    # Conecta a um agente de análise de código rodando localmente
    client = A2AClient(
        agent_url="http://localhost:9000",
        auth_token="meu-token-seguro"
    )

    # 1. Submete uma Task de análise
    print("Submetendo task de análise de código...")
    task = client.send_task(
        message="""
        Analise o seguinte código Python e identifique:
        1. Problemas de performance
        2. Vulnerabilidades de segurança
        3. Violações de PEP 8

        ```python
        import os
        def query_db(user_input):
            conn = db.connect("sqlite:///app.db")
            result = conn.execute(f"SELECT * FROM users WHERE name='{user_input}'")
            return result.fetchall()
        ```
        """,
        session_id="session-analise-001"
    )

    task_id = task["id"]
    print(f"Task submetida: {task_id} — Estado inicial: {task['status']['state']}")

    # 2. Aguarda conclusão com polling
    print("\nAguardando conclusão...")
    completed_task = client.wait_for_completion(task_id, poll_interval=2.0, timeout=60.0)

    # 3. Lê os resultados
    print(f"\nTask concluída! Estado final: {completed_task['status']['state']}")

    # Exibe a resposta do agente
    if completed_task.get("artifacts"):
        print("\n── Artifacts gerados:")
        for artifact in completed_task["artifacts"]:
            print(f"  [{artifact['name']}] {artifact.get('description', '')}")

    # Exibe a última mensagem do agente
    history = completed_task.get("history", [])
    if history:
        last_message = history[-1]
        if last_message["role"] == "agent":
            for part in last_message["parts"]:
                if part["type"] == "text":
                    print(f"\n── Resposta do agente:\n{part['text']}")


if __name__ == "__main__":
    main()
10

Descoberta de Agentes via agent.json

Assim como o robots.txt padroniza informações para crawlers e o openapi.json documenta APIs REST, o A2A usa o arquivo agent.json no caminho /.well-known/agent.json para anunciar as capacidades de um agente ao ecossistema.

JSON
/.well-known/agent.json
{
  "name": "Code Review Agent",
  "description": "Agente especializado em revisão de código, detecção de bugs, análise de segurança e sugestões de refatoração para Python, JavaScript e Go.",
  "url": "https://agents.empresa.com/code-review",
  "version": "2.1.0",
  "provider": {
    "organization": "Empresa Tecnologia S.A.",
    "url": "https://empresa.com",
    "contact": "ai-team@empresa.com"
  },
  "documentationUrl": "https://docs.empresa.com/agents/code-review",
  "iconUrl": "https://empresa.com/assets/code-review-agent-icon.png",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true,
    "multiTurn": true
  },
  "authentication": {
    "schemes": ["Bearer"],
    "tokenUrl": "https://auth.empresa.com/oauth/token",
    "scopes": ["code-review:read", "code-review:write"]
  },
  "defaultInputModes": ["text/plain", "application/json"],
  "defaultOutputModes": ["text/plain", "application/json", "text/markdown"],
  "skills": [
    {
      "id": "security-analysis",
      "name": "Análise de Segurança",
      "description": "Identifica vulnerabilidades de segurança no código: injeção SQL, XSS, SSRF, hardcoded secrets, insecure dependencies.",
      "tags": ["security", "sast", "owasp"],
      "inputModes": ["text/plain"],
      "outputModes": ["application/json", "text/markdown"],
      "examples": [
        "Analise este código Python por vulnerabilidades OWASP Top 10",
        "Verifique se há hardcoded secrets neste repositório"
      ]
    },
    {
      "id": "performance-review",
      "name": "Revisão de Performance",
      "description": "Detecta ineficiências de performance: N+1 queries, complexidade O(n²), memory leaks, blocking I/O.",
      "tags": ["performance", "optimization"],
      "inputModes": ["text/plain"],
      "outputModes": ["text/markdown"],
      "examples": [
        "Esta função tem problemas de performance?",
        "Como otimizar este loop para processamento em batch?"
      ]
    },
    {
      "id": "refactoring-suggestions",
      "name": "Sugestões de Refatoração",
      "description": "Propõe melhorias de código aplicando princípios SOLID, design patterns e boas práticas da linguagem.",
      "tags": ["refactoring", "clean-code", "solid"],
      "inputModes": ["text/plain"],
      "outputModes": ["text/plain", "text/markdown"],
      "examples": [
        "Refatore esta classe seguindo Single Responsibility Principle",
        "Aplique o padrão Strategy neste switch-case"
      ]
    }
  ]
}

Como um Agente Orquestrador Descobre Outros Agentes

O processo de descoberta é simples: o orquestrador conhece as URLs base dos agentes disponíveis (via configuração, registro centralizado ou DNS) e consulta o /.well-known/agent.json de cada um para entender suas capacidades antes de delegar tarefas.

💡
Agent Registry Pattern

Em sistemas mais complexos, um Agent Registry centraliza a descoberta — similar ao Consul ou Kubernetes Service Discovery. Agentes registram-se no boot e o orquestrador consulta o registry para encontrar agentes com skills específicas, sem precisar conhecer URLs hardcoded.

11

Composição de Agentes de Diferentes Frameworks

Uma das principais proposta de valor do A2A é permitir que agentes construídos com frameworks diferentes — Google ADK, LangGraph, CrewAI, AutoGen — colaborem seamlessly. O protocolo é o contrato; o framework é o detalhe de implementação.

PYTHON
multi_framework_orchestrator.py
"""
Orquestrador multi-framework via A2A.

Demonstra como um agente coordenador (LangGraph) pode delegar tarefas para:
- Agente de análise de segurança (Google ADK)
- Agente de geração de documentação (CrewAI)
- Agente de review de código (AutoGen)

Todos se comunicam via protocolo A2A, independente do framework.
"""
import asyncio
import httpx
import json
import uuid
from dataclasses import dataclass, field
from typing import Optional

# ── Estruturas do protocolo A2A ────────────────────────────────────────
@dataclass
class AgentCapability:
    name: str
    url: str
    skills: list[str]
    description: str

# Registro de agentes disponíveis no ecossistema
AGENT_REGISTRY = [
    AgentCapability(
        name="Security Analyzer",
        url="https://agents.acme.com/security",
        skills=["security-analysis", "sast", "vulnerability-detection"],
        description="Agente ADK especializado em análise de segurança de código"
    ),
    AgentCapability(
        name="Docs Generator",
        url="https://agents.beta.com/docs",
        skills=["documentation", "api-docs", "readme-generation"],
        description="Agente CrewAI para geração de documentação técnica"
    ),
    AgentCapability(
        name="Code Reviewer",
        url="https://agents.gamma.io/review",
        skills=["code-review", "refactoring", "best-practices"],
        description="Agente AutoGen para revisão completa de código"
    ),
]


async def send_a2a_task(agent_url: str, message: str, token: str) -> dict:
    """
    Envia uma Task A2A e aguarda conclusão via polling assíncrono.
    """
    task_id = str(uuid.uuid4())

    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "tasks/send",
        "params": {
            "id": task_id,
            "sessionId": str(uuid.uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}]
            }
        }
    }

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }

    async with httpx.AsyncClient(timeout=120.0) as client:
        # Submete a task
        response = await client.post(agent_url, json=payload, headers=headers)
        response.raise_for_status()
        task = response.json()["result"]
        task_id = task["id"]

        # Polling até completion
        terminal_states = {"completed", "failed", "cancelled"}
        max_polls = 60

        for _ in range(max_polls):
            await asyncio.sleep(2)

            poll_payload = {
                "jsonrpc": "2.0",
                "id": str(uuid.uuid4()),
                "method": "tasks/get",
                "params": {"id": task_id}
            }

            poll_response = await client.post(agent_url, json=poll_payload, headers=headers)
            current_task = poll_response.json()["result"]
            state = current_task.get("status", {}).get("state")

            if state in terminal_states:
                return current_task

        return {"error": "Timeout aguardando resposta do agente", "task_id": task_id}


def find_agent_for_skill(skill: str) -> Optional[AgentCapability]:
    """Encontra o agente mais adequado para uma skill específica."""
    for agent in AGENT_REGISTRY:
        if skill in agent.skills:
            return agent
    return None


def extract_text_from_task(task: dict) -> str:
    """Extrai o texto da última mensagem do agente na task."""
    history = task.get("history", [])
    for message in reversed(history):
        if message.get("role") == "agent":
            for part in message.get("parts", []):
                if part.get("type") == "text":
                    return part["text"]
    return str(task.get("artifacts", [{}])[0].get("content", "Sem resposta"))


async def orchestrate_code_analysis(source_code: str, auth_tokens: dict) -> dict:
    """
    Orquestrador principal: distribui análise de código entre múltiplos agentes especializados.
    Executa análises em paralelo para eficiência máxima.
    """
    print("Iniciando orquestração multi-agente via A2A...\n")

    # Define as tasks para cada agente especializado
    tasks_config = [
        {
            "skill": "security-analysis",
            "message": f"Execute uma análise completa de segurança no seguinte código:\n\n```\n{source_code}\n```\nFoque em OWASP Top 10, CWE e boas práticas de segurança.",
        },
        {
            "skill": "documentation",
            "message": f"Gere documentação técnica completa para o seguinte código:\n\n```\n{source_code}\n```\nInclua docstrings, exemplos de uso e descrição de parâmetros.",
        },
        {
            "skill": "code-review",
            "message": f"Faça uma revisão completa do seguinte código:\n\n```\n{source_code}\n```\nAvalie qualidade, padrões de design, testabilidade e manutenibilidade.",
        },
    ]

    # Cria corrotinas para execução paralela
    coroutines = []
    agent_names = []

    for config in tasks_config:
        agent = find_agent_for_skill(config["skill"])
        if agent:
            token = auth_tokens.get(agent.name, "token-default")
            coroutines.append(send_a2a_task(agent.url, config["message"], token))
            agent_names.append(agent.name)
            print(f"  → Delegando '{config['skill']}' para {agent.name} ({agent.url})")
        else:
            print(f"  ⚠ Nenhum agente encontrado para skill: {config['skill']}")

    print(f"\nExecutando {len(coroutines)} análises em paralelo...\n")

    # Executa todas em paralelo via asyncio.gather
    results = await asyncio.gather(*coroutines, return_exceptions=True)

    # Consolida resultados
    consolidated = {
        "status": "completed",
        "source_code_hash": hash(source_code),
        "analyses": {}
    }

    for agent_name, result in zip(agent_names, results):
        if isinstance(result, Exception):
            consolidated["analyses"][agent_name] = {"error": str(result)}
        elif result.get("status", {}).get("state") == "completed":
            consolidated["analyses"][agent_name] = {
                "status": "completed",
                "result": extract_text_from_task(result),
                "artifacts": result.get("artifacts", [])
            }
        else:
            consolidated["analyses"][agent_name] = {
                "status": "failed",
                "error": result.get("error", "Falha desconhecida")
            }

    return consolidated


# ── Execução ───────────────────────────────────────────────────────────
async def main():
    sample_code = """
    def authenticate_user(username, password):
        query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
        result = db.execute(query)
        if result:
            session['user'] = username
            return True
        return False
    """

    auth_tokens = {
        "Security Analyzer": "token-security-abc123",
        "Docs Generator": "token-docs-xyz789",
        "Code Reviewer": "token-review-def456"
    }

    report = await orchestrate_code_analysis(sample_code, auth_tokens)

    print("\n" + "="*60)
    print("RELATÓRIO CONSOLIDADO DE ANÁLISE MULTI-AGENTE")
    print("="*60)
    print(json.dumps(report, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    asyncio.run(main())
O poder da interoperabilidade A2A

O exemplo acima orquestra agentes construídos com Google ADK, CrewAI e AutoGen via uma interface única e padronizada. Nenhum dos agentes precisa saber que os outros existem — eles apenas expõem sua interface A2A. O orquestrador pode ser substituído por qualquer framework sem afetar os agentes especializados.