gRPC: 고급 기능
Updated:
개요
“gRPC 실습부터 운영까지” 책의 5장 내용을 실습하면서 인터셉터, 데드라인, 메타데이터에 대해 공부했다.
학습 내용
인터셉터(Interceptor)는 요청/응답 과정에서 공통 로직을 처리하는 미들웨어다. 서버 스트리밍 인터셉터를 구현해서 메시지 송수신 과정을 로깅했고, ServerStream을 래핑해서 메시지를 가로채는 방법을 익혔다.
데드라인(Deadline)은 요청이 완료되어야 하는 시간을 지정하는 기능이다. 타임아웃과 달리 클라이언트에서 서버로 전파되어 전체 요청 시간을 관리할 수 있다. Context를 통한 데드라인 설정과 마이크로서비스 체인에서의 활용법을 학습했다.
메타데이터(Metadata)는 gRPC 요청과 함께 전송되는 키-값 형태의 헤더 정보다. 인증 토큰이나 추적 ID 같은 부가 정보를 전달하는 방법과 Header/Trailer의 차이를 실습했다.
gRPC 인터셉터란?
gRPC 인터셉터는 두 가지 타입으로 나뉜다:
- Unary Interceptor: 단일 요청-응답 방식의 RPC 호출에 적용
- Stream Interceptor: 스트리밍 RPC 호출에 적용 (클라이언트 스트림, 서버 스트림, 양방향 스트림)
인터셉터를 사용하면 다음과 같은 작업을 수행할 수 있다:
- 요청/응답 로깅
- 인증 및 권한 검증
- 에러 핸들링
- 메트릭 수집
- 타임아웃 관리
서버 스트리밍 인터셉터 구현
1. StreamServerInterceptor 구조
func StreamServerInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
log.Printf("[Stream Interceptor] Stream started - Method: %s", info.FullMethod)
// 실제 스트림 핸들러 호출 전 로직
log.Printf("[Stream Interceptor] IsClientStream: %v", info.IsClientStream)
log.Printf("[Stream Interceptor] IsServerStream: %v", info.IsServerStream)
// ServerStream을 래핑하여 Send/Recv를 가로챌 수 있음
wrappedStream := &wrappedServerStream{
ServerStream: ss,
methodName: info.FullMethod,
}
// 실제 핸들러 호출
err := handler(srv, wrappedStream)
// 핸들러 호출 후 로직
if err != nil {
log.Printf("[Stream Interceptor] Stream ended with error: %v", err)
} else {
log.Printf("[Stream Interceptor] Stream ended successfully")
}
return err
}
핵심 포인트:
grpc.StreamServerInfo를 통해 메서드 정보, 스트림 타입 확인handler호출 전후로 로깅 처리ServerStream을 래핑하여 메시지 송수신을 가로챔
2. ServerStream 래핑
메시지 송수신을 가로채기 위해 grpc.ServerStream을 래핑하는 구조체를 정의했다:
type wrappedServerStream struct {
grpc.ServerStream
methodName string
}
// SendMsg - 메시지 전송 시 호출됨
func (w *wrappedServerStream) SendMsg(m interface{}) error {
log.Printf("[Stream Interceptor] Sending message: %+v", m)
return w.ServerStream.SendMsg(m)
}
// RecvMsg - 메시지 수신 시 호출됨
func (w *wrappedServerStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
if err == nil {
log.Printf("[Stream Interceptor] Received message: %+v", m)
}
return err
}
SendMsg와 RecvMsg를 오버라이드하여 실제 메시지 내용을 로깅할 수 있다.
3. 서버 구현
기본적인 Greeter 서비스를 구현했다:
type server struct {
pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("[Server] Received request from: %s", req.GetName())
return &pb.HelloReply{
Message: fmt.Sprintf("Hello, %s!", req.GetName()),
}, nil
}
func (s *server) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
name := req.GetName()
log.Printf("[Server] Received stream request from: %s", name)
// 3번 인사 메시지 전송
for i := 1; i <= 3; i++ {
if err := stream.Send(&pb.HelloReply{
Message: fmt.Sprintf("Hello #%d, %s!", i, name),
}); err != nil {
return err
}
}
return nil
}
4. 인터셉터 등록
gRPC 서버 생성 시 인터셉터를 등록한다:
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("[Server] Failed to listen: %v", err)
}
// 스트리밍 인터셉터 등록
s := grpc.NewServer(
grpc.StreamInterceptor(StreamServerInterceptor),
)
pb.RegisterGreeterServer(s, &server{})
log.Printf("[Server] gRPC server with STREAM interceptor running at :50051")
log.Printf("[Server] Waiting for client requests...")
if err := s.Serve(lis); err != nil {
log.Fatalf("[Server] Failed to serve: %v", err)
}
}
인터셉터 체이닝
여러 인터셉터를 체인으로 연결할 수도 있다:
// Unary와 Stream 둘 다 등록
s := grpc.NewServer(
grpc.UnaryInterceptor(UnaryInterceptor),
grpc.StreamInterceptor(StreamServerInterceptor),
)
// 여러 개의 인터셉터 체인
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(unaryInterceptor1, unaryInterceptor2),
grpc.ChainStreamInterceptor(streamInterceptor1, streamInterceptor2),
)
실습 결과
서버를 실행하고 클라이언트에서 스트리밍 요청을 보내면 다음과 같은 로그가 출력된다:
[Server] gRPC server with STREAM interceptor running at :50051
[Server] Waiting for client requests...
[Stream Interceptor] Stream started - Method: /pb.Greeter/SayHelloStream
[Stream Interceptor] IsClientStream: false
[Stream Interceptor] IsServerStream: true
[Stream Interceptor] Received message: name:"Alice"
[Server] Received stream request from: Alice
[Stream Interceptor] Sending message: message:"Hello #1, Alice!"
[Stream Interceptor] Sending message: message:"Hello #2, Alice!"
[Stream Interceptor] Sending message: message:"Hello #3, Alice!"
[Stream Interceptor] Stream ended successfully
gRPC Deadline (데드라인)
데드라인이란?
Deadline은 gRPC 요청이 완료되어야 하는 최대 시간을 지정하는 메커니즘이다. 타임아웃과 달리 절대적 시간 개념으로, 클라이언트에서 설정한 데드라인이 서버로 전파되어 전체 요청 시간을 관리할 수 있다.
핵심 개념:
- 타임아웃 vs 데드라인: 타임아웃은 “대기 시간” (상대적), 데드라인은 “완료 기한” (절대적)
- 전파(Propagation): 클라이언트에서 설정한 데드라인이 서버로 전달됨
- 조기 종료: 데드라인 초과 시 즉시 에러 반환, 리소스 낭비 방지
왜 데드라인이 필요한가?
1. 리소스 낭비 방지
❌ 데드라인 없음:
클라이언트: 1초 후 포기
서버: 10초 동안 계속 작업 중... (낭비!)
✅ 데드라인 사용:
클라이언트: 1초 후 포기
서버: 1초 후 감지하고 작업 중단
2. 연쇄 실패 방지
마이크로서비스 체인에서 각 서비스가 시간 내 응답 못하면 즉시 실패하여 전체 시스템 블록을 방지한다.
Context를 통한 데드라인 설정
WithTimeout (상대 시간) - 권장
// 현재 시간부터 1초 후
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "User"})
WithDeadline (절대 시간)
// 특정 시각까지
deadline := time.Now().Add(1 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "User"})
서버에서 데드라인 확인하기
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
// 데드라인이 설정되어 있는지 확인
deadline, ok := ctx.Deadline()
if ok {
remaining := time.Until(deadline)
log.Printf("남은 시간: %v", remaining)
if remaining < 100*time.Millisecond {
return nil, status.Error(codes.DeadlineExceeded, "not enough time to process")
}
}
// Context 취소 감지
select {
case <-time.After(2 * time.Second):
// 정상 처리 완료
return &pb.HelloReply{Message: "Done"}, nil
case <-ctx.Done():
// 데드라인 초과 또는 클라이언트 취소
err := ctx.Err()
if err == context.DeadlineExceeded {
log.Println("⚠️ Deadline exceeded!")
return nil, status.Error(codes.DeadlineExceeded, "processing took too long")
}
if err == context.Canceled {
log.Println("🚫 Client cancelled request")
return nil, status.Error(codes.Canceled, "request cancelled")
}
return nil, err
}
}
에러 처리
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "User"})
if err != nil {
st, ok := status.FromError(err)
if !ok {
log.Fatal("Unknown error")
}
switch st.Code() {
case codes.DeadlineExceeded:
log.Println("⏰ Timeout! 서버가 제시간에 응답하지 못함")
// 재시도 로직 또는 fallback
case codes.Canceled:
log.Println("🚫 요청이 취소됨")
default:
log.Printf("Other error: %v", st.Code())
}
}
타임아웃 vs 데드라인: 핵심 차이
타임아웃 방식 (각각 독립적) ❌
Client (18:30:00)
├─ Timeout 5초 설정 → 18:30:05까지
│
├─► Service A (18:30:01 도착)
│ ├─ Timeout 5초 설정 → 18:30:06까지 (새로 설정!)
│ │
│ ├─► Service B (18:30:02 도착)
│ ├─ Timeout 5초 설정 → 18:30:07까지 (또 새로 설정!)
│
└─ 문제: 최악의 경우 15초 이상 소요 가능
데드라인 방식 (모두 공유) ✅
Client (18:30:00)
├─ Deadline 설정: 18:30:05
│
├─► Service A (18:30:01 도착)
│ ├─ Deadline: 18:30:05 (남은 시간: 4초)
│ │
│ ├─► Service B (18:30:02 도착)
│ ├─ Deadline: 18:30:05 (남은 시간: 3초)
│
└─ 장점: 전체 요청이 정확히 5초 안에 완료
중요: gRPC는 context.WithTimeout()을 사용해도 내부적으로 데드라인으로 변환하여 서버로 전파한다!
스트리밍에서의 데드라인
func (s *server) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
ctx := stream.Context()
for i := 1; i <= 10; i++ {
select {
case <-ctx.Done():
// 데드라인 초과 - 스트림 중단
log.Printf("Stream cancelled at message #%d", i)
return status.Error(codes.DeadlineExceeded, "stream timeout")
case <-time.After(1 * time.Second):
if err := stream.Send(&pb.HelloReply{
Message: fmt.Sprintf("Message #%d", i),
}); err != nil {
return err
}
}
}
return nil
}
모범 사례
1. 항상 데드라인 설정
// ❌ 나쁜 예: 데드라인 없음
resp, err := client.SayHello(context.Background(), req)
// ✅ 좋은 예: 적절한 데드라인
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.SayHello(ctx, req)
2. defer cancel() 항상 호출
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() // ✅ 리소스 누수 방지
3. 서버에서 Context 전파
❌ 잘못된 방법
// Client
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.CallServiceA(ctx, req)
// Service A (서버)
func (s *ServiceA) Handle(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// ❌ 잘못: 받은 context 무시하고 새로 만듦!
newCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Service B 호출
return s.serviceBClient.Call(newCtx, req)
}
✅ 올바른 방법
// Client
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.CallServiceA(ctx, req)
// Service A (서버)
func (s *ServiceA) Handle(ctx context.Context, req *pb.Request) (*pb.Response, error) {
// ✅ 올바름: 받은 context 그대로 사용!
deadline, _ := ctx.Deadline()
log.Printf("Deadline: %v, 남은 시간: %v", deadline, time.Until(deadline))
// Service B 호출 (같은 ctx 전달!)
resp, err := s.serviceBClient.Call(ctx, req)
if err != nil {
return nil, err
}
return &pb.Response{Data: resp.Data}, nil
}
4. 재시도 with 데드라인
func CallWithRetry(client pb.GreeterClient, name string) (*pb.HelloReply, error) {
maxRetries := 3
timeout := 1 * time.Second
for i := 0; i < maxRetries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: name})
cancel() // 즉시 호출
if err == nil {
return resp, nil
}
st := status.Convert(err)
if st.Code() != codes.DeadlineExceeded {
return nil, err
}
log.Printf("Retry %d/%d after timeout", i+1, maxRetries)
time.Sleep(100 * time.Millisecond)
}
return nil, status.Error(codes.DeadlineExceeded, "all retries failed")
}
gRPC 메타데이터 (Metadata)
메타데이터란?
메타데이터(Metadata)는 gRPC 요청/응답과 함께 전송되는 키-값 쌍의 헤더 정보다. gRPC는 HTTP/2 위에서 동작하므로, 메타데이터는 HTTP/2 헤더로 전송된다.
HTTP/2 헤더:
:method: POST
:path: /greeter.Greeter/SayHello
content-type: application/grpc
authorization: Bearer token123 ← 메타데이터
user-id: john.doe ← 메타데이터
trace-id: ABC-123 ← 메타데이터
메타데이터 사용 사례
메타데이터는 다양한 목적으로 활용된다:
- 인증(Authentication):
authorization: Bearer token - 분산 추적(Distributed Tracing):
trace-id,span-id - 사용자 정보:
user-id,user-role,user-tenant - 요청 메타 정보:
request-id,client-version,device-type - 기능 플래그:
feature-flags,ab-test-group
메타데이터 타입
1. Header (헤더)
- 전송 시점: RPC 시작 시
- 용도: 요청 정보, 인증, 추적 등
- 특징: RPC 시작 전에 전송됨
2. Trailer (트레일러)
- 전송 시점: RPC 종료 시
- 용도: 처리 결과, 통계, 상태 정보
- 특징: 응답 본문 이후 전송됨
클라이언트: 메타데이터 전송
방법 1: metadata.Pairs 사용 (권장)
// 메타데이터 생성
md := metadata.Pairs(
"user-id", "john",
"trace-id", "ABC-123",
"authorization", "Bearer token",
)
// Context에 추가
ctx := metadata.NewOutgoingContext(context.Background(), md)
// RPC 호출
resp, err := client.SayHello(ctx, &pb.HelloRequest{Name: "User"})
방법 2: 기존 Context에 추가
// 기존 Context가 있는 경우
ctx := context.Background()
// 메타데이터 추가
ctx = metadata.AppendToOutgoingContext(ctx,
"user-id", "john",
"trace-id", "ABC-123",
)
resp, err := client.SayHello(ctx, req)
서버: 메타데이터 수신
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
// 메타데이터 가져오기
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.InvalidArgument, "no metadata")
}
// 모든 메타데이터 출력
for key, values := range md {
for _, value := range values {
log.Printf("%s: %s", key, value)
}
}
// 특정 키 값 가져오기
if tokens := md.Get("authorization"); len(tokens) > 0 {
token := tokens[0]
log.Printf("Token: %s", token)
// 토큰 검증...
}
// 사용자 ID 가져오기
if userIDs := md.Get("user-id"); len(userIDs) > 0 {
userID := userIDs[0]
log.Printf("User ID: %s", userID)
}
return &pb.HelloReply{Message: "OK"}, nil
}
서버: 메타데이터 전송
Header 메타데이터 전송
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
// 헤더 메타데이터 생성
header := metadata.Pairs(
"server-version", "1.0.0",
"request-id", "REQ-123",
)
// 헤더 전송 (응답 시작 시)
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, err
}
// 비즈니스 로직...
return &pb.HelloReply{Message: "OK"}, nil
}
Trailer 메타데이터 전송
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
// 비즈니스 로직...
// 트레일러 메타데이터 설정 (응답 종료 시)
trailer := metadata.Pairs(
"server-status", "ok",
"processing-time-ms", "42",
)
grpc.SetTrailer(ctx, trailer)
return &pb.HelloReply{Message: "OK"}, nil
}
클라이언트: 메타데이터 수신
var header, trailer metadata.MD
resp, err := client.SayHello(ctx, req,
grpc.Header(&header), // 헤더 수신
grpc.Trailer(&trailer), // 트레일러 수신
)
if err != nil {
log.Fatal(err)
}
// 헤더 읽기
if versions := header.Get("server-version"); len(versions) > 0 {
log.Printf("Server version: %s", versions[0])
}
// 트레일러 읽기
if statuses := trailer.Get("server-status"); len(statuses) > 0 {
log.Printf("Status: %s", statuses[0])
}
스트리밍에서의 메타데이터
// 서버
func (s *server) SayHelloStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloStreamServer) error {
// 수신 메타데이터 읽기
ctx := stream.Context()
md, _ := metadata.FromIncomingContext(ctx)
// 헤더 전송
header := metadata.Pairs("stream-id", "STREAM-123")
if err := stream.SendHeader(header); err != nil {
return err
}
// 메시지 전송...
for i := 0; i < 5; i++ {
stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Message %d", i)})
}
// 트레일러 설정
stream.SetTrailer(metadata.Pairs("total-messages", "5"))
return nil
}
// 클라이언트
func callStream(client pb.GreeterClient) {
// 메타데이터 전송
md := metadata.Pairs("priority", "high")
ctx := metadata.NewOutgoingContext(context.Background(), md)
stream, err := client.SayHelloStream(ctx, &pb.HelloRequest{Name: "User"})
// 헤더 수신
header, err := stream.Header()
log.Println(header)
// 메시지 수신
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
log.Println(resp.Message)
}
// 트레일러 수신
trailer := stream.Trailer()
log.Println(trailer)
}
메타데이터 키 규칙
1. 소문자 사용
// ✅ 올바름
md := metadata.Pairs("user-id", "john")
// ❌ 잘못됨 (자동으로 소문자 변환됨)
md := metadata.Pairs("User-ID", "john")
2. ASCII 문자만 사용
// ✅ 올바름
md := metadata.Pairs("user-id", "john")
// ❌ 잘못됨
md := metadata.Pairs("사용자-id", "john")
3. 예약어 피하기
// ❌ gRPC 예약어: "grpc-"로 시작하는 키는 피할 것
// ❌ HTTP/2 예약어: ":method", ":path", ":authority" 등
4. 바이너리 데이터는 “-bin” 접미사
// 바이너리 값은 자동으로 base64 인코딩됨
md := metadata.Pairs("image-bin", string(imageBytes))
실전 패턴
1. 인증 인터셉터
func AuthInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 메타데이터에서 토큰 추출
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no metadata")
}
tokens := md.Get("authorization")
if len(tokens) == 0 {
return nil, status.Error(codes.Unauthenticated, "no token")
}
token := tokens[0]
// 토큰 검증
if !validateToken(token) {
return nil, status.Error(codes.Unauthenticated, "invalid token")
}
// 검증 성공 - 사용자 정보를 Context에 추가
ctx = context.WithValue(ctx, "user-id", extractUserID(token))
return handler(ctx, req)
}
2. 추적 ID 전파
// 클라이언트
func callWithTracing(client pb.GreeterClient) {
traceID := generateTraceID()
ctx := metadata.AppendToOutgoingContext(context.Background(),
"trace-id", traceID,
)
resp, err := client.SayHello(ctx, req)
}
// 서버
func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloReply, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.InvalidArgument, "no metadata")
}
traceIDs := md.Get("trace-id")
if len(traceIDs) == 0 {
return nil, status.Error(codes.InvalidArgument, "no trace-id")
}
traceID := traceIDs[0]
log.Printf("[%s] Processing request", traceID)
// 다른 서비스 호출 시 trace-id 전파
ctx = metadata.AppendToOutgoingContext(ctx, "trace-id", traceID)
resp, err := s.otherClient.Call(ctx, otherReq)
return resp, err
}
주의사항
1. 메타데이터 크기 제한
- HTTP/2 헤더 크기 제한 (일반적으로 8KB)
- 큰 데이터는 요청 본문에 포함
2. 대소문자 구분 안 됨
md := metadata.Pairs("User-ID", "john")
// 자동으로 "user-id"로 변환됨
3. 여러 값 지원
// 같은 키에 여러 값 추가 가능
md := metadata.Pairs(
"tag", "important",
"tag", "urgent",
"tag", "high-priority",
)
// 서버에서 읽기
tags := md.Get("tag") // []string{"important", "urgent", "high-priority"}
4. 민감 정보 주의
// ❌ 절대 하지 말 것
md := metadata.Pairs("password", "secret123")
// ✅ 토큰 사용
md := metadata.Pairs("authorization", "Bearer " + token)
정리
이번 실습을 통해 gRPC의 세 가지 고급 기능을 익혔다.
인터셉터는 로깅이나 인증 같은 공통 로직을 처리하는 데 유용했다. 특히 ServerStream을 래핑해서 메시지를 가로채는 방식이 인상적이었다.
데드라인은 타임아웃과 비슷해 보이지만 서버로 전파된다는 점이 중요했다. 마이크로서비스 체인에서 전체 요청 시간을 관리할 수 있어서 리소스 낭비를 막을 수 있다.
메타데이터는 HTTP/2 헤더로 전송되는 부가 정보다. 인증 토큰이나 추적 ID를 전달할 때 사용하면 되고, Header와 Trailer를 구분해서 쓸 수 있다는 걸 배웠다.
실제 프로젝트에 적용할 때는 인터셉터에 인증과 로깅을 넣고, 모든 요청에 데드라인을 설정하고, 메타데이터로 추적 정보를 전달하는 식으로 사용하면 될 것 같다.
댓글남기기