Protocolos de Comunicação entre Agentes
Tool Calling, MCP (Model Context Protocol) e Google A2A (Agent-to-Agent) — os protocolos que permitem agentes de IA colaborar, consumir ferramentas externas e compor sistemas de múltiplos agentes em produção.
Tool Calling vs Protocolos Formais
Antes de entrar em protocolos como MCP e A2A, é fundamental compreender o mecanismo mais básico de extensão de capacidade dos LLMs: o Tool Calling (também chamado de Function Calling). É a fundação sobre a qual todos os protocolos mais sofisticados são construídos.
O que é Tool Calling
Tool calling é a capacidade nativa dos LLMs modernos (GPT-4, Claude, Gemini) de sinalizar, dentro da sua resposta, que desejam invocar uma função externa. O modelo não executa código — ele apenas descreve qual função chamar e com quais argumentos, em formato estruturado (normalmente JSON). A aplicação cliente é responsável por executar a função e devolver o resultado ao modelo para que ele continue o raciocínio.
O desenvolvedor define um JSON Schema descrevendo nome, parâmetros e tipos de cada função disponível. Esse schema é enviado ao modelo junto com o prompt.
O LLM raciocina sobre se precisa de informação externa e, se sim, qual tool usar. Ele retorna uma mensagem do tipo tool_call com os argumentos preenchidos.
A aplicação intercepta a resposta tool_call, executa a função localmente (ou via API), e devolve o resultado ao modelo como mensagem tool.
Com o resultado da função em contexto, o modelo continua sua resposta, podendo encadear múltiplas tool calls antes de gerar a resposta final para o usuário.
Exemplo Completo: Tool Calling com OpenAI
"""
Exemplo completo de Tool Calling com OpenAI Function Calling.
Inclui definição de schema, loop de execução e tratamento de resultados.
"""
import json
import httpx
from openai import OpenAI
client = OpenAI() # usa OPENAI_API_KEY do ambiente
# ── Definição das Tools (JSON Schema) ──────────────────────────────────
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Retorna a temperatura atual e condições climáticas de uma cidade.",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "Nome da cidade, ex: 'São Paulo, BR'"
},
"units": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Unidade de temperatura (padrão: celsius)"
}
},
"required": ["city"],
"additionalProperties": False
}
}
},
{
"type": "function",
"function": {
"name": "search_flights",
"description": "Busca voos disponíveis entre duas cidades em uma data.",
"parameters": {
"type": "object",
"properties": {
"origin": {"type": "string", "description": "Código IATA de origem, ex: GRU"},
"destination": {"type": "string", "description": "Código IATA de destino, ex: LIS"},
"date": {"type": "string", "description": "Data no formato YYYY-MM-DD"},
"passengers": {"type": "integer", "description": "Número de passageiros", "default": 1}
},
"required": ["origin", "destination", "date"]
}
}
}
]
# ── Implementação das funções reais ────────────────────────────────────
def get_weather(city: str, units: str = "celsius") -> dict:
"""
Em produção, chamaria uma API de clima real (OpenWeatherMap, etc.).
Aqui simulamos a resposta para fins didáticos.
"""
return {
"city": city,
"temperature": 24 if units == "celsius" else 75,
"units": units,
"condition": "parcialmente nublado",
"humidity": "68%",
"wind_speed": "15 km/h"
}
def search_flights(origin: str, destination: str, date: str, passengers: int = 1) -> dict:
"""Simula busca de voos."""
return {
"flights": [
{"airline": "LATAM", "flight": "LA8065", "departure": "08:30", "arrival": "14:45", "price": 1250.00},
{"airline": "TAP", "flight": "TP074", "departure": "22:00", "arrival": "11:30", "price": 980.00},
],
"origin": origin,
"destination": destination,
"date": date,
"passengers": passengers
}
# ── Dispatcher: executa a tool pelo nome ───────────────────────────────
TOOL_REGISTRY = {
"get_weather": get_weather,
"search_flights": search_flights,
}
def execute_tool(tool_call) -> str:
"""Executa uma tool call retornada pelo modelo e retorna o resultado como string JSON."""
name = tool_call.function.name
args = json.loads(tool_call.function.arguments)
if name not in TOOL_REGISTRY:
return json.dumps({"error": f"Tool '{name}' não encontrada."})
result = TOOL_REGISTRY[name](**args)
return json.dumps(result, ensure_ascii=False, indent=2)
# ── Loop principal de conversação ──────────────────────────────────────
def chat_with_tools(user_message: str) -> str:
"""
Implementa o loop completo de tool calling:
1. Envia mensagem ao modelo com definição das tools
2. Verifica se modelo quer chamar alguma tool
3. Executa as tools e devolve resultados
4. Repete até o modelo retornar resposta final
"""
messages = [
{
"role": "system",
"content": "Você é um assistente de viagens prestativo. Use as ferramentas disponíveis quando necessário."
},
{"role": "user", "content": user_message}
]
MAX_ITERATIONS = 10 # evita loops infinitos
for iteration in range(MAX_ITERATIONS):
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=tools,
tool_choice="auto" # modelo decide se usa tools
)
message = response.choices[0].message
# Se o modelo não quer usar tools, retorna a resposta
if message.tool_calls is None:
return message.content
# Adiciona a mensagem do assistente (com as tool_calls) ao histórico
messages.append(message)
# Executa cada tool call e adiciona os resultados
for tool_call in message.tool_calls:
print(f" → Chamando tool: {tool_call.function.name}({tool_call.function.arguments})")
result = execute_tool(tool_call)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result
})
return "Limite de iterações atingido."
# ── Execução ───────────────────────────────────────────────────────────
if __name__ == "__main__":
query = "Qual o clima em Lisboa agora? Estou pensando em viajar de São Paulo no dia 15/06/2025."
print(f"Usuário: {query}\n")
answer = chat_with_tools(query)
print(f"Assistente: {answer}")
Tool Calling vs Protocolos Formais (MCP/A2A)
O tool calling resolve bem o caso de um único agente consumindo ferramentas definidas na mesma aplicação. Mas quando o sistema cresce — múltiplos agentes, ferramentas desenvolvidas por times diferentes, execução distribuída — surge a necessidade de protocolos formais.
| Aspecto | Tool Calling (nativo) | MCP (Model Context Protocol) | A2A (Agent-to-Agent) |
|---|---|---|---|
| Escopo | Único modelo ↔ funções locais | Qualquer cliente ↔ qualquer servidor de contexto | Agente orquestrador ↔ agentes especializados |
| Transporte | Interno à API do LLM | stdio, SSE ou HTTP Streaming | HTTP/JSON-RPC sobre HTTPS |
| Descoberta | Manual (developer define) | Listagem dinâmica de capabilities | agent.json publicado por cada agente |
| Reutilização | Baixa (código acoplado) | Alta (qualquer host MCP-compatível) | Alta (qualquer agente A2A-compatível) |
| Multi-agent | Não (requer orquestração manual) | Parcial (cliente único, múltiplos servidores) | Sim (agentes delegam tarefas entre si) |
| Estado de tarefa | Sem gerenciamento | Sem (stateless) | submitted → working → completed/failed |
Tool calling é como um funcionário que usa ferramentas que estão na sua mesa. MCP é como um escritório com acesso padronizado a recursos compartilhados da empresa. A2A é como contratar uma empresa terceirizada especializada para uma tarefa específica — você passa o briefing, eles entregam o resultado.
MCP — Model Context Protocol
O Model Context Protocol (MCP) é um padrão aberto criado pela Anthropic em novembro de 2024 para padronizar como aplicações de IA fornecem contexto a modelos de linguagem. Em vez de cada aplicação inventar sua própria integração com bancos de dados, APIs e sistemas de arquivos, o MCP define um protocolo universal — análogo ao que o LSP (Language Server Protocol) fez pelo ecossistema de editores de código.
Antes do MCP, cada ferramenta de IA (Claude Desktop, Cursor, Cline, etc.) precisava construir integrações customizadas para cada fonte de dados. Com MCP, basta construir um servidor MCP uma vez e ele funciona com qualquer host compatível. O ecossistema cresceu para centenas de servidores MCP públicos em poucos meses.
Os Três Primitivos do MCP
O MCP organiza as capacidades de um servidor em três categorias distintas:
Funções executáveis que o modelo pode chamar. São model-controlled — o LLM decide quando e como usá-las. Ex: execute_query(sql), send_email(to, subject, body), create_pr(title, branch).
Dados que o modelo pode ler como contexto. São application-controlled — a aplicação decide o que expor. Ex: conteúdo de arquivos, resultados de queries, páginas web, blobs de dados.
Templates de prompt pré-definidos e reutilizáveis expostos pelo servidor. São user-controlled — o usuário seleciona qual template usar. Ex: templates de revisão de código, análise de logs, geração de relatórios.
Arquitetura Cliente-Servidor MCP
O MCP segue uma arquitetura cliente-servidor rigorosa onde cada componente tem uma responsabilidade bem definida.
A aplicação que o usuário usa diretamente — Claude Desktop, Cursor, Cline, Continue, etc. O host gerencia múltiplas conexões com servidores MCP e injeta o contexto no LLM.
Componente interno do Host que mantém uma conexão 1:1 com um servidor MCP. Responsável pelo handshake de protocolo, listagem de capabilities e despacho de mensagens JSON-RPC 2.0.
Processo leve que expõe Tools, Resources e Prompts via protocolo MCP. Pode ser local (subprocess stdio) ou remoto (HTTP com SSE). Acessa os sistemas reais: bancos de dados, APIs, sistema de arquivos.
Os sistemas de dados reais: PostgreSQL, GitHub API, Slack, filesystem, S3, etc. O servidor MCP é o único que sabe como interagir com esses sistemas.
Transportes Suportados
O MCP define três mecanismos de transporte, cada um adequado para cenários diferentes:
- stdio: O cliente inicia o servidor como subprocesso e se comunica via stdin/stdout. Ideal para servidores locais (seguro, sem rede). É o mais comum para desenvolvimento.
- SSE (Server-Sent Events): O servidor fica escutando em uma porta HTTP e o cliente se conecta via SSE para receber eventos. Adequado para servidores remotos/cloud.
- Streamable HTTP: Novo transporte introduzido na spec 2025-03-26 que unifica requisição/resposta e streaming em um único endpoint HTTP, eliminando a necessidade de SSE separado.
Servidores MCP remotos requerem autenticação OAuth 2.1 conforme a spec. Nunca exponha um servidor MCP sem autenticação em produção. Para desenvolvimento local, o transporte stdio é suficientemente seguro pois não há exposição de rede.
Servidores MCP Existentes e Docker MCP Toolkit
O ecossistema de servidores MCP cresceu rapidamente. A Anthropic mantém o repositório oficial modelcontextprotocol/servers no GitHub com servidores de referência, e a comunidade desenvolveu centenas de integrações adicionais.
Servidores Oficiais (Reference Servers)
Leitura e escrita de arquivos locais com controle granular de permissões. Expõe tools como read_file, write_file, list_directory.
Interação completa com GitHub: criar issues, abrir PRs, buscar repos, gerenciar branches, fazer code reviews.
Executa queries SQL em bancos PostgreSQL. O schema do banco é exposto como Resource para que o modelo entenda a estrutura de dados.
Faz requisições HTTP e retorna o conteúdo de páginas web. Essencial para agentes que precisam pesquisar na internet.
Envia mensagens, lê canais, busca histórico de conversas no Slack. Permite agentes de automação corporativa.
Gerencia bancos SQLite locais: criar tabelas, inserir dados, executar queries analíticas. Ótimo para prototipagem.
Docker MCP Toolkit
A Docker lançou o Docker MCP Toolkit — uma coleção curada de servidores
MCP disponíveis como imagens Docker no Docker Hub (mcp/ namespace). Isso
elimina a necessidade de instalar dependências Python/Node localmente para cada servidor.
{
"mcpServers": {
"filesystem": {
"command": "docker",
"args": [
"run", "-i", "--rm",
"-v", "/Users/bruno/projects:/projects",
"mcp/filesystem",
"/projects"
]
},
"github": {
"command": "docker",
"args": [
"run", "-i", "--rm",
"-e", "GITHUB_PERSONAL_ACCESS_TOKEN",
"mcp/github"
],
"env": {
"GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_seu_token_aqui"
}
},
"postgres": {
"command": "docker",
"args": [
"run", "-i", "--rm",
"-e", "POSTGRES_CONNECTION_STRING",
"mcp/postgres"
],
"env": {
"POSTGRES_CONNECTION_STRING": "postgresql://user:pass@localhost:5432/mydb"
}
},
"brave-search": {
"command": "docker",
"args": [
"run", "-i", "--rm",
"-e", "BRAVE_API_KEY",
"mcp/brave-search"
],
"env": {
"BRAVE_API_KEY": "BSA_seu_api_key"
}
}
}
}
- Instale o Docker Desktop e certifique-se de que está rodando
- Abra o arquivo de configuração:
~/Library/Application Support/Claude/claude_desktop_config.json(macOS) ou%APPDATA%\Claude\claude_desktop_config.json(Windows) - Adicione os servidores MCP desejados no formato acima
- Reinicie o Claude Desktop — as imagens Docker serão baixadas automaticamente na primeira execução
- Verifique no menu de configurações do Claude Desktop que os servidores aparecem como conectados
Desenvolvimento de Servidores MCP com FastMCP
O FastMCP é a forma recomendada de desenvolver servidores MCP em Python. Ele fornece uma API declarativa com decoradores, similar ao FastAPI, eliminando o boilerplate do protocolo JSON-RPC e deixando você focado na lógica de negócio.
O SDK oficial mcp do Python é verboso e requer implementação manual do protocolo. O FastMCP (incluído como mcp.server.fastmcp a partir da versão 1.0) fornece a mesma experiência do FastAPI — decoradores, type hints, documentação automática. Use FastMCP para 99% dos casos.
Servidor MCP Completo: Gerenciador de Tarefas
"""
Servidor MCP completo para gerenciamento de tarefas.
Expõe Tools, Resources e Prompts conforme a spec MCP.
Instalação:
pip install "mcp[cli]" httpx
Execução (stdio, para desenvolvimento local):
python task_manager_mcp.py
Execução (SSE, para acesso remoto):
python task_manager_mcp.py --transport sse --port 8080
"""
import json
from datetime import datetime
from typing import Optional
from mcp.server.fastmcp import FastMCP
# ── Inicialização do servidor ──────────────────────────────────────────
mcp = FastMCP(
name="task-manager",
version="1.0.0",
description="Servidor MCP para gerenciamento de tarefas e projetos"
)
# Armazenamento em memória (em produção: use banco de dados)
tasks_db: dict[int, dict] = {}
task_id_counter = 0
# ── TOOLS ─────────────────────────────────────────────────────────────
@mcp.tool()
def create_task(
title: str,
description: str,
priority: str = "medium",
due_date: Optional[str] = None,
tags: list[str] = []
) -> dict:
"""
Cria uma nova tarefa no sistema.
Args:
title: Título curto e descritivo da tarefa
description: Descrição detalhada do que precisa ser feito
priority: Prioridade — 'low', 'medium', 'high', 'critical'
due_date: Data de vencimento no formato ISO 8601 (YYYY-MM-DD)
tags: Lista de tags para categorização
Returns:
Dicionário com os dados da tarefa criada, incluindo o ID gerado
"""
global task_id_counter
task_id_counter += 1
valid_priorities = {"low", "medium", "high", "critical"}
if priority not in valid_priorities:
return {"error": f"Prioridade inválida. Use: {valid_priorities}"}
task = {
"id": task_id_counter,
"title": title,
"description": description,
"priority": priority,
"status": "todo",
"due_date": due_date,
"tags": tags,
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat()
}
tasks_db[task_id_counter] = task
return {"success": True, "task": task}
@mcp.tool()
def list_tasks(
status: Optional[str] = None,
priority: Optional[str] = None,
tag: Optional[str] = None
) -> dict:
"""
Lista tarefas com filtros opcionais.
Args:
status: Filtrar por status — 'todo', 'in_progress', 'done'
priority: Filtrar por prioridade — 'low', 'medium', 'high', 'critical'
tag: Filtrar por tag específica
Returns:
Lista de tarefas que correspondem aos filtros
"""
tasks = list(tasks_db.values())
if status:
tasks = [t for t in tasks if t["status"] == status]
if priority:
tasks = [t for t in tasks if t["priority"] == priority]
if tag:
tasks = [t for t in tasks if tag in t["tags"]]
# Ordena por prioridade (critical > high > medium > low)
priority_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
tasks.sort(key=lambda t: priority_order.get(t["priority"], 99))
return {
"total": len(tasks),
"tasks": tasks
}
@mcp.tool()
def update_task_status(task_id: int, new_status: str, note: Optional[str] = None) -> dict:
"""
Atualiza o status de uma tarefa existente.
Args:
task_id: ID numérico da tarefa a ser atualizada
new_status: Novo status — 'todo', 'in_progress', 'blocked', 'done', 'cancelled'
note: Nota opcional sobre a mudança de status
Returns:
Tarefa atualizada ou mensagem de erro
"""
if task_id not in tasks_db:
return {"error": f"Tarefa {task_id} não encontrada."}
valid_statuses = {"todo", "in_progress", "blocked", "done", "cancelled"}
if new_status not in valid_statuses:
return {"error": f"Status inválido. Use: {valid_statuses}"}
tasks_db[task_id]["status"] = new_status
tasks_db[task_id]["updated_at"] = datetime.utcnow().isoformat()
if note:
tasks_db[task_id]["last_note"] = note
return {"success": True, "task": tasks_db[task_id]}
@mcp.tool()
def get_project_summary() -> dict:
"""
Gera um resumo analítico do estado atual do projeto.
Returns:
Métricas agregadas: total por status, por prioridade, tarefas atrasadas
"""
all_tasks = list(tasks_db.values())
today = datetime.utcnow().date().isoformat()
summary = {
"total_tasks": len(all_tasks),
"by_status": {},
"by_priority": {},
"overdue": [],
"critical_pending": []
}
for task in all_tasks:
# Contagem por status
status = task["status"]
summary["by_status"][status] = summary["by_status"].get(status, 0) + 1
# Contagem por prioridade
prio = task["priority"]
summary["by_priority"][prio] = summary["by_priority"].get(prio, 0) + 1
# Tarefas atrasadas
if task.get("due_date") and task["due_date"] < today and task["status"] not in ("done", "cancelled"):
summary["overdue"].append({"id": task["id"], "title": task["title"], "due": task["due_date"]})
# Críticas pendentes
if task["priority"] == "critical" and task["status"] not in ("done", "cancelled"):
summary["critical_pending"].append({"id": task["id"], "title": task["title"]})
return summary
# ── RESOURCES ─────────────────────────────────────────────────────────
@mcp.resource("tasks://all")
def get_all_tasks_resource() -> str:
"""Retorna todas as tarefas como JSON (exposto como Resource para o modelo)."""
return json.dumps(list(tasks_db.values()), ensure_ascii=False, indent=2)
@mcp.resource("tasks://{task_id}")
def get_task_resource(task_id: str) -> str:
"""Retorna uma tarefa específica pelo ID."""
tid = int(task_id)
if tid not in tasks_db:
return json.dumps({"error": "Tarefa não encontrada"})
return json.dumps(tasks_db[tid], ensure_ascii=False, indent=2)
# ── PROMPTS ───────────────────────────────────────────────────────────
@mcp.prompt()
def daily_standup_prompt(team_member: str) -> str:
"""
Gera um prompt de standup diário baseado nas tarefas atuais.
Args:
team_member: Nome do membro da equipe para o standup
"""
summary = get_project_summary()
return f"""
Você é um assistente de gestão ágil. Gere um standup diário estruturado para {team_member}.
Estado atual do projeto:
{json.dumps(summary, ensure_ascii=False, indent=2)}
O standup deve incluir:
1. O que foi feito ontem (tarefas concluídas recentemente)
2. O que será feito hoje (tarefas in_progress e próximas da lista)
3. Impedimentos (tarefas bloqueadas ou atrasadas)
Seja conciso (máximo 5 bullets por seção) e focado em valor entregue.
"""
# ── Ponto de entrada ───────────────────────────────────────────────────
if __name__ == "__main__":
mcp.run(transport="stdio")
# Para SSE: mcp.run(transport="sse", host="0.0.0.0", port=8080)
- Instale as dependências:
pip install "mcp[cli]" - Teste via CLI do MCP:
mcp dev task_manager_mcp.py— abre o MCP Inspector no browser - No MCP Inspector, verifique que as 4 tools, 2 resources e 1 prompt aparecem listados
- Teste cada tool manualmente com o formulário interativo do inspector
- Para integrar ao Claude Desktop, adicione ao
claude_desktop_config.json:{"command": "python", "args": ["/caminho/task_manager_mcp.py"]}
Deploy e Distribuição com Docker
Para tornar seu servidor MCP disponível para toda a equipe ou como produto, o deploy com Docker é a abordagem mais robusta. O servidor roda em modo SSE para aceitar conexões remotas.
# ── Build stage ───────────────────────────────────────────────────────
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
# ── Runtime stage ─────────────────────────────────────────────────────
FROM python:3.12-slim
# Cria usuário sem privilégios
RUN useradd -m -u 1001 mcpuser
WORKDIR /app
# Copia dependências instaladas
COPY --from=builder /root/.local /home/mcpuser/.local
COPY --chown=mcpuser:mcpuser task_manager_mcp.py .
USER mcpuser
ENV PATH=/home/mcpuser/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1
# Porta para SSE transport
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:8080/health')"
CMD ["python", "task_manager_mcp.py", "--transport", "sse", "--port", "8080"]
version: "3.9"
services:
# ── MCP Server: Task Manager ─────────────────────────────────────────
mcp-task-manager:
build:
context: .
dockerfile: Dockerfile
image: mcp-task-manager:latest
container_name: mcp-task-manager
restart: unless-stopped
ports:
- "8080:8080"
environment:
- DATABASE_URL=postgresql://mcp:secret@postgres:5432/tasks
- MCP_AUTH_TOKEN=${MCP_AUTH_TOKEN} # Token para autenticação OAuth
depends_on:
postgres:
condition: service_healthy
networks:
- mcp-net
labels:
- "traefik.enable=true"
- "traefik.http.routers.mcp-tasks.rule=Host(`mcp-tasks.empresa.com`)"
- "traefik.http.routers.mcp-tasks.tls.certresolver=letsencrypt"
# ── MCP Server: GitHub Integration ──────────────────────────────────
mcp-github:
image: mcp/github:latest
container_name: mcp-github
restart: unless-stopped
ports:
- "8081:8080"
environment:
- GITHUB_PERSONAL_ACCESS_TOKEN=${GITHUB_TOKEN}
- TRANSPORT=sse
networks:
- mcp-net
# ── Banco de dados para persistência ────────────────────────────────
postgres:
image: postgres:16-alpine
container_name: mcp-postgres
restart: unless-stopped
environment:
POSTGRES_DB: tasks
POSTGRES_USER: mcp
POSTGRES_PASSWORD: secret
volumes:
- postgres-data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U mcp -d tasks"]
interval: 10s
timeout: 5s
retries: 5
networks:
- mcp-net
# ── Nginx: Reverse proxy com TLS ────────────────────────────────────
nginx:
image: nginx:alpine
container_name: mcp-nginx
restart: unless-stopped
ports:
- "443:443"
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./certs:/etc/nginx/certs:ro
depends_on:
- mcp-task-manager
- mcp-github
networks:
- mcp-net
volumes:
postgres-data:
networks:
mcp-net:
driver: bridge
1. Sempre use HTTPS/TLS para servidores remotos. 2. Implemente autenticação OAuth 2.1 conforme a spec MCP. 3. Use variáveis de ambiente para secrets (nunca hardcode). 4. Adicione rate limiting para evitar abuso. 5. Monitore com structured logging e métricas de latência por tool.
Google A2A — Agent-to-Agent Protocol
O Agent-to-Agent (A2A) é um protocolo aberto lançado pelo Google em abril de 2025 para padronizar como agentes de IA colaboram entre si. Enquanto o MCP conecta modelos a ferramentas, o A2A conecta agentes a agentes — permitindo que sistemas multi-agente heterogêneos operem em conjunto, independentemente do framework que cada agente usa internamente.
O MCP resolve "como um agente acessa ferramentas e dados externos". O A2A resolve "como um agente delega trabalho para outro agente especializado". Um sistema robusto usa ambos: agentes individuais consomem ferramentas via MCP, e colaboram entre si via A2A.
Casos de Uso do A2A
Agente de RH delega para agente financeiro quando precisa de dados salariais. Agente de vendas delega para agente jurídico análise de contratos.
Agente coordenador distribui análise de papers para agentes especializados em diferentes domínios (biomédica, estatística, revisão sistemática).
Agente de product management delega para agente de arquitetura, que delega para agentes especializados em frontend, backend e testes.
Empresas diferentes publicam agentes especializados que qualquer sistema A2A-compatível pode descobrir e usar, como uma "API marketplace" para agentes.
Tasks, Messages, Parts, Agent Cards e Artifacts
O A2A define um vocabulário preciso de primitivos para comunicação entre agentes. Compreender esses conceitos é essencial para implementar e integrar agentes A2A.
Hierarquia dos Primitivos
A unidade fundamental de trabalho no A2A. Uma Task representa uma solicitação do cliente para o agente remoto. Tem um ID único, estado (lifecycle), e contém Messages de entrada e saída. É o container que gerencia o ciclo de vida completo de uma operação.
Uma unidade de comunicação dentro de uma Task. Messages têm role (user ou agent) e contêm uma lista de Parts. A conversa dentro de uma Task é representada como uma sequência de Messages — similar ao histórico de chat dos LLMs.
O conteúdo atômico dentro de uma Message. Pode ser TextPart (texto simples), FilePart (arquivo com URI ou bytes base64) ou DataPart (JSON estruturado para dados arbitrários). Uma Message pode conter múltiplas Parts de tipos diferentes.
Metadados de descoberta publicados pelo agente no endpoint /.well-known/agent.json. Descreve as capacidades do agente, skills disponíveis, URL do endpoint, formatos suportados e requisitos de autenticação.
Saídas persistentes geradas pelo agente durante a execução de uma Task. Diferente das Messages (conversação), Artifacts são os resultados — documentos, arquivos, datasets, código gerado. São indexados e podem ser referenciados posteriormente.
Estados de uma Task e Fluxo de Comunicação
O ciclo de vida de uma Task A2A é bem definido com estados específicos que permitem ao cliente monitorar o progresso de forma padronizada.
Máquina de Estados
A Task foi recebida pelo agente mas ainda não começou a ser processada. O agente fez o ACK da requisição.
O agente está ativamente processando a Task. O cliente pode acompanhar o progresso via streaming de eventos (SSE).
O agente precisa de informação adicional do cliente para continuar. Interação multi-turno — o cliente deve responder para retomar.
A Task foi concluída com sucesso. O resultado final está disponível como Artifacts e a última Message do agente.
A Task falhou por erro interno ou condição irrecuperável. O erro é descrito na última Message do agente.
O cliente cancelou a Task via tasks/cancel. O agente para o processamento e limpa recursos.
Exemplo de Fluxo A2A Completo
"""
Cliente A2A completo demonstrando o fluxo de submissão de Tasks,
polling de status e leitura de Artifacts.
Instalação:
pip install httpx python-a2a
Documentação do protocolo:
https://google.github.io/A2A/
"""
import uuid
import time
import httpx
import json
from dataclasses import dataclass
# ── Modelos do protocolo A2A ───────────────────────────────────────────
@dataclass
class TextPart:
text: str
def to_dict(self) -> dict:
return {"type": "text", "text": self.text}
@dataclass
class Message:
role: str # "user" ou "agent"
parts: list
def to_dict(self) -> dict:
return {
"role": self.role,
"parts": [p.to_dict() if hasattr(p, "to_dict") else p for p in self.parts]
}
class A2AClient:
"""
Cliente A2A simplificado.
Em produção, use o SDK oficial: pip install a2a-sdk
"""
def __init__(self, agent_url: str, auth_token: str = None):
self.base_url = agent_url.rstrip("/")
self.headers = {"Content-Type": "application/json"}
if auth_token:
self.headers["Authorization"] = f"Bearer {auth_token}"
def send_task(self, message: str, session_id: str = None) -> dict:
"""
Envia uma nova Task para o agente.
Retorna o objeto Task com ID e status inicial.
"""
payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/send",
"params": {
"id": str(uuid.uuid4()), # ID único da task
"sessionId": session_id or str(uuid.uuid4()),
"message": Message(
role="user",
parts=[TextPart(text=message)]
).to_dict()
}
}
response = httpx.post(
f"{self.base_url}/",
json=payload,
headers=self.headers,
timeout=30.0
)
response.raise_for_status()
result = response.json()
if "error" in result:
raise ValueError(f"Erro A2A: {result['error']}")
return result["result"]
def get_task(self, task_id: str) -> dict:
"""Consulta o estado atual de uma Task pelo ID."""
payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/get",
"params": {"id": task_id}
}
response = httpx.post(
f"{self.base_url}/",
json=payload,
headers=self.headers,
timeout=15.0
)
response.raise_for_status()
return response.json()["result"]
def cancel_task(self, task_id: str) -> dict:
"""Cancela uma Task em andamento."""
payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/cancel",
"params": {"id": task_id}
}
response = httpx.post(
f"{self.base_url}/",
json=payload,
headers=self.headers
)
response.raise_for_status()
return response.json()["result"]
def wait_for_completion(self, task_id: str, poll_interval: float = 1.0, timeout: float = 120.0) -> dict:
"""
Aguarda a Task completar via polling.
Em produção, prefira SSE para atualizações em tempo real.
"""
terminal_states = {"completed", "failed", "cancelled"}
start_time = time.time()
while True:
if time.time() - start_time > timeout:
raise TimeoutError(f"Task {task_id} não completou em {timeout}s")
task = self.get_task(task_id)
state = task.get("status", {}).get("state")
print(f" Estado: {state}")
if state in terminal_states:
return task
time.sleep(poll_interval)
# ── Exemplo de uso ─────────────────────────────────────────────────────
def main():
# Conecta a um agente de análise de código rodando localmente
client = A2AClient(
agent_url="http://localhost:9000",
auth_token="meu-token-seguro"
)
# 1. Submete uma Task de análise
print("Submetendo task de análise de código...")
task = client.send_task(
message="""
Analise o seguinte código Python e identifique:
1. Problemas de performance
2. Vulnerabilidades de segurança
3. Violações de PEP 8
```python
import os
def query_db(user_input):
conn = db.connect("sqlite:///app.db")
result = conn.execute(f"SELECT * FROM users WHERE name='{user_input}'")
return result.fetchall()
```
""",
session_id="session-analise-001"
)
task_id = task["id"]
print(f"Task submetida: {task_id} — Estado inicial: {task['status']['state']}")
# 2. Aguarda conclusão com polling
print("\nAguardando conclusão...")
completed_task = client.wait_for_completion(task_id, poll_interval=2.0, timeout=60.0)
# 3. Lê os resultados
print(f"\nTask concluída! Estado final: {completed_task['status']['state']}")
# Exibe a resposta do agente
if completed_task.get("artifacts"):
print("\n── Artifacts gerados:")
for artifact in completed_task["artifacts"]:
print(f" [{artifact['name']}] {artifact.get('description', '')}")
# Exibe a última mensagem do agente
history = completed_task.get("history", [])
if history:
last_message = history[-1]
if last_message["role"] == "agent":
for part in last_message["parts"]:
if part["type"] == "text":
print(f"\n── Resposta do agente:\n{part['text']}")
if __name__ == "__main__":
main()
Descoberta de Agentes via agent.json
Assim como o robots.txt padroniza informações para crawlers e o
openapi.json documenta APIs REST, o A2A usa o arquivo
agent.json no caminho /.well-known/agent.json para
anunciar as capacidades de um agente ao ecossistema.
{
"name": "Code Review Agent",
"description": "Agente especializado em revisão de código, detecção de bugs, análise de segurança e sugestões de refatoração para Python, JavaScript e Go.",
"url": "https://agents.empresa.com/code-review",
"version": "2.1.0",
"provider": {
"organization": "Empresa Tecnologia S.A.",
"url": "https://empresa.com",
"contact": "ai-team@empresa.com"
},
"documentationUrl": "https://docs.empresa.com/agents/code-review",
"iconUrl": "https://empresa.com/assets/code-review-agent-icon.png",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": true,
"multiTurn": true
},
"authentication": {
"schemes": ["Bearer"],
"tokenUrl": "https://auth.empresa.com/oauth/token",
"scopes": ["code-review:read", "code-review:write"]
},
"defaultInputModes": ["text/plain", "application/json"],
"defaultOutputModes": ["text/plain", "application/json", "text/markdown"],
"skills": [
{
"id": "security-analysis",
"name": "Análise de Segurança",
"description": "Identifica vulnerabilidades de segurança no código: injeção SQL, XSS, SSRF, hardcoded secrets, insecure dependencies.",
"tags": ["security", "sast", "owasp"],
"inputModes": ["text/plain"],
"outputModes": ["application/json", "text/markdown"],
"examples": [
"Analise este código Python por vulnerabilidades OWASP Top 10",
"Verifique se há hardcoded secrets neste repositório"
]
},
{
"id": "performance-review",
"name": "Revisão de Performance",
"description": "Detecta ineficiências de performance: N+1 queries, complexidade O(n²), memory leaks, blocking I/O.",
"tags": ["performance", "optimization"],
"inputModes": ["text/plain"],
"outputModes": ["text/markdown"],
"examples": [
"Esta função tem problemas de performance?",
"Como otimizar este loop para processamento em batch?"
]
},
{
"id": "refactoring-suggestions",
"name": "Sugestões de Refatoração",
"description": "Propõe melhorias de código aplicando princípios SOLID, design patterns e boas práticas da linguagem.",
"tags": ["refactoring", "clean-code", "solid"],
"inputModes": ["text/plain"],
"outputModes": ["text/plain", "text/markdown"],
"examples": [
"Refatore esta classe seguindo Single Responsibility Principle",
"Aplique o padrão Strategy neste switch-case"
]
}
]
}
Como um Agente Orquestrador Descobre Outros Agentes
O processo de descoberta é simples: o orquestrador conhece as URLs base dos agentes
disponíveis (via configuração, registro centralizado ou DNS) e consulta o
/.well-known/agent.json de cada um para entender suas capacidades antes
de delegar tarefas.
Em sistemas mais complexos, um Agent Registry centraliza a descoberta — similar ao Consul ou Kubernetes Service Discovery. Agentes registram-se no boot e o orquestrador consulta o registry para encontrar agentes com skills específicas, sem precisar conhecer URLs hardcoded.
Composição de Agentes de Diferentes Frameworks
Uma das principais proposta de valor do A2A é permitir que agentes construídos com frameworks diferentes — Google ADK, LangGraph, CrewAI, AutoGen — colaborem seamlessly. O protocolo é o contrato; o framework é o detalhe de implementação.
"""
Orquestrador multi-framework via A2A.
Demonstra como um agente coordenador (LangGraph) pode delegar tarefas para:
- Agente de análise de segurança (Google ADK)
- Agente de geração de documentação (CrewAI)
- Agente de review de código (AutoGen)
Todos se comunicam via protocolo A2A, independente do framework.
"""
import asyncio
import httpx
import json
import uuid
from dataclasses import dataclass, field
from typing import Optional
# ── Estruturas do protocolo A2A ────────────────────────────────────────
@dataclass
class AgentCapability:
name: str
url: str
skills: list[str]
description: str
# Registro de agentes disponíveis no ecossistema
AGENT_REGISTRY = [
AgentCapability(
name="Security Analyzer",
url="https://agents.acme.com/security",
skills=["security-analysis", "sast", "vulnerability-detection"],
description="Agente ADK especializado em análise de segurança de código"
),
AgentCapability(
name="Docs Generator",
url="https://agents.beta.com/docs",
skills=["documentation", "api-docs", "readme-generation"],
description="Agente CrewAI para geração de documentação técnica"
),
AgentCapability(
name="Code Reviewer",
url="https://agents.gamma.io/review",
skills=["code-review", "refactoring", "best-practices"],
description="Agente AutoGen para revisão completa de código"
),
]
async def send_a2a_task(agent_url: str, message: str, token: str) -> dict:
"""
Envia uma Task A2A e aguarda conclusão via polling assíncrono.
"""
task_id = str(uuid.uuid4())
payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/send",
"params": {
"id": task_id,
"sessionId": str(uuid.uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}]
}
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
async with httpx.AsyncClient(timeout=120.0) as client:
# Submete a task
response = await client.post(agent_url, json=payload, headers=headers)
response.raise_for_status()
task = response.json()["result"]
task_id = task["id"]
# Polling até completion
terminal_states = {"completed", "failed", "cancelled"}
max_polls = 60
for _ in range(max_polls):
await asyncio.sleep(2)
poll_payload = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "tasks/get",
"params": {"id": task_id}
}
poll_response = await client.post(agent_url, json=poll_payload, headers=headers)
current_task = poll_response.json()["result"]
state = current_task.get("status", {}).get("state")
if state in terminal_states:
return current_task
return {"error": "Timeout aguardando resposta do agente", "task_id": task_id}
def find_agent_for_skill(skill: str) -> Optional[AgentCapability]:
"""Encontra o agente mais adequado para uma skill específica."""
for agent in AGENT_REGISTRY:
if skill in agent.skills:
return agent
return None
def extract_text_from_task(task: dict) -> str:
"""Extrai o texto da última mensagem do agente na task."""
history = task.get("history", [])
for message in reversed(history):
if message.get("role") == "agent":
for part in message.get("parts", []):
if part.get("type") == "text":
return part["text"]
return str(task.get("artifacts", [{}])[0].get("content", "Sem resposta"))
async def orchestrate_code_analysis(source_code: str, auth_tokens: dict) -> dict:
"""
Orquestrador principal: distribui análise de código entre múltiplos agentes especializados.
Executa análises em paralelo para eficiência máxima.
"""
print("Iniciando orquestração multi-agente via A2A...\n")
# Define as tasks para cada agente especializado
tasks_config = [
{
"skill": "security-analysis",
"message": f"Execute uma análise completa de segurança no seguinte código:\n\n```\n{source_code}\n```\nFoque em OWASP Top 10, CWE e boas práticas de segurança.",
},
{
"skill": "documentation",
"message": f"Gere documentação técnica completa para o seguinte código:\n\n```\n{source_code}\n```\nInclua docstrings, exemplos de uso e descrição de parâmetros.",
},
{
"skill": "code-review",
"message": f"Faça uma revisão completa do seguinte código:\n\n```\n{source_code}\n```\nAvalie qualidade, padrões de design, testabilidade e manutenibilidade.",
},
]
# Cria corrotinas para execução paralela
coroutines = []
agent_names = []
for config in tasks_config:
agent = find_agent_for_skill(config["skill"])
if agent:
token = auth_tokens.get(agent.name, "token-default")
coroutines.append(send_a2a_task(agent.url, config["message"], token))
agent_names.append(agent.name)
print(f" → Delegando '{config['skill']}' para {agent.name} ({agent.url})")
else:
print(f" ⚠ Nenhum agente encontrado para skill: {config['skill']}")
print(f"\nExecutando {len(coroutines)} análises em paralelo...\n")
# Executa todas em paralelo via asyncio.gather
results = await asyncio.gather(*coroutines, return_exceptions=True)
# Consolida resultados
consolidated = {
"status": "completed",
"source_code_hash": hash(source_code),
"analyses": {}
}
for agent_name, result in zip(agent_names, results):
if isinstance(result, Exception):
consolidated["analyses"][agent_name] = {"error": str(result)}
elif result.get("status", {}).get("state") == "completed":
consolidated["analyses"][agent_name] = {
"status": "completed",
"result": extract_text_from_task(result),
"artifacts": result.get("artifacts", [])
}
else:
consolidated["analyses"][agent_name] = {
"status": "failed",
"error": result.get("error", "Falha desconhecida")
}
return consolidated
# ── Execução ───────────────────────────────────────────────────────────
async def main():
sample_code = """
def authenticate_user(username, password):
query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
result = db.execute(query)
if result:
session['user'] = username
return True
return False
"""
auth_tokens = {
"Security Analyzer": "token-security-abc123",
"Docs Generator": "token-docs-xyz789",
"Code Reviewer": "token-review-def456"
}
report = await orchestrate_code_analysis(sample_code, auth_tokens)
print("\n" + "="*60)
print("RELATÓRIO CONSOLIDADO DE ANÁLISE MULTI-AGENTE")
print("="*60)
print(json.dumps(report, ensure_ascii=False, indent=2))
if __name__ == "__main__":
asyncio.run(main())
O exemplo acima orquestra agentes construídos com Google ADK, CrewAI e AutoGen via uma interface única e padronizada. Nenhum dos agentes precisa saber que os outros existem — eles apenas expõem sua interface A2A. O orquestrador pode ser substituído por qualquer framework sem afetar os agentes especializados.