A2A Protocol 서버/클라이언트 만들기(1)

Updated:

A2A 프로토콜을 사용하여 통신하는 클라이언트/서버를 만드는 것을 목표로 한다.

Python SDK를 사용하여 개발하고 Code Run을 통해 배포하는 예제는 아래에서 학습할 수 있다.
https://codelabs.developers.google.com/intro-a2a-purchasing-concierge?hl=ko#1

나는 Go를 사용해 클라이언트/서버를 만들었고 OpenAI를 통해 자연어 처리를 하도록 구성하였다. 배송 요금/라벨 발급을 하는 서비스이며, 고객이 박스 사이즈/무게/발송지/도착지를 보내면, 컨시어지가 여러 택배사 에이전트에게 견적/ETA/라벨 발급 가능 여부를 질의해서 최적의 선택을 반환한다.
이번 포스팅에서 택배사 에이전트는 AI 에이전트가 아니며 A2A Protocol을 통한 통신을 주 목적으로 한다
세부 코드는 아래에서 확인할 수 있다.

https://github.com/KIMMUSIC/a2a-protocol


[프로젝트 구조]

pkg/a2a/                 # A2A 공통 계약(타입/서명/미들웨어)
services/
  concierge-go/          # 오케스트레이터(팬아웃/팬인)
  agent-a-go/            # 캐리어 A (동기 QUOTE/SHIP)
  agent-b-go/            # 캐리어 B (QUOTE, 비동기 SHIP)
  interpreter-go/        # LLM 기반 자연어  QUOTE.input 구조화

[A2A Contract Type]

공통 A2A 타입을 정의하는 부분이다.

package a2a

import "encoding/json"

const ContractVersion = "1.0"

// ---- Task lifecycle ---------------------------------------------------------

type TaskStatus string

const (
	StatusPending   TaskStatus = "PENDING"
	StatusRunning   TaskStatus = "RUNNING"
	StatusSucceeded TaskStatus = "SUCCEEDED"
	StatusFailed    TaskStatus = "FAILED"
)

// CreateTask: 다른 에이전트에게 작업을 위임할 때 사용하는 표준 입력
type CreateTask struct {
	TaskType       string          `json:"task_type"`                 // e.g. QUOTE | SHIP | RANK | INTERPRET
	Input          json.RawMessage `json:"input"`                     // 도메인별 입력(JSON blob)
	ReplyURL       string          `json:"reply_url,omitempty"`       // 콜백 받을 URL(옵션)
	IdempotencyKey string          `json:"idempotency_key,omitempty"` // 멱등 처리용
	Meta           map[string]any  `json:"meta,omitempty"`            // 추가 컨텍스트(옵션)
}

// Task: 작업의 현재 상태/결과/오류를 나타내는 표준 출력
type Task struct {
	TaskID string          `json:"task_id"`
	Status TaskStatus      `json:"status"`
	Result json.RawMessage `json:"result,omitempty"` // 도메인별 결과(JSON blob)
	Error  *ErrorPayload   `json:"error,omitempty"`
}

// ---- Agent discovery ---------------------------------------------------------

type AgentMeta struct {
	AgentID      string            `json:"agent_id"`
	Name         string            `json:"name"`
	Version      string            `json:"version"`          // 구현체 버전
	ContractVer  string            `json:"contract_version"` // A2A 계약 버전 (1.0)
	Capabilities []AgentCapability `json:"capabilities"`
	Auth         *AuthSpec         `json:"auth,omitempty"`
}

type AgentCapability struct {
	TaskType     string `json:"task_type"`     // e.g. QUOTE
	InputSchema  string `json:"input_schema"`  // 문서/스키마 식별자(설명 또는 URL 가능)
	OutputSchema string `json:"output_schema"` // 문서/스키마 식별자
}

type AuthSpec struct {
	Required bool   `json:"required"`
	Scheme   string `json:"scheme"` // "HMAC" | "mTLS" | "None"
}

// ---- Task events (async callbacks) ------------------------------------------

type Event struct {
	Event   string          `json:"event"` // e.g. TASK_COMPLETED | TASK_FAILED | TASK_PROGRESS
	TaskID  string          `json:"task_id"`
	Payload json.RawMessage `json:"payload,omitempty"` // 결과/중간상태
}

[concierge-go]

클라이언트에 해당하는 컨시어지 main 함수의 코드를 살펴보자

func discover(base string) {
	resp, err := http.Get(base + "/.well-known/agent.json")
	if err != nil {
		log.Println("discovery error:", base, err)
		return
	}
	defer resp.Body.Close()
	var meta a2a.AgentMeta
	json.NewDecoder(resp.Body).Decode(&meta)
	log.Printf("discovered: %s capabilities=%v\n", meta.AgentID, meta.Capabilities)
}

agent로 요청을 보내 에이전트 카드를 얻어오는 부분이다.

각 에이전트는 아래와 같은 에이전트 카드를 응답한다.

[interpreter]

	// Discovery
	r.Get("/.well-known/agent.json", func(w http.ResponseWriter, _ *http.Request) {
		meta := a2a.AgentMeta{
			AgentID:     agentID,
			Name:        "Interpreter (Go LLM)",
			Version:     "0.2.0",
			ContractVer: a2a.ContractVersion,
			Capabilities: []a2a.AgentCapability{
				{TaskType: "INTERPRET", InputSchema: "Utterance", OutputSchema: "QuoteRequest"},
			},
			Auth: &a2a.AuthSpec{Required: false, Scheme: "HMAC"},
		}
		_ = json.NewEncoder(w).Encode(meta)
	})

[agent-a/agent-b]

    	r.Get("/.well-known/agent.json", func(w http.ResponseWriter, _ *http.Request) {
		meta := a2a.AgentMeta{
			AgentID: agentID, Name: "Agent-A (Go)", Version: "0.1.0",
			ContractVer: a2a.ContractVersion,
			Capabilities: []a2a.AgentCapability{
				{TaskType: "QUOTE", InputSchema: "QuoteRequest", OutputSchema: "QuoteResult"},
				{TaskType: "SHIP", InputSchema: "ShipRequest", OutputSchema: "ShipResult"},
			},
			Auth: &a2a.AuthSpec{Required: false, Scheme: "HMAC"},
		}
		json.NewEncoder(w).Encode(meta)
	})

agent-a는 동기로 응답하며 agent-b는 비동기 + 콜백으로 응답한다.

[agent-a]

r.Post("/tasks", func(w http.ResponseWriter, r *http.Request) {
		var ct a2a.CreateTask
		if err := json.NewDecoder(r.Body).Decode(&ct); err != nil {
			w.WriteHeader(400)
			json.NewEncoder(w).Encode(a2a.Task{Error: a2a.NewError(a2a.ErrValidationFailed, err.Error())})
			return
		}
		if err := a2a.ValidateCreateTask(&ct); err != nil {
			w.WriteHeader(400)
			json.NewEncoder(w).Encode(a2a.Task{Error: a2a.NewError(a2a.ErrValidationFailed, err.Error())})
			return
		}
		taskID := "t_" + RandID()
		t := &a2a.Task{TaskID: taskID, Status: a2a.StatusSucceeded} // QUOTE/SHIP 동기 처리(Agent-A는 동기)
		switch ct.TaskType {
		case "QUOTE":
			result := map[string]any{"carrier": "AgentA", "service": "EXPRESS", "price": 7000 + 1500*2, "eta_days": 2}
			b, _ := json.Marshal(result)
			t.Result = b
		case "SHIP":
			result := map[string]any{"status": "READY", "tracking_id": "A-" + RandID(), "label_url": "https://cdn.local/A.png"}
			b, _ := json.Marshal(result)
			t.Result = b
		default:
			t.Status = a2a.StatusFailed
			t.Error = a2a.NewError(a2a.ErrValidationFailed, "unsupported task_type")
		}
		st.mu.Lock()
		st.m[taskID] = t
		st.mu.Unlock()
		json.NewEncoder(w).Encode(map[string]any{"task_id": taskID, "status": t.Status})
	})

[agent-b]

// services/agent-b-go/main.go (핵심만)
tid := "t_" + RandID()
switch ct.TaskType {
case "SHIP":
  saveTask(tid, a2a.StatusPending, nil)   // 1) 먼저 PENDING
  go func(tid string, input json.RawMessage, reply string) {
    time.Sleep(3*time.Second)             // 처리 지연 시뮬
    res := map[string]any{"status":"READY","tracking_id":"B-"+RandID(),"label_url":"https://cdn/B.png"}
    saveTask(tid, a2a.StatusSucceeded, mustJSON(res))
    // 2) A2A 이벤트 콜백 (Concierge의 /tasks/{id}/events)
    http.Post(replyOrDefault(reply, conciergeBase)+"/tasks/"+tid+"/events",
      "application/json",
      bytes.NewReader(mustJSON(map[string]any{"event":"TASK_COMPLETED","task_id":tid,"payload":res})))
  }(tid, ct.Input, ct.ReplyURL)
  _ = json.NewEncoder(w).Encode(map[string]any{"task_id":tid,"status":a2a.StatusPending})
}

agent-b는 PENDING 영수증만 먼저 반환, 백그라운드에서 처리 후 Concierge 이벤트 엔드포인트로 TASK_COMPLETED 전송 → Concierge가 자기 Task 상태를 SUCCEEDED로 바꾼다.

[concierge]

클라이언트에서 자연어해석 -> 팬아웃 -> 팬인 집계를 처리하는 부분의 코드이다.

// services/concierge-go/main.go (핵심)
var agentA = env("AGENT_A_URL","http://localhost:8081")
var agentB = env("AGENT_B_URL","http://localhost:8082")

// 1) 자연어면 Interpreter 먼저
quoteInput := ct.Input
if needsInterpret(ct.Input) {
  structured, err := postInterpret(interpreterURL, ct.Input)
  if err != nil { /* 400 처리 */ }
  quoteInput = dropMeta(structured) // _meta 제거(있다면)
}

// 2) fan-out
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()

type qres struct{ ok bool; data map[string]any; err error }
ch := make(chan qres, 2)

go func(){ q,err:=postTask(agentA,"QUOTE",quoteInput); ch<-qres{err==nil,q,err} }()
go func(){ q,err:=postTask(agentB,"QUOTE",quoteInput); ch<-qres{err==nil,q,err} }()

// 3) fan-in
var quotes []map[string]any
var partial []any
for i:=0;i<2;i++{
  select {
    case r := <-ch:
      if r.ok { quotes=append(quotes,r.data) } else { partial=append(partial,r.err.Error()) }
    case <-ctx.Done():
      partial=append(partial,"timeout"); i=2
  }
}

// 4) 집계 결과 저장
saveTask(taskID, a2a.StatusSucceeded, mustJSON(map[string]any{
  "quotes": quotes, "partial_failures": partial,
}))

[요청과 응답]

Mac ~ % curl -s localhost:8083/tasks -H 'Content-Type: application/json' \
  -d '{"task_type":"INTERPRET","input":{"utterance":"서울에서 일본으로 5kg 빠르게"}}' | python3 -m json.tool
{
    "status": "SUCCEEDED",
    "task_id": "t_interp_015421"
}

Mac ~ % curl -s localhost:8083/tasks/t_interp_015421                      
  
{"task_id":"t_interp_015421","status":"SUCCEEDED","result":{"from":{"country":"KR"},"to":{"country":"JP"},"parcel":{"h_cm":15,"l_cm":30,"w_cm":20,"weight_kg":5},"options":{"priority":true},"currency":"KRW","max_wait_ms":1200}}

카테고리:

업데이트:

댓글남기기