强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

OpenResty 高性能网关开发教程 / 第 13 章 - 微服务网关架构

第 13 章 - 微服务网关架构

13.1 微服务网关全景

                    ┌──────────────────────────────────┐
                    │           API Gateway            │
                    │  ┌────────┬────────┬──────────┐  │
                    │  │ 路由   │ 认证   │ 限流     │  │
                    │  │ 服务发现│ 配置中心│ 链路追踪 │  │
                    │  └────────┴────────┴──────────┘  │
                    └──────────────┬───────────────────┘
                                  │
            ┌─────────────────────┼─────────────────────┐
            │                     │                     │
    ┌───────▼───────┐   ┌────────▼────────┐   ┌───────▼───────┐
    │  用户服务     │   │   订单服务      │   │   商品服务    │
    │  (user-svc)   │   │  (order-svc)    │   │ (product-svc) │
    └───────────────┘   └─────────────────┘   └───────────────┘
            │                     │                     │
    ┌───────▼───────┐   ┌────────▼────────┐   ┌───────▼───────┐
    │  Consul/Nacos │   │   Jaeger        │   │  Prometheus   │
    │  服务发现     │   │   链路追踪      │   │  监控告警     │
    └───────────────┘   └─────────────────┘   └───────────────┘

13.2 服务发现

13.2.1 Consul 集成

Consul 是 HashiCorp 开发的服务发现和配置管理工具。

-- /usr/local/openresty/lua/discovery/consul.lua
local _M = {}

local http = require "resty.http"
local cjson = require "cjson"

-- Module-level settings for the Consul client.
local config = {
    -- Base URL of the Consul HTTP API; read from CONSUL_ADDR when it is set.
    consul_addr = os.getenv("CONSUL_ADDR") or "http://127.0.0.1:8500",
    cache_ttl = 10,       -- TTL of a cached service list, in seconds
    watch_interval = 5,   -- period handed to ngx.timer.every by start_watcher, in seconds
}

-- Shared-memory dictionary that holds the cached service instance lists
local service_cache = ngx.shared.service_cache

--- Fetch the healthy instances of a service from Consul.
-- Results are cached in the shared dict for `config.cache_ttl` seconds. A
-- second, longer-lived copy ("stale:" prefix) is kept so the gateway can keep
-- routing when Consul cannot be reached or answers with garbage.
-- @tparam string service_name Consul service name
-- @tparam[opt] string tag only return instances carrying this tag
-- @treturn[1] table array of {id, address, port, tags, meta, checks}
-- @treturn[2] nil
-- @treturn[2] string error description (no fresh data and no stale copy)
function _M.get_service(service_name, tag)
    local cache_key = "svc:" .. service_name .. ":" .. (tag or "")
    local stale_key = "stale:" .. cache_key

    -- Fresh cache hit: no network round trip needed.
    local cached = service_cache:get(cache_key)
    if cached then
        return cjson.decode(cached)
    end

    -- Degraded path shared by every failure mode below: log the reason and
    -- serve the last known good list if one is still around.
    local function fallback(reason)
        ngx.log(ngx.ERR, "Consul request failed: ", reason)
        local stale = service_cache:get(stale_key)
        if stale then
            return cjson.decode(stale)
        end
        return nil, reason
    end

    -- passing=true asks Consul for instances whose health checks pass.
    local url = config.consul_addr .. "/v1/health/service/"
        .. ngx.escape_uri(service_name) .. "?passing=true"
    if tag then
        url = url .. "&tag=" .. ngx.escape_uri(tag)
    end

    local httpc = http.new()
    httpc:set_timeout(3000)

    local res, err = httpc:request_uri(url, {method = "GET"})
    if not res then
        return fallback(err)
    end
    -- An error page is not a service list; never try to decode it.
    if res.status ~= 200 then
        return fallback("unexpected HTTP status " .. tostring(res.status))
    end

    local ok, entries = pcall(cjson.decode, res.body)
    if not ok or type(entries) ~= "table" then
        return fallback("invalid JSON in Consul response")
    end

    local instances = {}
    for _, entry in ipairs(entries) do
        local svc = entry.Service
        -- An empty string is truthy in Lua, so `svc.Address or ...` would
        -- never fall back to the node address; test for it explicitly.
        local address = svc.Address
        if type(address) ~= "string" or address == "" then
            address = entry.Node.Address
        end
        instances[#instances + 1] = {
            id = svc.ID,
            address = address,
            port = svc.Port,
            -- JSON null decodes to cjson.null (truthy userdata), hence the type check.
            tags = type(svc.Tags) == "table" and svc.Tags or {},
            meta = type(svc.Meta) == "table" and svc.Meta or {},
            -- Health check details as returned by Consul
            checks = entry.Checks,
        }
    end

    -- Encode once, store twice: the short-lived fresh entry and the
    -- longer-lived copy used by the degraded path.
    local encoded = cjson.encode(instances)
    service_cache:set(cache_key, encoded, config.cache_ttl)
    service_cache:set(stale_key, encoded, config.cache_ttl * 10)

    return instances
end

--- Pick one instance of a service (load balancing).
-- @tparam string service_name Consul service name
-- @tparam[opt="round_robin"] string strategy "round_robin", "random" or
--   "consistent_hash"; any other value selects the first instance
-- @treturn[1] table the chosen instance
-- @treturn[2] nil
-- @treturn[2] string error message when no instance is available
function _M.select_instance(service_name, strategy)
    local instances = _M.get_service(service_name)
    if not instances or #instances == 0 then
        return nil, "No available instances for " .. service_name
    end

    local count = #instances
    strategy = strategy or "round_robin"

    if strategy == "round_robin" then
        -- The counter lives in the shared dict so all workers share one sequence.
        local counter, err = service_cache:incr("counter:" .. service_name, 1, 0)
        if not counter then
            -- incr returns nil, err on failure (e.g. no memory left in the
            -- dict); degrade to a random pick instead of raising on nil arithmetic.
            ngx.log(ngx.WARN, "round-robin counter unavailable for ",
                service_name, ": ", err)
            return instances[math.random(count)]
        end
        return instances[(counter % count) + 1]

    elseif strategy == "random" then
        return instances[math.random(count)]

    elseif strategy == "consistent_hash" then
        -- Hash the user id header when present, otherwise the client address.
        -- Note: this is a hash-modulo mapping, so a change in the number of
        -- instances remaps most keys.
        local key = ngx.var.http_x_user_id or ngx.var.remote_addr or ""
        local hash = 5381
        for i = 1, #key do
            hash = ((hash * 33) + string.byte(key, i)) % 2147483647
        end
        return instances[(hash % count) + 1]
    end

    -- Unknown strategy: fall back to the first instance.
    return instances[1]
end

--- Register this service with the local Consul agent.
-- @tparam string name service name
-- @tparam string id unique service instance id
-- @tparam string address address the service listens on
-- @tparam number port port the service listens on
-- @tparam[opt] table health_check Consul check definition; defaults to an
--   HTTP check against http://address:port/health every 10s with a 3s timeout
-- @treturn[1] boolean true when Consul answered 200
-- @treturn[2] nil|boolean nil on transport failure, false on a non-200 answer
-- @treturn[2] string error description
function _M.register(name, id, address, port, health_check)
    local default_check = {
        HTTP = "http://" .. address .. ":" .. port .. "/health",
        Interval = "10s",
        Timeout = "3s",
    }

    local body = cjson.encode({
        ID = id,
        Name = name,
        Address = address,
        Port = port,
        Check = health_check or default_check,
    })

    local httpc = http.new()
    httpc:set_timeout(3000)

    local res, err = httpc:request_uri(
        config.consul_addr .. "/v1/agent/service/register",
        {
            method = "PUT",
            body = body,
            headers = {["Content-Type"] = "application/json"},
        }
    )

    if not res then
        return nil, err
    end
    if res.status ~= 200 then
        -- Give the caller something to log instead of a bare `false`.
        return false, "Consul register failed with HTTP status "
            .. tostring(res.status) .. ": " .. tostring(res.body)
    end
    return true
end

--- Start a background timer that periodically refreshes service lists.
-- Each tick calls get_service for every listed name, which repopulates the
-- shared cache once the cached entry has expired.
-- @tparam table services array of service names to keep warm
-- @treturn[1] boolean true when the timer was created
-- @treturn[2] nil
-- @treturn[2] string error from ngx.timer.every
function _M.start_watcher(services)
    local function refresh(premature)
        -- premature is set when the worker is shutting down.
        if premature then return end

        for _, svc_name in ipairs(services) do
            -- Isolate each refresh so an error raised for one service does
            -- not abort the loop and skip the remaining ones.
            local ok, failure = pcall(_M.get_service, svc_name)
            if not ok then
                ngx.log(ngx.ERR, "Service refresh failed for ", svc_name,
                    ": ", failure)
            end
        end
    end

    local ok, err = ngx.timer.every(config.watch_interval, refresh)
    if not ok then
        ngx.log(ngx.ERR, "Failed to start service watcher: ", err)
        return nil, err
    end
    return true
end

return _M

nginx 配置

# Shared-memory zone read by discovery.consul as ngx.shared.service_cache
lua_shared_dict service_cache 20m;

init_worker_by_lua_block {
    local discovery = require "discovery.consul"
    -- Start the periodic service-discovery refresh for these services
    discovery.start_watcher({
        "user-service",
        "order-service",
        "product-service",
    })
}

server {
    listen 8080;

    location /api/users {
        # The variable must be declared with `set` before Lua can assign to it
        # and before proxy_pass can reference it; without this line nginx
        # rejects the configuration.
        set $backend_addr '';

        access_by_lua_block {
            -- Resolve a backend through service discovery for every request.
            local discovery = require "discovery.consul"
            local instance, err = discovery.select_instance("user-service")
            if not instance then
                ngx.log(ngx.ERR, "user-service unavailable: ", err)
                ngx.status = 503
                ngx.header["Content-Type"] = "application/json"
                ngx.say('{"error":"Service unavailable"}')
                return ngx.exit(503)
            end
            ngx.var.backend_addr = instance.address .. ":" .. instance.port
        }

        # NOTE(review): if the discovered address is a hostname rather than an
        # IP, a `resolver` directive is presumably needed here - confirm.
        proxy_pass http://$backend_addr;
    }
}

13.3 Nacos 集成

Nacos 是阿里巴巴开源的服务发现和配置管理平台,在国内广泛使用。

-- /usr/local/openresty/lua/discovery/nacos.lua
local _M = {}

local http = require "resty.http"
local cjson = require "cjson"

-- Connection settings for the Nacos server.
local config = {
    -- Base URL of the Nacos server; read from NACOS_ADDR when it is set.
    nacos_addr = os.getenv("NACOS_ADDR") or "http://127.0.0.1:8848",
    -- Sent as namespaceId (instance list) and tenant (config); read from
    -- NACOS_NAMESPACE when it is set.
    namespace = os.getenv("NACOS_NAMESPACE") or "public",
    -- Sent as groupName, and used as the default group by get_config.
    group = "DEFAULT_GROUP",
}

--- List the healthy instances of a service registered in Nacos.
-- @tparam string service_name Nacos service name
-- @treturn[1] table array of {address, port, healthy, weight, metadata}
-- @treturn[2] nil
-- @treturn[2] string error description
function _M.get_instances(service_name)
    -- Escape every value: names and namespaces may contain characters that
    -- are not valid inside a query string.
    local url = config.nacos_addr
        .. "/nacos/v1/ns/instance/list"
        .. "?serviceName=" .. ngx.escape_uri(service_name)
        .. "&namespaceId=" .. ngx.escape_uri(config.namespace)
        .. "&groupName=" .. ngx.escape_uri(config.group)
        .. "&healthyOnly=true"

    local httpc = http.new()
    httpc:set_timeout(3000)

    local res, err = httpc:request_uri(url, {method = "GET"})
    if not res then
        return nil, err
    end
    -- Only a 200 response carries an instance list worth decoding.
    if res.status ~= 200 then
        return nil, "Nacos returned HTTP status " .. tostring(res.status)
    end

    local ok, data = pcall(cjson.decode, res.body)
    if not ok or type(data) ~= "table" then
        return nil, "invalid JSON in Nacos response"
    end

    -- A missing or null `hosts` field is treated as "no instances".
    local hosts = type(data.hosts) == "table" and data.hosts or {}
    local instances = {}

    for _, host in ipairs(hosts) do
        instances[#instances + 1] = {
            address = host.ip,
            port = host.port,
            healthy = host.healthy,
            weight = host.weight or 100,
            metadata = host.metadata or {},
        }
    end

    return instances
end

--- Fetch a configuration item from Nacos.
-- @tparam string data_id Nacos dataId
-- @tparam[opt] string group config group; defaults to config.group
-- @treturn[1] string raw configuration content
-- @treturn[2] nil
-- @treturn[2] string error description
function _M.get_config(data_id, group)
    -- Escape every value so ids containing reserved characters cannot
    -- corrupt the query string.
    local url = config.nacos_addr
        .. "/nacos/v1/cs/configs"
        .. "?dataId=" .. ngx.escape_uri(data_id)
        .. "&group=" .. ngx.escape_uri(group or config.group)
        .. "&tenant=" .. ngx.escape_uri(config.namespace)

    local httpc = http.new()
    httpc:set_timeout(3000)

    local res, err = httpc:request_uri(url, {method = "GET"})
    if not res then
        return nil, err or "Config not found"
    end
    if res.status == 404 then
        return nil, "Config not found"
    end
    if res.status ~= 200 then
        -- Distinguish server-side trouble from a genuinely missing item.
        return nil, "Config request failed with HTTP status "
            .. tostring(res.status)
    end

    return res.body
end

--- Watch a Nacos configuration item by polling it every 5 seconds.
-- The callback is invoked with the current content after every successful
-- fetch (not only when the content changed).
-- @tparam string data_id Nacos dataId
-- @tparam[opt] string group config group
-- @tparam function callback called as callback(content)
-- @treturn[1] boolean|userdata truthy when the first poll was scheduled
-- @treturn[2] nil|boolean falsy when scheduling failed
-- @treturn[2] string error from ngx.timer.at
function _M.watch_config(data_id, group, callback)
    local poll

    -- Schedule the next poll and report a failure instead of silently
    -- ending the watch.
    local function schedule(delay)
        local ok, err = ngx.timer.at(delay, poll)
        if not ok then
            ngx.log(ngx.ERR, "Failed to schedule config poll for ",
                data_id, ": ", err)
        end
        return ok, err
    end

    poll = function(premature)
        -- premature is set when the worker is shutting down.
        if premature then return end

        local config_content = _M.get_config(data_id, group)
        if config_content then
            -- A throwing callback must not prevent the next poll from being
            -- scheduled, otherwise the watch would stop for good.
            local ok, cb_err = pcall(callback, config_content)
            if not ok then
                ngx.log(ngx.ERR, "Config watch callback failed for ",
                    data_id, ": ", cb_err)
            end
        end

        -- Keep polling
        schedule(5)
    end

    return schedule(0)
end

return _M

13.4 配置中心集成

-- /usr/local/openresty/lua/config_center.lua
local _M = {}

local cjson = require "cjson"

-- Configuration sources, keyed by name. Intended priority:
-- environment variable > config center > local file.
-- Each source takes a key and returns a string value, or nil when the key is
-- not present in that source. Sources may raise; callers wrap them in pcall.
local config_sources = {
    -- Environment variable GW_<KEY>, with dots in the key mapped to underscores.
    env = function(key)
        -- The parentheses drop gsub's second return value (the match count).
        local name = "GW_" .. (key:upper():gsub("%.", "_"))
        return os.getenv(name)
    end,

    -- Worker-shared cache populated through this module's set().
    shared = function(key)
        return ngx.shared.gateway_config:get("config:" .. key)
    end,

    -- Consul KV store, under the gateway/ prefix.
    consul = function(key)
        local resty_http = require "resty.http"
        -- Honour CONSUL_ADDR like the discovery module does instead of
        -- always talking to localhost.
        local addr = os.getenv("CONSUL_ADDR") or "http://127.0.0.1:8500"
        local httpc = resty_http.new()
        httpc:set_timeout(3000)
        -- ?raw makes Consul return the bare value instead of a JSON envelope.
        local res = httpc:request_uri(addr .. "/v1/kv/gateway/" .. key .. "?raw")
        if res and res.status == 200 then
            return res.body
        end
        return nil
    end,

    -- Local JSON file; dotted keys address nested objects.
    file = function(key)
        local f = io.open("/etc/openresty/config.json", "r")
        if not f then
            return nil
        end
        local content = f:read("*all")
        f:close()

        local value = cjson.decode(content)
        for segment in key:gmatch("[^%.]+") do
            if type(value) ~= "table" then
                return nil
            end
            value = value[segment]
        end

        -- A missing leaf (or JSON null) must yield nil; tostring(nil) would
        -- otherwise hand the caller the literal string "nil".
        if value == nil or value == cjson.null then
            return nil
        end
        if type(value) == "table" then
            return cjson.encode(value)
        end
        return tostring(value)
    end,
}

-- pairs() visits keys in an unspecified order, so the priority between
-- sources has to be spelled out explicitly.
local LOOKUP_ORDER = {"env", "shared", "consul", "file"}
-- reload() must bypass the shared cache, otherwise it would only ever read
-- back the value it is supposed to refresh.
local RELOAD_ORDER = {"env", "consul", "file"}

-- Return the first non-nil value for `key`, trying sources in `order`.
-- A source that raises is skipped (best effort).
local function lookup(key, order)
    for _, source_name in ipairs(order) do
        local source_func = config_sources[source_name]
        if source_func then
            local ok, value = pcall(source_func, key)
            if ok and value ~= nil then
                return value
            end
        end
    end
    return nil
end

--- Get a configuration value.
-- @tparam string key configuration key (dots address nested file entries)
-- @param[opt] default returned when no source has the key
-- @return the value from the highest-priority source that has it, else default
function _M.get(key, default)
    local value = lookup(key, LOOKUP_ORDER)
    if value ~= nil then
        return value
    end
    return default
end

--- Get a configuration value and decode it as JSON.
-- @tparam string key configuration key
-- @param[opt] default returned when the key is missing or not valid JSON
-- @return decoded value, or default
function _M.get_json(key, default)
    local value = _M.get(key)
    if value then
        local ok, decoded = pcall(cjson.decode, value)
        if ok then return decoded end
    end
    return default
end

--- Store a configuration value in shared memory (stringified).
-- @tparam string key configuration key
-- @param value value to store; converted with tostring
-- @return the results of ngx.shared.DICT:set (ok, err, forcible)
function _M.set(key, value)
    return ngx.shared.gateway_config:set("config:" .. key, tostring(value))
end

--- Hot-reload the well-known configuration keys into shared memory.
-- Values are read from the authoritative sources (environment, Consul KV,
-- local file), never from the shared cache itself.
function _M.reload()
    local configs = {
        "rate_limit",
        "jwt_secret",
        "upstream_services",
    }

    for _, key in ipairs(configs) do
        local value = lookup(key, RELOAD_ORDER)
        if value ~= nil then
            local ok, err = _M.set(key, value)
            if ok then
                ngx.log(ngx.INFO, "Config reloaded: ", key)
            else
                ngx.log(ngx.ERR, "Config reload failed for ", key, ": ", err)
            end
        end
    end
end

return _M

13.5 链路追踪

13.5.1 分布式追踪原理

客户端请求
    │
    ▼
┌─────────┐    Trace ID: abc123
│  网关    │    Span ID: 001
│          │    Duration: 150ms
└────┬─────┘
     │
     ├──────────────┐
     ▼              ▼
┌─────────┐   ┌─────────┐
│ 用户服务 │   │ 订单服务 │  Span ID: 002   Span ID: 003
│          │   │          │  Parent: 001    Parent: 001
└────┬─────┘   └────┬─────┘  Duration: 50ms Duration: 80ms
     │              │
     ▼              ▼
┌─────────┐   ┌─────────┐
│  数据库  │   │  数据库  │  Span ID: 004   Span ID: 005
│          │   │          │  Parent: 002    Parent: 003
└─────────┘   └─────────┘  Duration: 20ms  Duration: 30ms

13.5.2 OpenResty 链路追踪实现

-- /usr/local/openresty/lua/tracing/tracer.lua
local _M = {}

local cjson = require "cjson"

--- Generate a random lowercase hexadecimal identifier.
-- With no argument the result has 16 hex digits (8 bytes), as before.
-- NOTE(review): math.random presumably needs a per-worker seed
-- (math.randomseed) so that workers do not emit identical sequences - confirm.
-- @tparam[opt=16] number hex_len number of hex digits to produce
-- @treturn string identifier of exactly hex_len hex digits
local function generate_id(hex_len)
    hex_len = hex_len or 16
    local chunks = {}
    -- Draw 16 bits at a time: that range is valid for math.random and %x on
    -- every Lua build, unlike a 0..0xFFFFFFFF interval.
    for i = 1, math.ceil(hex_len / 4) do
        chunks[i] = string.format("%04x", math.random(0, 0xFFFF))
    end
    return table.concat(chunks):sub(1, hex_len)
end

--- Extract an incoming trace context from the request headers.
-- Tries the W3C `traceparent` header first, then Jaeger's `uber-trace-id`.
-- @treturn[1] table {trace_id, parent_span_id, flags}
-- @treturn[2] nil when neither header is present or parseable
local function extract_context()
    local headers = ngx.req.get_headers()

    -- A header sent more than once arrives as an array table; calling
    -- :match on it would raise, so use its first value.
    local function header_value(name)
        local value = headers[name]
        if type(value) == "table" then
            value = value[1]
        end
        if type(value) == "string" then
            return value
        end
        return nil
    end

    -- W3C Trace Context: version-traceid-spanid-flags
    local traceparent = header_value("traceparent")
    if traceparent then
        local _, trace_id, span_id, flags =
            traceparent:match("^(%x%x)%-(%x+)%-(%x+)%-(%x%x)$")
        if trace_id then
            return {
                trace_id = trace_id,
                parent_span_id = span_id,
                flags = flags,
            }
        end
    end

    -- Jaeger format: traceid:spanid:parentspanid:flags
    local uber_trace = header_value("uber-trace-id")
    if uber_trace then
        local trace_id, span_id, _, flags =
            uber_trace:match("^(%x+):(%x+):(%x+):(%x+)$")
        if trace_id then
            return {
                trace_id = trace_id,
                parent_span_id = span_id,
                flags = flags,
            }
        end
    end

    return nil
end

--- Start the gateway span for the current request.
-- Continues an incoming trace when the request carries one, otherwise starts
-- a new trace. The span is stored in ngx.ctx.trace_span and the trace headers
-- are set on the request so they reach the upstream.
-- @tparam string operation_name name recorded on the span
-- @treturn table the new span
function _M.start_span(operation_name)
    local ctx = extract_context()

    -- The traceparent format carries a 32-hex-digit trace id; two 16-digit
    -- ids are concatenated when a new trace is started.
    local trace_id = ctx and ctx.trace_id or (generate_id() .. generate_id())

    -- Keep the caller's sampling decision instead of forcing "sampled".
    local flags = "01"
    if ctx and ctx.flags then
        local numeric = tonumber(ctx.flags, 16)
        if numeric then
            -- Jaeger may send a single digit; traceparent needs exactly two.
            flags = string.format("%02x", numeric % 256)
        end
    end

    local span = {
        trace_id = trace_id,
        span_id = generate_id(),
        parent_span_id = ctx and ctx.parent_span_id or nil,
        operation_name = operation_name,
        start_time = ngx.now() * 1000000,  -- microseconds
        tags = {},
        logs = {},
    }

    -- Keep the span in the per-request context
    ngx.ctx.trace_span = span

    -- Propagate the context to the upstream service
    local traceparent = string.format("00-%s-%s-%s",
        span.trace_id, span.span_id, flags)
    ngx.req.set_header("traceparent", traceparent)
    ngx.req.set_header("X-Trace-ID", span.trace_id)
    ngx.req.set_header("X-Span-ID", span.span_id)

    return span
end

--- Finish the current request's span and hand it off for reporting.
-- Does nothing when no span was started for this request.
function _M.finish_span()
    local span = ngx.ctx.trace_span
    if not span then return end

    span.end_time = ngx.now() * 1000000
    span.duration = span.end_time - span.start_time

    -- Record request/response details on the span
    span.tags["http.method"] = ngx.req.get_method()
    span.tags["http.url"] = ngx.var.uri
    span.tags["http.status_code"] = ngx.status
    span.tags["http.client_ip"] = ngx.var.remote_addr

    -- Cosockets (ngx.socket.udp) are not available in log_by_lua*, which is
    -- where this function is meant to be called from. A zero-delay timer runs
    -- in a context that allows them and makes the report truly asynchronous.
    local ok, err = ngx.timer.at(0, function(premature, finished)
        -- The worker is shutting down: drop the span.
        if premature then return end
        _M.report_span(finished)
    end, span)
    if not ok then
        ngx.log(ngx.ERR, "Failed to schedule span report: ", err)
    end
end

--- Attach a tag to the current request's span.
-- Silently does nothing when no span is active.
-- @tparam string key tag name
-- @param value tag value
function _M.set_tag(key, value)
    local active = ngx.ctx.trace_span
    if not active then
        return
    end
    active.tags[key] = value
end

--- Append a timestamped log entry to the current request's span.
-- Silently does nothing when no span is active.
-- @tparam string key log field name
-- @param value log field value
function _M.log(key, value)
    local active = ngx.ctx.trace_span
    if not active then
        return
    end

    local entry = {
        timestamp = ngx.now() * 1000000,
        key = key,
        value = value,
    }
    table.insert(active.logs, entry)
end

--- Send a finished span to the Jaeger agent over UDP.
-- NOTE(review): the payload is JSON; the Jaeger agent's port 6831 presumably
-- expects Thrift-compact encoded batches - confirm what the receiver accepts.
-- @tparam table span a span as built by start_span / finish_span
-- @treturn[1] boolean true when the datagram was handed to the socket
-- @treturn[2] nil
-- @treturn[2] string error description
function _M.report_span(span)
    local sock = ngx.socket.udp()
    local ok, err = sock:setpeername("jaeger-agent", 6831)
    if not ok then
        ngx.log(ngx.ERR, "Failed to connect to Jaeger agent: ", err)
        return nil, err
    end

    -- Simplified span representation
    local data = cjson.encode({
        traceId = span.trace_id,
        spanId = span.span_id,
        parentSpanId = span.parent_span_id,
        operationName = span.operation_name,
        startTime = span.start_time,
        duration = span.duration,
        tags = span.tags,
        logs = span.logs,
    })

    -- A failed send used to go unnoticed; surface it, and close the socket
    -- on both paths.
    local sent, send_err = sock:send(data)
    sock:close()
    if not sent then
        ngx.log(ngx.ERR, "Failed to send span to Jaeger agent: ", send_err)
        return nil, send_err
    end
    return true
end

return _M

nginx 配置

server {
    listen 8080;

    # Tracing middleware: start a span for every request in the rewrite phase
    rewrite_by_lua_block {
        local tracer = require "tracing.tracer"
        tracer.start_span("gateway_request")
    }

    header_filter_by_lua_block {
        local tracer = require "tracing.tracer"
        -- Return the trace ID to the client
        local span = ngx.ctx.trace_span
        if span then
            ngx.header["X-Trace-ID"] = span.trace_id
        end
    }

    # Finish the span once the request has been served
    log_by_lua_block {
        local tracer = require "tracing.tracer"
        tracer.finish_span()
    }

    location /api/ {
        proxy_pass http://backend;
        # The tracing headers were already set on the request in the rewrite phase
    }
}

13.6 可观测性三支柱

13.6.1 指标(Metrics)

-- Prometheus metrics (introduced in chapter 11),
-- extended here with a per-service dimension.
local function service_metrics()
    local metrics = require "metrics"

    -- Request counter, labelled by service, method and status.
    local request_labels = {
        service = ngx.var.upstream_service or "unknown",
        method = ngx.req.get_method(),
        status = tostring(ngx.status),
    }
    metrics.counter_inc("gateway_service_requests_total", request_labels)

    -- Upstream latency, labelled by service; 0 when no numeric
    -- upstream time is available.
    local elapsed = tonumber(ngx.var.upstream_response_time) or 0
    local latency_labels = {
        service = ngx.var.upstream_service or "unknown",
    }
    metrics.histogram_observe("gateway_service_duration_seconds",
        elapsed, latency_labels)
end

13.6.2 日志(Logging)

-- Structured log entry that can be correlated across services
-- through the trace and span IDs.
local function service_log()
    local cjson = require "cjson"

    local trace_id = ngx.ctx.trace_span and ngx.ctx.trace_span.trace_id or ""
    local span_id = ngx.ctx.trace_span and ngx.ctx.trace_span.span_id or ""

    local log_entry = {
        timestamp = ngx.now(),
        trace_id = trace_id,
        span_id = span_id,
        service = ngx.var.upstream_service or "",
        method = ngx.req.get_method(),
        uri = ngx.var.uri,
        status = ngx.status,
        latency = ngx.var.request_time,
    }
    -- Shipping the entry to ELK or Loki is left to the reader;
    -- nothing is emitted here.
end

13.6.3 追踪(Tracing)

已在 13.5 节详细介绍。

13.7 注意事项

服务缓存一致性:服务发现的缓存 TTL 不宜过长(建议 10-30 秒),避免路由到已下线的实例。

Consul/Nacos 故障:当服务发现或配置中心(Consul/Nacos)不可用时,网关应能使用缓存的服务列表和配置降级运行。

链路采样:高 QPS 场景下全量追踪会产生大量数据,建议使用采样策略(如 10% 请求采样)。

上下文传播:确保追踪上下文(Trace ID、Span ID)在所有异步调用中正确传播。


上一章← 第 12 章 - 安全防护 下一章第 14 章 - Docker 与容器化部署 →