HTTP/2 与 RPC 精讲教程 / 13 - Connect 协议
第 13 章:Connect 协议
兼容 gRPC、拥抱 Web、curl 友好的新一代 RPC 协议
13.1 Connect 概述
Connect 是 Buf(Protocol Buffers 工具链公司)于 2022 年推出的 RPC 协议。它在保持与 gRPC 完全兼容的基础上,解决了 gRPC 在 Web 环境中的诸多痛点。
13.1.1 为什么需要 Connect
gRPC 的痛点:
1. 浏览器不原生支持
- 需要 gRPC-Web 代理(Envoy/进程内代理)
- 增加架构复杂度
2. curl 无法调用
- 二进制协议,无法用 curl 测试
- 需要专门的 grpcurl 工具
3. HTTP/1.1 不支持
- 必须 HTTP/2
- 某些代理/CDN 不完全支持 HTTP/2
4. 错误细节有限
- 只有状态码 + 消息
- 需要额外的 Details 机制
Connect 的解决方案:
✓ 原生支持浏览器(无需代理)
✓ curl 可以直接调用
✓ 支持 HTTP/1.1 和 HTTP/2
✓ 与 gRPC 完全互操作
13.1.2 Connect 协议模式
Connect 支持三种协议模式:
| 模式 | 传输 | 编码 | 适用场景 |
|---|---|---|---|
| Connect 协议 | HTTP/1.1 或 HTTP/2 | JSON 或 Protobuf | Web / 通用 |
| gRPC 协议 | HTTP/2 | Protobuf | 与 gRPC 完全互操作 |
| gRPC-Web 协议 | HTTP/1.1 或 HTTP/2 | Protobuf | gRPC-Web 兼容 |
13.2 Connect 协议详解
13.2.1 请求格式(Unary)
Connect 协议的 Unary 请求:
POST /example.UserService/GetUser HTTP/1.1
Content-Type: application/json ← JSON 编码(curl 友好!)
Connect-Protocol-Version: 1
{"id": 1}
--- 或者使用 Protobuf ---
POST /example.UserService/GetUser HTTP/1.1
Content-Type: application/proto
Connect-Protocol-Version: 1
<二进制 Protobuf 数据>
--- 响应格式 ---
HTTP/1.1 200 OK
Content-Type: application/json
{"user": {"id": 1, "name": "Alice"}}
--- 错误响应 ---
HTTP/1.1 404 Not Found
Content-Type: application/json
{"code": "not_found", "message": "用户 1 不存在"}
13.2.2 流式请求格式
Connect 协议的 Stream 请求:
POST /example.UserService/WatchUsers HTTP/1.1
Content-Type: application/connect+json ← 注意前缀
Connect-Protocol-Version: 1
Trailer: Connect-Timeout-Ms
<流式消息序列:每个消息带 4 字节前缀长度>
消息格式(流式):
┌──────────────────┐
│ Flags (1 byte) │ 0x00 = 数据, 0x02 = 结束
│ Length (3 bytes) │ 消息长度(大端序)
│ Data (变长) │ 消息内容
└──────────────────┘
13.3 与 gRPC 的对比
| 特性 | gRPC | Connect |
|---|---|---|
| 浏览器原生支持 | ❌ 需要代理 | ✅ 原生支持 |
| curl 调用 | ❌ 二进制协议 | ✅ JSON 可选 |
| HTTP/1.1 支持 | ❌ | ✅ |
| JSON 编码 | ❌ | ✅ |
| 流式 RPC | ✅ | ✅ |
| Protobuf 支持 | ✅ | ✅ |
| 拦截器 | ✅ | ✅ |
| 错误详情 | 状态码 + Details | 状态码 + 结构化 JSON |
| 代码生成 | protoc + 插件 | protoc + 插件(同一 .proto) |
| 互操作 | - | ✅ 与 gRPC 服务器互操作 |
| Go 实现 | grpc-go | connect-go |
| TypeScript 实现 | @grpc/grpc-js | @connectrpc/connect |
13.3.1 性能对比
测试环境:Go 1.21, 4核 CPU, 10,000 请求
┌──────────────────┬──────────┬──────────┬──────────┐
│ 指标 │ gRPC │ Connect │ 差异 │
├──────────────────┼──────────┼──────────┼──────────┤
│ Unary P50 │ 0.3ms │ 0.32ms │ +7% │
│ Unary P99 │ 0.8ms │ 0.85ms │ +6% │
│ 流式吞吐 │ 85k QPS │ 82k QPS │ -3.5% │
│ Protobuf 大小 │ 320B │ 320B │ 相同 │
│ JSON 大小 │ N/A │ 1024B │ 3.2x │
│ 内存占用 │ 基准 │ +5% │ 略高 │
└──────────────────┴──────────┴──────────┴──────────┘
结论:Protobuf 模式下性能几乎相同,差异可忽略
13.4 Connect 实战
13.4.1 安装与项目设置
# 安装 buf 工具
# https://buf.build/docs/installation
go install github.com/bufbuild/buf/cmd/buf@latest
# 安装 connect-go
go get connectrpc.com/connect
# 安装 protoc-gen-connect-go
go install connectrpc.com/connect/cmd/protoc-gen-connect-go@latest
# buf.yaml
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULT
# buf.gen.yaml
version: v1
plugins:
- plugin: buf.build/protocolbuffers/go
out: gen
opt: paths=source_relative
- plugin: buf.build/connectrpc/go
out: gen
opt: paths=source_relative
13.4.2 服务定义(Proto)
// user/v1/user.proto
syntax = "proto3";
package user.v1;
message User {
int64 id = 1;
string name = 2;
string email = 3;
}
message GetUserRequest {
int64 id = 1;
}
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
}
message ListUsersResponse {
repeated User users = 1;
string next_page_token = 2;
}
// user/v1/user_service.proto
syntax = "proto3";
package user.v1;
import "user/v1/user.proto";
service UserService {
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc ListUsers(ListUsersRequest) returns (stream User);
}
# 生成代码
buf generate
13.4.3 服务端实现(Go)
package main
import (
"context"
"fmt"
"log"
"net/http"
userv1 "example/gen/user/v1"
"example/gen/user/v1/userv1connect"
"connectrpc.com/connect"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
type userServer struct {
userv1connect.UnimplementedUserServiceHandler
users map[int64]*userv1.User
}
func newUserServer() *userServer {
return &userServer{
users: map[int64]*userv1.User{
1: {Id: 1, Name: "Alice", Email: "[email protected]"},
2: {Id: 2, Name: "Bob", Email: "[email protected]"},
},
}
}
func (s *userServer) GetUser(
ctx context.Context,
req *connect.Request[userv1.GetUserRequest],
) (*connect.Response[userv1.GetUserResponse], error) {
user, ok := s.users[req.Msg.Id]
if !ok {
return nil, connect.NewError(connect.CodeNotFound,
fmt.Errorf("用户 %d 不存在", req.Msg.Id))
}
resp := connect.NewResponse(&userv1.GetUserResponse{User: user})
resp.Header().Set("X-Request-Id", "req-123")
return resp, nil
}
func (s *userServer) ListUsers(
ctx context.Context,
req *connect.Request[userv1.ListUsersRequest],
stream *connect.ServerStream[userv1.User],
) error {
for _, user := range s.users {
if err := stream.Send(user); err != nil {
return err
}
}
return nil
}
func main() {
server := newUserServer()
mux := http.NewServeMux()
// 注册 Connect 服务
path, handler := userv1connect.NewUserServiceHandler(server)
mux.Handle(path, handler)
// 同时支持 HTTP/2 和 HTTP/1.1(h2c 用于开发,生产环境用 TLS)
addr := ":8080"
log.Printf("Connect 服务器启动于 %s", addr)
log.Printf(" Connect 协议: POST http://%s/user.v1.UserService/GetUser", addr)
log.Printf(" gRPC 协议: POST http://%s/user.v1.UserService/GetUser (gRPC headers)", addr)
http.ListenAndServe(addr, h2c.NewHandler(mux, &http2.Server{}))
}
13.4.4 客户端实现(Go)
package main
import (
"context"
"fmt"
"log"
"net/http"
userv1 "example/gen/user/v1"
"example/gen/user/v1/userv1connect"
"connectrpc.com/connect"
)
func main() {
// 创建 Connect 客户端
client := userv1connect.NewUserServiceClient(
http.DefaultClient,
"http://localhost:8080",
)
// Unary 调用
resp, err := client.GetUser(
context.Background(),
connect.NewRequest(&userv1.GetUserRequest{Id: 1}),
)
if err != nil {
log.Printf("调用失败: %v", err)
if connectErr, ok := err.(*connect.Error); ok {
log.Printf("错误码: %s, 消息: %s", connectErr.Code(), connectErr.Message())
}
return
}
fmt.Printf("用户: %s (%s)\n", resp.Msg.User.Name, resp.Msg.User.Email)
// 流式调用
stream, err := client.ListUsers(
context.Background(),
connect.NewRequest(&userv1.ListUsersRequest{PageSize: 100}),
)
if err != nil {
log.Fatalf("流式调用失败: %v", err)
}
for stream.Receive() {
user := stream.Msg()
fmt.Printf(" 流式接收: %s\n", user.Name)
}
if err := stream.Err(); err != nil {
log.Fatalf("流错误: %v", err)
}
}
13.4.5 浏览器/TypeScript 客户端
// 使用 @connectrpc/connect-web
import { createClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";
import { UserService } from "./gen/user/v1/user_service_pb";
// 创建传输层(Connect 协议,支持 HTTP/1.1)
const transport = createConnectTransport({
baseUrl: "http://localhost:8080",
});
// 创建客户端
const client = createClient(UserService, transport);
// 调用
async function getUser(id: bigint) {
try {
const response = await client.getUser({ id });
console.log(`用户: ${response.user?.name}`);
} catch (err) {
console.error("调用失败:", err);
}
}
// 流式调用(服务端流)
async function listUsers() {
for await (const user of client.listUsers({ pageSize: 100 })) {
console.log(`用户: ${user.name}`);
}
}
13.4.6 curl 直接调用
# Connect 协议 + JSON 编码(curl 友好!)
curl \
--header "Content-Type: application/json" \
--data '{"id": 1}' \
http://localhost:8080/user.v1.UserService/GetUser
# 输出:
# {"user": {"id": "1", "name": "Alice", "email": "[email protected]"}}
# 对比 gRPC(需要 grpcurl)
# grpcurl -plaintext -d '{"id": 1}' localhost:50051 user.v1.UserService/GetUser
13.5 错误处理
13.5.1 Connect 错误格式
{
"code": "not_found",
"message": "用户 123 不存在",
"details": [
{
"type": "google.rpc.ErrorInfo",
"value": "eyJkb21haW4iOiJ1c2VyIn0=",
"debug": {
"reason": "USER_NOT_FOUND",
"domain": "user-service",
"metadata": {"user_id": "123"}
}
}
]
}
13.5.2 错误码对照
| Connect Code | HTTP 状态码 | gRPC Code |
|---|---|---|
canceled | 408 | CANCELLED |
unknown | 500 | UNKNOWN |
invalid_argument | 400 | INVALID_ARGUMENT |
deadline_exceeded | 504 | DEADLINE_EXCEEDED |
not_found | 404 | NOT_FOUND |
already_exists | 409 | ALREADY_EXISTS |
permission_denied | 403 | PERMISSION_DENIED |
resource_exhausted | 429 | RESOURCE_EXHAUSTED |
failed_precondition | 400 | FAILED_PRECONDITION |
aborted | 409 | ABORTED |
out_of_range | 400 | OUT_OF_RANGE |
unimplemented | 501 | UNIMPLEMENTED |
internal | 500 | INTERNAL |
unavailable | 503 | UNAVAILABLE |
data_loss | 500 | DATA_LOSS |
unauthenticated | 401 | UNAUTHENTICATED |
13.6 拦截器
package main
import (
"context"
"log"
"time"
"connectrpc.com/connect"
)
// 日志拦截器
func loggingInterceptor() connect.UnaryInterceptorFunc {
return func(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
start := time.Now()
log.Printf("[Connect] %s %s", req.Spec().Procedure, "START")
resp, err := next(ctx, req)
log.Printf("[Connect] %s duration=%v err=%v",
req.Spec().Procedure, time.Since(start), err)
return resp, err
}
}
}
// 认证拦截器
func authInterceptor() connect.UnaryInterceptorFunc {
return func(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
// 从 HTTP Header 获取 token
token := req.Header().Get("Authorization")
if token == "" {
return nil, connect.NewError(connect.CodeUnauthenticated,
fmt.Errorf("缺少认证信息"))
}
// 验证 token
if err := validateToken(token); err != nil {
return nil, connect.NewError(connect.CodeUnauthenticated, err)
}
return next(ctx, req)
}
}
}
// 使用拦截器
func main() {
mux := http.NewServeMux()
// 服务端拦截器
path, handler := userv1connect.NewUserServiceHandler(
newUserServer(),
connect.WithInterceptors(loggingInterceptor(), authInterceptor()),
)
mux.Handle(path, handler)
// 客户端拦截器
client := userv1connect.NewUserServiceClient(
http.DefaultClient,
"http://localhost:8080",
connect.WithInterceptors(loggingInterceptor()),
)
_ = client
}
13.7 何时选择 Connect
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 新项目,需要 Web 支持 | Connect | 原生浏览器支持 |
| 新项目,纯后端微服务 | gRPC 或 Connect | 性能相近,gRPC 生态更成熟 |
| 已有 gRPC 服务,需加 Web | Connect | 兼容现有服务 |
| 需要 curl 调试 | Connect | JSON + HTTP/1.1 |
| 追求最大生态兼容 | gRPC | 社区最大 |
13.8 注意事项
⚠️ Connect 的局限:
- 相对较新(2022 年),生态不如 gRPC 成熟
- 语言支持目前主要是 Go 和 TypeScript
- 某些 gRPC 高级功能可能不完全支持
⚠️ 协议协商:
- 服务器需同时支持多种协议(Connect / gRPC / gRPC-Web)
- 通过 Content-Type 和 Connect-Protocol-Version 头区分协议
💡 最佳实践:
- 新项目优先考虑 Connect
- 使用 Buf 工具链管理 Protobuf
- 前后端分离项目使用 Connect 的 JSON 模式
- 微服务间使用 Connect 的 Protobuf 模式