
17 Practical Applications

“Functional programming is not an ivory tower: it is widely used in practical settings such as data processing, compilers, and DSL design.”


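17.1 Data Processing and ETL

17.1.1 A Functional ETL Pipeline

ETL (Extract-Transform-Load) is one of the most natural applications of functional programming: each stage is a function from data to data, so the stages compose directly into a pipeline.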

// ETL pipeline: CSV → clean → transform → aggregate → output
const R = require('ramda');

// Extract: read the CSV
const extract = (csvText) =>
  csvText
    .split('\n')
    .filter(line => line.trim())
    .map(line => line.split(','))
    .slice(1)  // skip the header row
    .map(([date, product, quantity, price]) => ({
      date,
      product: product.trim(),
      quantity: parseInt(quantity, 10),
      price: parseFloat(price),
    }));

// Transform: clean and compute
const transform = R.pipe(
  // Drop invalid records
  R.filter(row => !isNaN(row.quantity) && !isNaN(row.price) && row.quantity > 0),
  // Compute the subtotal
  R.map(row => ({ ...row, subtotal: row.quantity * row.price })),
  // Group by product
  R.groupBy(row => row.product),
  // Aggregate statistics (R.map over an object maps its values)
  R.map(rows => ({
    totalQuantity: R.sum(R.map(R.prop('quantity'), rows)),
    totalRevenue: R.sum(R.map(R.prop('subtotal'), rows)),
    avgPrice: R.mean(R.map(R.prop('price'), rows)),
    orderCount: rows.length,
  }))
);

// Load: format the output
const load = (data) =>
  Object.entries(data).map(([product, stats]) => ({
    product,
    ...stats,
    formattedRevenue: `$${stats.totalRevenue.toFixed(2)}`,
  }));

// The complete pipeline
const etl = R.pipe(extract, transform, load);

// Usage
const csvData = `date,product,quantity,price
2024-01-01,Laptop,2,999.99
2024-01-01,Mouse,5,29.99
2024-01-02,Laptop,1,999.99
2024-01-02,Keyboard,3,79.99`;

console.log(etl(csvData));

Python:

import csv
from io import StringIO
from collections import defaultdict

# Extract
def extract(csv_text):
    reader = csv.DictReader(StringIO(csv_text))
    return [
        {
            'date': row['date'],
            'product': row['product'].strip(),
            'quantity': int(row['quantity']),
            'price': float(row['price']),
        }
        for row in reader
    ]

# Transform
def transform(records):
    # Filter out invalid records
    valid = [r for r in records if r['quantity'] > 0 and r['price'] > 0]
    # Compute subtotals
    enriched = [{**r, 'subtotal': r['quantity'] * r['price']} for r in valid]
    # Group and aggregate
    groups = defaultdict(list)
    for r in enriched:
        groups[r['product']].append(r)

    return {
        product: {
            'total_quantity': sum(r['quantity'] for r in rows),
            'total_revenue': sum(r['subtotal'] for r in rows),
            'avg_price': sum(r['price'] for r in rows) / len(rows),
            'order_count': len(rows),
        }
        for product, rows in groups.items()
    }

# Load
def load(data):
    return [
        {'product': product, **stats, 'formatted_revenue': f"${stats['total_revenue']:.2f}"}
        for product, stats in data.items()
    ]

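# Pipeline (pipe comes from the third-party toolz package)
from toolz import pipe

csv_data = """date,product,quantity,price
2024-01-01,Laptop,2,999.99
2024-01-01,Mouse,5,29.99
2024-01-02,Laptop,1,999.99
2024-01-02,Keyboard,3,79.99"""

result = pipe(csv_data, extract, transform, load)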

17.1.2 Processing Large Data Sets

# Process a large data file with itertools
import itertools
from functools import reduce

def process_large_file(filepath):
    """Lazily process a large log file."""
    def read_lines():
        with open(filepath, 'r') as f:
            for line in f:
                yield line.strip()

    def parse(line):
        parts = line.split(' ', 3)
        return {'timestamp': parts[0], 'level': parts[1],
                'module': parts[2], 'message': parts[3]} if len(parts) >= 4 else None

    def is_error(entry):
        return entry and entry['level'] == 'ERROR'

    # Lazy pipeline
    errors = itertools.islice(
        filter(is_error, map(parse, read_lines())),
        1000  # only process the first 1000 errors
    )

    # Aggregate
    return reduce(
        lambda acc, e: {**acc, e['module']: acc.get(e['module'], 0) + 1},
        errors,
        {}
    )
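
To see why the lazy pipeline above stays cheap, the following minimal sketch counts how many lines are actually read when only the first three errors are requested. The generator numbered_lines and its log format are hypothetical stand-ins for a real file; they are not part of the original example.

```python
import itertools


def numbered_lines(count, consumed):
    """Hypothetical stand-in for a large file: yields log lines and records each read."""
    for i in range(count):
        consumed.append(i)
        level = "ERROR" if i % 2 == 0 else "INFO"
        yield f"2024-01-01T00:00:{i:02d} {level} mod{i % 3} message {i}"


def parse(line):
    parts = line.split(" ", 3)
    return {"level": parts[1], "module": parts[2]} if len(parts) >= 4 else None


consumed = []
errors = itertools.islice(
    filter(lambda e: e and e["level"] == "ERROR",
           map(parse, numbered_lines(1_000_000, consumed))),
    3,  # stop after the first three errors
)
first_three = list(errors)

# Only five of the million lines were read before the pipeline stopped.
print(len(first_three), len(consumed))  # 3 5
```

Because map, filter, and islice all pull items on demand, the source is read only as far as the consumer requires.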

17.2 Parser Combinators

Parser combinators are a classic application of functional programming: small parsers are combined to build larger ones.

17.2.1 Basic Concepts

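Concept     Description
Parser      String → [(a, String)] (input → list of (result, remaining input) pairs)
Combinator  A function that combines small parsers into a larger parser
Success     Returns the result together with the remaining input
Failure     Returns an empty list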
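
The concepts above can be sketched directly in Python. This is a minimal illustration of the "list of successes" representation, not a production library; the names item, fmap, seq, and many1 are chosen here for the example.

```python
from typing import Callable, List, Tuple, TypeVar

A = TypeVar("A")

# A parser takes the input string and returns a list of
# (result, remaining input) pairs; an empty list means failure.
Parser = Callable[[str], List[Tuple[A, str]]]


def item(predicate: Callable[[str], bool]) -> Parser:
    """Parse one character that satisfies the predicate."""
    def run(text: str):
        return [(text[0], text[1:])] if text and predicate(text[0]) else []
    return run


def fmap(fn, parser):
    """Combinator: transform the result of a parser."""
    return lambda text: [(fn(value), rest) for value, rest in parser(text)]


def seq(first, second):
    """Combinator: run two parsers in sequence and pair their results."""
    def run(text):
        return [((a, b), rest2)
                for a, rest1 in first(text)
                for b, rest2 in second(rest1)]
    return run


def many1(parser):
    """Combinator: apply a parser one or more times, keeping the longest match."""
    def run(text):
        values, rest = [], text
        while True:
            step = parser(rest)
            if not step:
                break
            value, rest = step[0]
            values.append(value)
        return [(values, rest)] if values else []
    return run


digit = item(str.isdigit)
integer = fmap(lambda ds: int("".join(ds)), many1(digit))
plus = item(lambda c: c == "+")
addition = fmap(lambda r: r[0][0] + r[1], seq(seq(integer, plus), integer))

print(integer("123abc"))   # [(123, 'abc')]
print(addition("12+30!"))  # [(42, '!')]
print(integer("abc"))      # []
```

Each combinator takes parsers and returns a new parser, so larger grammars are assembled from ordinary function calls.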

17.2.2 Parser Combinators in Haskell

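import Data.Char (isDigit)
import Text.Parsec
import Text.Parsec.String

-- Basic parsers (digit' is primed so that it does not clash with Parsec's digit)
char' :: Char -> Parser Char
char' c = satisfy (== c)

digit' :: Parser Char
digit' = satisfy isDigit

-- Composition
string' :: String -> Parser String
string' = mapM char'

-- JSON parser (simplified: no string escapes, lenient number syntax)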
data JsonValue
  = JsonNull
  | JsonBool Bool
  | JsonNumber Double
  | JsonString String
  | JsonArray [JsonValue]
  | JsonObject [(String, JsonValue)]
  deriving (Show, Eq)

jsonParser :: Parser JsonValue
jsonParser = spaces *> (jsonNull <|> jsonBool <|> jsonNumber <|> jsonString <|> jsonArray <|> jsonObject)

jsonNull :: Parser JsonValue
jsonNull = string "null" *> pure JsonNull

jsonBool :: Parser JsonValue
jsonBool = (string "true" *> pure (JsonBool True))
       <|> (string "false" *> pure (JsonBool False))

jsonNumber :: Parser JsonValue
jsonNumber = JsonNumber . read <$> many1 (digit <|> oneOf ".-eE")

jsonString :: Parser JsonValue
jsonString = JsonString <$> (char '"' *> manyTill anyChar (char '"'))

jsonArray :: Parser JsonValue
jsonArray = JsonArray <$> (char '[' *> spaces *> jsonParser `sepBy` (char ',' <* spaces) <* char ']')

jsonObject :: Parser JsonValue
jsonObject = JsonObject <$> (char '{' *> spaces *> pair `sepBy` (char ',' <* spaces) <* char '}')
  where
    pair = do
      key <- jsonString >>= \(JsonString s) -> return s
      spaces *> char ':' *> spaces
      value <- jsonParser
      return (key, value)

17.2.3 Parser Combinators in JavaScript

// A simple parser combinator library
const Parser = (run) => ({
  run,
  map: (fn) => Parser(input => {
    const result = run(input);
    return result ? { value: fn(result.value), rest: result.rest } : null;
  }),
  flatMap: (fn) => Parser(input => {
    const result = run(input);
    return result ? fn(result.value).run(result.rest) : null;
  }),
  then: (other) => Parser(input => {
    const result = run(input);
    return result ? other.run(result.rest) : null;
  }),
});

// Basic parsers
const char = (c) => Parser(input =>
  input[0] === c ? { value: c, rest: input.slice(1) } : null
);

const digit = Parser(input =>
  /^[0-9]/.test(input) ? { value: input[0], rest: input.slice(1) } : null
);

const many = (parser) => Parser(input => {
  const results = [];
  let rest = input;
  while (true) {
    const result = parser.run(rest);
    if (!result) break;
    results.push(result.value);
    rest = result.rest;
  }
  return { value: results, rest };
});

const many1 = (parser) => parser.flatMap(first =>
  many(parser).map(rest => [first, ...rest])
);

const string = (s) => Parser(input =>
  input.startsWith(s) ? { value: s, rest: input.slice(s.length) } : null
);

// Integer parser (a first step toward parsing JSON numbers)
const numberParser = many1(digit).map(digits => parseInt(digits.join(''), 10));

// Usage
numberParser.run('123abc');  // { value: 123, rest: 'abc' }

17.2.4 The Rust nom Library

use nom::{
    IResult,
    bytes::complete::{tag, take_while1},
    character::complete::{digit1, multispace0},
    combinator::opt,
    sequence::delimited,
};

// Parse an integer
fn parse_integer(input: &str) -> IResult<&str, i64> {
    let (input, sign) = opt(tag("-"))(input)?;
    let (input, digits) = digit1(input)?;
    let number: i64 = digits.parse().unwrap();
    Ok((input, if sign.is_some() { -number } else { number }))
}

// Parse a quoted string
fn parse_string(input: &str) -> IResult<&str, &str> {
    delimited(
        tag("\""),
        take_while1(|c| c != '"'),
        tag("\""),
    )(input)
}

// Composition: parse a key-value pair
fn parse_key_value(input: &str) -> IResult<&str, (&str, i64)> {
    let (input, key) = parse_string(input)?;
    let (input, _) = tag(":")(input)?;
    let (input, _) = multispace0(input)?;
    let (input, value) = parse_integer(input)?;
    Ok((input, (key, value)))
}

17.3 DSL Design

17.3.1 Internal DSLs

// Query DSL (QueryBuilder is assumed to be defined elsewhere)
const query = QueryBuilder
  .select('name', 'email', 'age')
  .from('users')
  .where('age', '>', 18)
  .where('active', '=', true)
  .orderBy('name', 'ASC')
  .limit(10)
  .build();

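// A more functional DSL: the query is plain data and buildQuery is a pure function.
// Values are interpolated directly for illustration only; real code should use
// parameterized queries to avoid SQL injection.
const buildQuery = (options) => {
  const { select, from, where = [], orderBy, limit } = options;
  let sql = `SELECT ${select.join(', ')} FROM ${from}`;
  if (where.length > 0) {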
    sql += ` WHERE ${where.map(([col, op, val]) =>
      `${col} ${op} ${typeof val === 'string' ? `'${val}'` : val}`
    ).join(' AND ')}`;
  }
  if (orderBy) sql += ` ORDER BY ${orderBy[0]} ${orderBy[1]}`;
  if (limit) sql += ` LIMIT ${limit}`;
  return sql;
};

// Usage
buildQuery({
  select: ['name', 'email', 'age'],
  from: 'users',
  where: [['age', '>', 18], ['active', '=', true]],
  orderBy: ['name', 'ASC'],
  limit: 10,
});
// SELECT name, email, age FROM users WHERE age > 18 AND active = true ORDER BY name ASC LIMIT 10
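
One design choice worth illustrating is keeping values out of the SQL text. The sketch below is a hypothetical Python variant of the same data-driven builder: it returns the SQL with `?` placeholders together with a list of bound values. The function name build_query and its parameters are assumptions made for this example, and column and table names are assumed to come from trusted code.

```python
def build_query(select, table, where=(), order_by=None, limit=None):
    """Build a SQL string with '?' placeholders plus the list of bound values.

    Column and table names are assumed to come from trusted code, not user input.
    """
    sql = f"SELECT {', '.join(select)} FROM {table}"
    params = []
    if where:
        clauses = []
        for column, op, value in where:
            clauses.append(f"{column} {op} ?")
            params.append(value)
        sql += " WHERE " + " AND ".join(clauses)
    if order_by:
        sql += f" ORDER BY {order_by[0]} {order_by[1]}"
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)
    return sql, params


sql, params = build_query(
    select=["name", "email", "age"],
    table="users",
    where=[("age", ">", 18), ("active", "=", True)],
    order_by=("name", "ASC"),
    limit=10,
)
print(sql)
# SELECT name, email, age FROM users WHERE age > ? AND active = ? ORDER BY name ASC LIMIT ?
print(params)
# [18, True, 10]
```

The builder remains a pure function of its arguments, and the resulting pair can be handed to any driver that accepts placeholder-style parameters.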

17.3.2 An External DSL in Haskell

-- Define an external DSL with Parsec
-- Configuration file format:
-- database {
--   host = "localhost"
--   port = 5432
--   name = "mydb"
-- }

data Config = Config
  { dbHost :: String
  , dbPort :: Int
  , dbName :: String
  } deriving (Show)

configParser :: Parser Config
configParser = do
  string "database" *> spaces *> char '{' *> spaces
  host <- string "host" *> spaces *> char '=' *> spaces *> quotedString <* spaces
  port <- string "port" *> spaces *> char '=' *> spaces *> int <* spaces
  name <- string "name" *> spaces *> char '=' *> spaces *> quotedString <* spaces
  char '}'
  return $ Config host port name
  where
    quotedString = char '"' *> manyTill anyChar (char '"')
    int = read <$> many1 digit

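17.3.3 A Clojure DSL

;; Clojure's code-as-data nature makes it well suited to DSLs
;; SQL DSL
(defn sql-select [& fields]
  {:select (vec fields)})

(defn sql-from [query table]
  (assoc query :from table))

(defn sql-where [query condition]
  ;; fnil supplies an empty vector the first time, so conditions keep their order
  (update query :where (fnil conj []) condition))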

(defn sql-limit [query n]
  (assoc query :limit n))

;; Composing the DSL
(-> (sql-select :name :email :age)
    (sql-from :users)
    (sql-where [:age :> 18])
    (sql-where [:active := true])
    (sql-limit 10))

;; Result
;; {:select [:name :email :age]
;;  :from :users
;;  :where [[:age :> 18] [:active := true]]
;;  :limit 10}

;; Macros allow a more natural-looking DSL
(defmacro select [& fields]
  `{:select ~(vec fields)})

(defmacro from [query table]
  `(assoc ~query :from ~table))

;; Usage
(-> (select :name :email)
    (from :users))

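17.4 Compilers and Interpreters

17.4.1 A Simple Expression Language

-- A compiler for a small expression language
import Data.Map (Map)
import qualified Data.Map as Map

data Expr
  = Num Double
  | Var String
  | Add Expr Expr
  | Mul Expr Expr
  | Let String Expr Expr
  deriving (Show, Eq)

-- The only type in this simplified language
data Type = TNum
  deriving (Show, Eq)

-- Type checking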
typeCheck :: Expr -> Either String Type
typeCheck (Num _) = Right TNum
typeCheck (Var _) = Right TNum  -- simplification: every variable is numeric
typeCheck (Add a b) = case (typeCheck a, typeCheck b) of
  (Right TNum, Right TNum) -> Right TNum
  _ -> Left "Type error in addition"
typeCheck (Mul a b) = case (typeCheck a, typeCheck b) of
  (Right TNum, Right TNum) -> Right TNum
  _ -> Left "Type error in multiplication"
typeCheck (Let _ expr body) = case typeCheck expr of
  Right _ -> typeCheck body
  Left err -> Left err

-- Evaluation (Map.! raises an error if a variable is undefined)
eval :: Map String Double -> Expr -> Double
eval _ (Num x) = x
eval env (Var name) = env Map.! name
eval env (Add a b) = eval env a + eval env b
eval env (Mul a b) = eval env a * eval env b
eval env (Let name expr body) =
  let value = eval env expr
      newEnv = Map.insert name value env
  in eval newEnv body

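-- Compile to assembly-like stack instructions (simplified example)
data Instruction
  = Push Double
  | Load String
  | Store String
  | AddOp
  | MulOp
  deriving (Show, Eq)
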
compile :: Expr -> [Instruction]
compile (Num x) = [Push x]
compile (Var name) = [Load name]
compile (Add a b) = compile a ++ compile b ++ [AddOp]
compile (Mul a b) = compile a ++ compile b ++ [MulOp]
compile (Let name expr body) =
  compile expr ++ [Store name] ++ compile body
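
The instruction list produced by compile is meant for a stack machine, which the text does not show. The following Python sketch is one plausible interpreter for those instructions. It assumes that Store pops the top of the stack and binds it to a name, and it does not restore the previous binding when a Let body ends; both are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Union


@dataclass(frozen=True)
class Push:
    value: float


@dataclass(frozen=True)
class Load:
    name: str


@dataclass(frozen=True)
class Store:
    name: str


@dataclass(frozen=True)
class AddOp:
    pass


@dataclass(frozen=True)
class MulOp:
    pass


Instruction = Union[Push, Load, Store, AddOp, MulOp]


def run(program: List[Instruction], env: Optional[Dict[str, float]] = None) -> float:
    """Execute instructions on an operand stack and return the top of the stack."""
    variables = dict(env or {})
    stack: List[float] = []
    for instr in program:
        if isinstance(instr, Push):
            stack.append(instr.value)
        elif isinstance(instr, Load):
            stack.append(variables[instr.name])
        elif isinstance(instr, Store):
            variables[instr.name] = stack.pop()  # assumed semantics: pop and bind
        elif isinstance(instr, AddOp):
            right, left = stack.pop(), stack.pop()
            stack.append(left + right)
        elif isinstance(instr, MulOp):
            right, left = stack.pop(), stack.pop()
            stack.append(left * right)
    return stack[-1]


# Instruction list that compile would produce for:
#   Let "x" (Num 5) (Add (Var "x") (Mul (Num 2) (Num 3)))
program = [Push(5), Store("x"), Load("x"), Push(2), Push(3), MulOp(), AddOp()]
print(run(program))  # 11
```

Because operands are compiled before their operator, the machine never needs to look ahead: each operator finds its arguments already on the stack.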

Rust:

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Num(f64),
    Var(String),
    Add(Box<Expr>, Box<Expr>),
    Mul(Box<Expr>, Box<Expr>),
    Let(String, Box<Expr>, Box<Expr>),
}

impl Expr {
    fn eval(&self, env: &std::collections::HashMap<String, f64>) -> Result<f64, String> {
        match self {
            Expr::Num(x) => Ok(*x),
            Expr::Var(name) => env.get(name)
                .cloned()
                .ok_or_else(|| format!("Undefined variable: {}", name)),
            Expr::Add(a, b) => Ok(a.eval(env)? + b.eval(env)?),
            Expr::Mul(a, b) => Ok(a.eval(env)? * b.eval(env)?),
            Expr::Let(name, expr, body) => {
                let value = expr.eval(env)?;
                let mut new_env = env.clone();
                new_env.insert(name.clone(), value);
                body.eval(&new_env)
            }
        }
    }
}

17.5 Web API Design

17.5.1 A Functional Web Framework

// Functional middleware
const middleware = {
  cors: (handler) => (req) => {
    const response = handler(req);
    return {
      ...response,
      headers: {
        ...response.headers,
        'Access-Control-Allow-Origin': '*',
      },
    };
  },

  auth: (handler) => (req) => {
    if (!req.headers.authorization) {
      return { status: 401, body: { error: 'Unauthorized' } };
    }
    const user = verifyToken(req.headers.authorization);
    return handler({ ...req, user });
  },

  validate: (schema) => (handler) => (req) => {
    const errors = schema.validate(req.body);
    if (errors.length > 0) {
      return { status: 400, body: { errors } };
    }
    return handler(req);
  },
};

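// Route definitions (function composition); compose is assumed to be a
// right-to-left compose such as Ramda's R.compose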
const routes = {
  'GET /users': compose(
    middleware.cors,
    middleware.auth,
  )((req) => {
    const users = db.getUsers();
    return { status: 200, body: users };
  }),

  'POST /users': compose(
    middleware.cors,
    middleware.auth,
    middleware.validate(userSchema),
  )((req) => {
    const user = db.createUser(req.body);
    return { status: 201, body: user };
  }),
};
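
The order in which middleware is composed matters. The Python sketch below mirrors the pattern above under simplifying assumptions: compose is written out by hand, the token check "secret-token" and the user "alice" are hypothetical stand-ins for verifyToken, and requests and responses are plain dictionaries.

```python
from functools import reduce


def compose(*wrappers):
    """Compose handler wrappers right to left: compose(f, g)(h) == f(g(h))."""
    return lambda handler: reduce(
        lambda acc, wrap: wrap(acc), reversed(wrappers), handler
    )


def cors(handler):
    def wrapped(req):
        response = handler(req)
        headers = {**response.get("headers", {}), "Access-Control-Allow-Origin": "*"}
        return {**response, "headers": headers}
    return wrapped


def auth(handler):
    def wrapped(req):
        token = req.get("headers", {}).get("authorization")
        if token != "secret-token":  # hypothetical check standing in for verifyToken
            return {"status": 401, "body": {"error": "Unauthorized"}}
        return handler({**req, "user": "alice"})
    return wrapped


def list_users(req):
    return {"status": 200, "body": [req["user"]]}


endpoint = compose(cors, auth)(list_users)

ok = endpoint({"headers": {"authorization": "secret-token"}})
denied = endpoint({"headers": {}})
print(ok["status"], ok["headers"])          # 200 {'Access-Control-Allow-Origin': '*'}
print(denied["status"], denied["headers"])  # 401 {'Access-Control-Allow-Origin': '*'}
```

Since cors is the outermost wrapper, it also decorates the 401 response that auth returns early; swapping the order would leave rejected requests without the CORS header.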

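17.6 Machine Learning Preprocessing

# A functional data preprocessing pipeline
import pandas as pd
from toolz import compose, curry

# The column name comes first and the DataFrame last, so that currying on the
# column (for example normalize_column('age')) leaves a DataFrame -> DataFrame function.

@curry
def normalize_column(column, df):
    """Min-max normalization."""
    col = df[column]
    return df.assign(**{column: (col - col.min()) / (col.max() - col.min())})

@curry
def fill_missing(column, df, strategy='mean'):
    """Fill missing values with the column mean or median."""
    fill_value = df[column].mean() if strategy == 'mean' else df[column].median()
    return df.assign(**{column: df[column].fillna(fill_value)})

@curry
def encode_categorical(column, df):
    """One-hot encoding."""
    dummies = pd.get_dummies(df[column], prefix=column)
    return pd.concat([df.drop(column, axis=1), dummies], axis=1)

@curry
def filter_outliers(column, df, n_std=3):
    """Drop rows more than n_std standard deviations from the mean."""
    mean, std = df[column].mean(), df[column].std()
    return df[(df[column] >= mean - n_std * std) & (df[column] <= mean + n_std * std)]

# Compose the preprocessing pipeline. compose applies its functions right to left,
# so the last entry runs first: fill missing values, filter outliers, encode, normalize.
preprocess = compose(
    normalize_column('age'),
    normalize_column('income'),
    encode_categorical('city'),
    filter_outliers('income'),
    fill_missing('age'),
    fill_missing('income'),
)

# Usage (raw_df is assumed to be a DataFrame with age, income, and city columns)
clean_df = preprocess(raw_df)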

17.7 Business Scenarios

17.7.1 A Log Analysis System

# Large-scale log analysis
import json
from datetime import datetime
from itertools import islice
from collections import Counter

def analyze_logs(log_stream, time_range=None, level='ERROR'):
    """Lazy log analysis pipeline."""

    def parse_lines(stream):
        for line in stream:
            try:
                yield json.loads(line.strip())
            except json.JSONDecodeError:
                continue

    def filter_by_time(entries):
        if time_range is None:
            return entries
        start, end = time_range
        return (
            e for e in entries
            if start <= datetime.fromisoformat(e['timestamp']) <= end
        )

    def filter_by_level(entries, target_level):
        return (e for e in entries if e.get('level') == target_level)

    def extract_modules(entries):
        return (e.get('module', 'unknown') for e in entries)

    # Lazy pipeline
    entries = parse_lines(log_stream)
    entries = filter_by_time(entries)
    entries = filter_by_level(entries, level)
    modules = extract_modules(entries)

    # Count (consumes only as many elements as needed, at most 10000)
    return Counter(islice(modules, 10000))

# Usage
with open('app.log') as f:
    error_counts = analyze_logs(f, level='ERROR')
    print(error_counts.most_common(10))

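17.8 Practical Considerations

Consideration         Description
Incremental adoption  Introduce FP concepts into an existing project step by step
Team training         Make sure the team understands the basic FP concepts
Library choice        Choose mature, well-documented FP libraries
Performance testing   Run performance tests on critical paths
Mixed style           Do not insist that all code be purely functional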

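17.9 Summary

Topic                  Key point
ETL / data processing  Pipelined, side-effect free, easy to test
Parser combinators     Build complex parsers from small components
DSL design             Internal and external DSLs; function composition is the key
Compilers              Pattern matching plus recursive processing of the AST
Web APIs               Composition of functional middleware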

Further Reading

  1. Parsing in Haskell: resources on Haskell parsers
  2. Domain Specific Languages, by Martin Fowler
  3. nom: a parser combinator library for Rust
  4. Clojure for Data Science, by Henry Garner
