强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

HTTP/2 与 RPC 精讲教程 / 09 - gRPC 流式通信

第 09 章:gRPC 流式通信

当单次请求/响应不够用时,流式 RPC 就是答案


9.1 流式模式概述

gRPC 的流式模式(Streaming)允许客户端和服务器在一次 RPC 调用中发送或接收多个消息,充分利用 HTTP/2 的流式传输能力。

9.1.1 四种模式对比

模式 1:一元 RPC (Unary)
  客户端:  ──Request──→
  服务器:              ←──Response──

模式 2:服务端流 (Server Streaming)
  客户端:  ──Request──→
  服务器:              ←──Response 1──
                       ←──Response 2──
                       ←──Response 3──
                       ←──(END)──

模式 3:客户端流 (Client Streaming)
  客户端:  ──Request 1──→
           ──Request 2──→
           ──Request 3──→
           ──(END)─────→
  服务器:                  ←──Response──

模式 4:双向流 (Bidirectional Streaming)
  客户端:  ──Request 1──→
           ──Request 2──→    ←──Response 1──
                      ──→    ←──Response 2──
           ──Request 3──→
           ──(END)─────→    ←──Response 3──
                             ←──(END)──

9.1.2 选择指南

场景推荐模式理由
简单查询/更新一元 RPC最简单直接
大数据集返回服务端流流式返回避免内存爆炸
批量上传客户端流流式发送避免单次请求过大
实时双向通信双向流低延迟双向数据交换
聊天/协作文档双向流双方都需要持续发送
日志/事件推送服务端流服务器持续推送
文件上传客户端流分块上传

9.2 服务端流(Server Streaming)

9.2.1 Proto 定义

// streaming.proto
syntax = "proto3";

package example;

service NotificationService {
  // 服务端流:服务器持续推送通知
  rpc Subscribe(SubscribeRequest) returns (stream Notification);
  
  // 服务端流:流式返回列表数据
  rpc ListUsers(ListUsersRequest) returns (stream UserRecord);
}

message SubscribeRequest {
  string topic = 1;
  int64 since_timestamp = 2;
}

message Notification {
  string id = 1;
  string title = 2;
  string body = 3;
  int64 timestamp = 4;
}

message ListUsersRequest {
  string filter = 1;
  int32 page_size = 2;
}

message UserRecord {
  int64 id = 1;
  string name = 2;
  string email = 3;
}

9.2.2 服务端实现(Go)

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net"
	"time"

	pb "example/pb"
	"google.golang.org/grpc"
)

type notificationService struct {
	pb.UnimplementedNotificationServiceServer
}

// Subscribe 实现服务端流式 RPC
func (s *notificationService) Subscribe(
	req *pb.SubscribeRequest,
	stream pb.NotificationService_SubscribeServer,
) error {
	log.Printf("客户端订阅主题: %s", req.Topic)

	// 持续推送通知
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	count := 0
	for {
		select {
		case <-stream.Context().Done():
			log.Println("客户端断开连接")
			return stream.Context().Err()
		case <-ticker.C:
			count++
			notification := &pb.Notification{
				Id:        fmt.Sprintf("notif-%d", count),
				Title:     fmt.Sprintf("通知 #%d", count),
				Body:      fmt.Sprintf("这是第 %d 条通知", count),
				Timestamp: time.Now().Unix(),
			}

			// 发送通知到客户端
			if err := stream.Send(notification); err != nil {
				log.Printf("发送失败: %v", err)
				return err
			}
			log.Printf("已发送通知 #%d", count)

			// 限制发送数量(演示用)
			if count >= 10 {
				return nil
			}
		}
	}
}

// ListUsers 流式返回用户列表
func (s *notificationService) ListUsers(
	req *pb.ListUsersRequest,
	stream pb.NotificationService_ListUsersServer,
) error {
	users := generateUsers(100) // 生成 100 个用户

	for _, user := range users {
		// 模拟逐条发送
		if err := stream.Send(user); err != nil {
			return err
		}
		time.Sleep(50 * time.Millisecond) // 模拟处理延迟
	}

	return nil
}

func generateUsers(count int) []*pb.UserRecord {
	users := make([]*pb.UserRecord, count)
	for i := 0; i < count; i++ {
		users[i] = &pb.UserRecord{
			Id:    int64(i + 1),
			Name:  fmt.Sprintf("User_%d", i+1),
			Email: fmt.Sprintf("user%[email protected]", i+1),
		}
	}
	return users
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}

	server := grpc.NewServer()
	pb.RegisterNotificationServiceServer(server, &notificationService{})

	log.Println("gRPC 服务器启动于 :50051")
	if err := server.Serve(lis); err != nil {
		log.Fatalf("服务失败: %v", err)
	}
}

9.2.3 客户端实现(Go)

package main

import (
	"context"
	"io"
	"log"
	"time"

	pb "example/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	client := pb.NewNotificationServiceClient(conn)

	// 示例 1:订阅通知流
	subscribeNotifications(client)

	// 示例 2:流式获取用户列表
	listUsers(client)
}

func subscribeNotifications(client pb.NotificationServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	stream, err := client.Subscribe(ctx, &pb.SubscribeRequest{
		Topic: "orders",
	})
	if err != nil {
		log.Fatalf("订阅失败: %v", err)
	}

	for {
		notification, err := stream.Recv()
		if err == io.EOF {
			log.Println("通知流结束")
			return
		}
		if err != nil {
			log.Fatalf("接收失败: %v", err)
		}
		log.Printf("[通知] %s: %s", notification.Title, notification.Body)
	}
}

func listUsers(client pb.NotificationServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
		Filter:   "active",
		PageSize: 100,
	})
	if err != nil {
		log.Fatalf("请求失败: %v", err)
	}

	var users []*pb.UserRecord
	for {
		user, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("接收失败: %v", err)
		}
		users = append(users, user)
	}

	log.Printf("共接收 %d 个用户", len(users))
}

9.3 客户端流(Client Streaming)

9.3.1 Proto 定义

// upload.proto
syntax = "proto3";

package example;

service FileService {
  // 客户端流:文件上传
  rpc Upload(stream UploadRequest) returns (UploadResponse);
  
  // 客户端流:批量导入
  rpc ImportUsers(stream ImportUserRequest) returns (ImportResponse);
}

message UploadRequest {
  string filename = 1;
  bytes chunk = 2;
  int32 chunk_number = 3;
}

message UploadResponse {
  string file_id = 1;
  int64 total_bytes = 2;
  string checksum = 3;
}

message ImportUserRequest {
  string name = 1;
  string email = 2;
  string department = 3;
}

message ImportResponse {
  int32 imported_count = 1;
  int32 failed_count = 2;
  repeated string errors = 3;
}

9.3.2 服务端实现(Go)

package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"log"

	pb "example/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type fileService struct {
	pb.UnimplementedFileServiceServer
}

// Upload 实现客户端流式文件上传
func (s *fileService) Upload(stream pb.FileService_UploadServer) error {
	var totalBytes int64
	var filename string
	var data []byte
	hasher := md5.New()

	for {
		req, err := stream.Recv()
		if err == io.EOF {
			// 客户端发送完毕,返回响应
			checksum := fmt.Sprintf("%x", hasher.Sum(nil))
			log.Printf("文件上传完成: %s, %d bytes, MD5: %s", 
				filename, totalBytes, checksum)

			return stream.SendAndClose(&pb.UploadResponse{
				FileId:     fmt.Sprintf("file-%d", time.Now().Unix()),
				TotalBytes: totalBytes,
				Checksum:   checksum,
			})
		}
		if err != nil {
			return status.Errorf(codes.Internal, "接收失败: %v", err)
		}

		// 首个 chunk 获取文件名
		if filename == "" {
			filename = req.Filename
			log.Printf("开始接收文件: %s", filename)
		}

		// 累积数据
		totalBytes += int64(len(req.Chunk))
		data = append(data, req.Chunk...)
		hasher.Write(req.Chunk)

		log.Printf("接收 chunk #%d, 累计 %d bytes", req.ChunkNumber, totalBytes)
	}
}

// ImportUsers 批量导入用户
func (s *fileService) ImportUsers(stream pb.FileService_ImportUsersServer) error {
	var imported, failed int32
	var errors []string

	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.ImportResponse{
				ImportedCount: imported,
				FailedCount:   failed,
				Errors:        errors,
			})
		}
		if err != nil {
			return err
		}

		// 处理每个用户
		if err := importUser(req); err != nil {
			failed++
			errors = append(errors, err.Error())
		} else {
			imported++
		}
	}
}

func importUser(req *pb.ImportUserRequest) error {
	if req.Name == "" {
		return fmt.Errorf("用户名称为空")
	}
	// 模拟导入逻辑
	return nil
}

func main() {
	lis, _ := net.Listen("tcp", ":50051")
	server := grpc.NewServer()
	pb.RegisterFileServiceServer(server, &fileService{})
	log.Println("服务器启动于 :50051")
	server.Serve(lis)
}

9.3.3 客户端实现(Go)

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	pb "example/pb"
	"google.golang.org/grpc"
)

func uploadFile(client pb.FileServiceClient, filePath string) error {
	stream, err := client.Upload(context.Background())
	if err != nil {
		return fmt.Errorf("创建流失败: %w", err)
	}

	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("打开文件失败: %w", err)
	}
	defer file.Close()

	// 分块发送
	buf := make([]byte, 32*1024) // 32KB chunks
	chunkNum := 0

	for {
		n, err := file.Read(buf)
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("读取文件失败: %w", err)
		}

		chunkNum++
		sendErr := stream.Send(&pb.UploadRequest{
			Filename:    filePath,
			Chunk:       buf[:n],
			ChunkNumber: int32(chunkNum),
		})
		if sendErr != nil {
			return fmt.Errorf("发送 chunk #%d 失败: %w", chunkNum, sendErr)
		}

		log.Printf("发送 chunk #%d, %d bytes", chunkNum, n)
	}

	// 关闭发送并接收响应
	resp, err := stream.CloseAndRecv()
	if err != nil {
		return fmt.Errorf("接收响应失败: %w", err)
	}

	log.Printf("上传完成: file_id=%s, total=%d bytes, md5=%s",
		resp.FileId, resp.TotalBytes, resp.Checksum)
	return nil
}

func main() {
	conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
	defer conn.Close()

	client := pb.NewFileServiceClient(conn)
	if err := uploadFile(client, "/path/to/large-file.bin"); err != nil {
		log.Fatalf("上传失败: %v", err)
	}
}

9.4 双向流(Bidirectional Streaming)

9.4.1 Proto 定义

// chat.proto
syntax = "proto3";

package example;

service ChatService {
  // 双向流:实时聊天
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
  
  // 双向流:协同编辑
  rpc Collaborate(stream EditOperation) returns (stream EditOperation);
}

message ChatMessage {
  string user_id = 1;
  string room_id = 2;
  string content = 3;
  int64 timestamp = 4;
  MessageType type = 5;
  
  enum MessageType {
    TEXT = 0;
    IMAGE = 1;
    SYSTEM = 2;
    TYPING = 3;
  }
}

message EditOperation {
  string document_id = 1;
  string user_id = 2;
  int32 position = 3;
  string operation = 4;  // "insert", "delete", "replace"
  string content = 5;
  int64 version = 6;
}

9.4.2 服务端实现(Go)

package main

import (
	"io"
	"log"
	"sync"
	"time"

	pb "example/pb"
	"google.golang.org/grpc"
)

type chatService struct {
	pb.UnimplementedChatServiceServer
	mu      sync.RWMutex
	rooms   map[string][]chan *pb.ChatMessage
}

func newChatService() *chatService {
	return &chatService{
		rooms: make(map[string][]chan *pb.ChatMessage),
	}
}

func (s *chatService) Chat(stream pb.ChatService_ChatServer) error {
	// 用于广播消息的通道
	msgChan := make(chan *pb.ChatMessage, 100)
	var roomID string

	// 启动接收 goroutine
	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				log.Println("客户端关闭发送")
				close(msgChan)
				return
			}
			if err != nil {
				log.Printf("接收错误: %v", err)
				close(msgChan)
				return
			}

			roomID = msg.RoomId
			msg.Timestamp = time.Now().UnixMilli()

			// 注册到房间
			s.joinRoom(roomID, msgChan)

			// 广播消息给房间内其他成员
			s.broadcast(roomID, msg, msgChan)
		}
	}()

	// 发送消息给当前客户端
	for msg := range msgChan {
		if err := stream.Send(msg); err != nil {
			log.Printf("发送错误: %v", err)
			s.leaveRoom(roomID, msgChan)
			return err
		}
	}

	return nil
}

func (s *chatService) joinRoom(roomID string, ch chan *pb.ChatMessage) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rooms[roomID] = append(s.rooms[roomID], ch)
}

func (s *chatService) leaveRoom(roomID string, ch chan *pb.ChatMessage) {
	s.mu.Lock()
	defer s.mu.Unlock()
	members := s.rooms[roomID]
	for i, member := range members {
		if member == ch {
			s.rooms[roomID] = append(members[:i], members[i+1:]...)
			return
		}
	}
}

func (s *chatService) broadcast(roomID string, msg *pb.ChatMessage, sender chan *pb.ChatMessage) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for _, ch := range s.rooms[roomID] {
		if ch != sender { // 不发给自己
			select {
			case ch <- msg:
			default:
				log.Println("消息队列满,丢弃消息")
			}
		}
	}
}

func main() {
	lis, _ := net.Listen("tcp", ":50051")
	server := grpc.NewServer()
	pb.RegisterChatServiceServer(server, newChatService())
	log.Println("聊天服务器启动于 :50051")
	server.Serve(lis)
}

9.4.3 客户端实现(Go)

package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"time"

	pb "example/pb"
	"google.golang.org/grpc"
)

func main() {
	conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
	defer conn.Close()

	client := pb.NewChatServiceClient(conn)
	stream, err := client.Chat(context.Background())
	if err != nil {
		log.Fatalf("创建聊天流失败: %v", err)
	}

	userID := fmt.Sprintf("user-%d", time.Now().UnixMilli())
	roomID := "general"

	// 启动接收 goroutine
	go func() {
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				log.Println("聊天结束")
				return
			}
			if err != nil {
				log.Printf("接收错误: %v", err)
				return
			}
			fmt.Printf("[%s] %s: %s\n",
				time.UnixMilli(msg.Timestamp).Format("15:04:05"),
				msg.UserId, msg.Content)
		}
	}()

	// 从标准输入读取消息并发送
	scanner := bufio.NewScanner(os.Stdin)
	fmt.Println("输入消息(输入 'quit' 退出):")

	for scanner.Scan() {
		text := scanner.Text()
		if text == "quit" {
			stream.CloseSend()
			break
		}

		err := stream.Send(&pb.ChatMessage{
			UserId:  userID,
			RoomId:  roomID,
			Content: text,
			Type:    pb.ChatMessage_TEXT,
		})
		if err != nil {
			log.Printf("发送失败: %v", err)
		}
	}
}

9.5 错误处理

9.5.1 gRPC 状态码

状态码名称说明
0OK成功
1CANCELLED操作被取消
2UNKNOWN未知错误
3INVALID_ARGUMENT参数无效
4DEADLINE_EXCEEDED超时
5NOT_FOUND资源不存在
6ALREADY_EXISTS资源已存在
7PERMISSION_DENIED权限不足
8RESOURCE_EXHAUSTED资源耗尽
9FAILED_PRECONDITION前置条件不满足
10ABORTED操作中止
11OUT_OF_RANGE超出范围
12UNIMPLEMENTED未实现
13INTERNAL内部错误
14UNAVAILABLE服务不可用
15DATA_LOSS数据丢失
16UNAUTHENTICATED未认证

9.5.2 流式 RPC 的错误处理

// 服务端:返回结构化错误
func (s *chatService) Chat(stream pb.ChatService_ChatServer) error {
	msg, err := stream.Recv()
	if err != nil {
		return err
	}

	// 参数校验
	if msg.RoomId == "" {
		return status.Error(codes.InvalidArgument, "room_id 不能为空")
	}

	// 权限检查
	if !isAuthorized(msg.UserId, msg.RoomId) {
		return status.Error(codes.PermissionDenied, 
			fmt.Sprintf("用户 %s 无权访问房间 %s", msg.UserId, msg.RoomId))
	}

	// 资源限制
	if isRoomFull(msg.RoomId) {
		st := status.New(codes.ResourceExhausted, "房间已满")
		// 添加详细信息
		ds, _ := st.WithDetails(&errdetails.QuotaFailure{
			Violations: []*errdetails.QuotaFailure_Violation{
				{Subject: msg.RoomId, Description: "最大成员数: 100"},
			},
		})
		return ds.Err()
	}

	// 正常处理...
	return nil
}

// 客户端:处理错误
func receiveMessages(stream pb.ChatService_ChatClient) {
	for {
		msg, err := stream.Recv()
		if err != nil {
			st, ok := status.FromError(err)
			if ok {
				switch st.Code() {
				case codes.Canceled:
					log.Println("聊天被取消")
				case codes.Unavailable:
					log.Println("服务不可用,尝试重连...")
					time.Sleep(5 * time.Second)
					// 重连逻辑...
				case codes.PermissionDenied:
					log.Printf("权限错误: %s", st.Message())
				default:
					log.Printf("错误 [%s]: %s", st.Code(), st.Message())
				}
			} else {
				log.Printf("非 gRPC 错误: %v", err)
			}
			return
		}
		// 处理消息...
		_ = msg
	}
}

9.5.3 流超时与取消

// 设置流超时
func listUsersWithTimeout(client pb.NotificationServiceClient) error {
	// 设置 10 秒超时
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
		Filter: "active",
	})
	if err != nil {
		return err
	}

	for {
		user, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			// 检查是否是超时
			if ctx.Err() == context.DeadlineExceeded {
				log.Println("列表获取超时")
				return ctx.Err()
			}
			return err
		}
		processUser(user)
	}
}

// 客户端主动取消
func cancelableStream(client pb.NotificationServiceClient) {
	ctx, cancel := context.WithCancel(context.Background())

	// 启动流
	stream, _ := client.Subscribe(ctx, &pb.SubscribeRequest{Topic: "events"})

	// 某些条件满足时取消
	go func() {
		time.Sleep(30 * time.Second)
		log.Println("超时,取消订阅")
		cancel()
	}()

	for {
		notif, err := stream.Recv()
		if err != nil {
			log.Printf("流结束: %v", err)
			return
		}
		processNotification(notif)
	}
}

9.6 业务场景:实时日志收集系统

架构:
┌──────────┐   客户端流    ┌──────────┐   服务端流    ┌──────────┐
│ 应用服务  │ ──────────→ │ 日志网关  │ ──────────→ │ 分析引擎  │
│ (多实例)  │  批量发送    │ (聚合)    │  流式转发    │ (处理)    │
└──────────┘             └──────────┘             └──────────┘

Proto 定义:
service LogCollector {
  // 应用 → 网关:客户端流式上传日志
  rpc UploadLogs(stream LogEntry) returns (UploadStatus);
  
  // 网关 → 分析引擎:服务端流式推送
  rpc StreamLogs(StreamRequest) returns (stream LogBatch);
}

9.7 注意事项

⚠️ 流的生命周期管理

  • 始终检查流的错误(Recv() 返回的 io.EOF 和其他错误)
  • 及时关闭流,避免资源泄漏
  • 处理上下文取消(stream.Context().Done()

⚠️ 并发安全

  • Send()Recv() 可以并发调用
  • 但同方向的多个 Send() 不能并发
  • 需要自行实现互斥锁

⚠️ 消息大小限制

  • gRPC 默认最大消息大小为 4MB
  • 流式传输时每个消息仍受此限制
  • 使用 grpc.MaxRecvMsgSize() 调整

💡 性能优化

  • 流式 RPC 比多次一元 RPC 更高效(复用连接、减少握手)
  • 合理设置缓冲区大小
  • 使用批量消息减少 RPC 调用次数

9.8 扩展阅读


第 08 章 - gRPC 基础 | 第 10 章 - gRPC 高级特性