强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

函数式编程艺术 / 12 函数式响应式编程

12 函数式响应式编程

“FRP 把事件流当作一等值来处理——你可以像操作数组一样操作事件。”


12.1 概述

函数式响应式编程(Functional Reactive Programming, FRP) 将时间维度引入函数式编程,用流(Stream)和信号(Signal)来表示随时间变化的值。

12.1.1 命令式 vs 响应式

方面命令式响应式
数据流手动更新自动传播
状态管理可变变量不可变流
事件处理回调函数声明式流
并发手动管理操作符处理

12.1.2 FRP 的两大学派

学派代表特点
经典 FRPFran, Elm (早期)连续时间信号/行为
反应式编程RxJS, Reactor离散事件流

12.2 信号与行为(Signals/Behaviors)

12.2.1 经典 FRP 模型

行为(Behavior) 是随时间连续变化的值:

Behavior a = Time → a

事件(Event) 是离散发生的值:

Event a = [(Time, a)]

Elm 风格:

-- 行为:鼠标位置随时间变化
mousePos : Behavior (Int, Int)
mousePos = Mouse.position

-- 事件:鼠标点击
clicks : Event ()
clicks = Mouse.clicks

-- 组合:点击时的鼠标位置
clickPositions : Event (Int, Int)
clickPositions = sampleOn clicks mousePos

-- 转换
countClicks : Behavior Int
countClicks = foldp (\_ acc -> acc + 1) 0 clicks

12.3 响应式流(Reactive Streams)

12.3.1 Observable 模型

Observable 是最流行的响应式流实现。

核心概念:

概念说明
Observable可观察的数据流,发出多个值
Observer消费 Observable 发出的值
Subscription连接 Observable 和 Observer
Operator变换、组合流的函数

12.3.2 RxJS 示例

import { fromEvent, interval, merge, combineLatest } from 'rxjs';
import { map, filter, scan, debounceTime, switchMap, takeUntil } from 'rxjs/operators';

// 基本流
const numbers$ = from([1, 2, 3, 4, 5]);

// 事件流
const clicks$ = fromEvent(document, 'click');
const keys$ = fromEvent(document, 'keydown');

// 变换
const positions$ = clicks$.pipe(
  map(e => ({ x: e.clientX, y: e.clientY }))
);

// 过滤
const enterKeys$ = keys$.pipe(
  filter(e => e.key === 'Enter')
);

// 累积
const clickCount$ = clicks$.pipe(
  scan((count, _) => count + 1, 0)
);

// 防抖
const searchInput$ = fromEvent(searchBox, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  filter(query => query.length > 2)
);

// 搜索请求(取消前一个)
const results$ = searchInput$.pipe(
  switchMap(query => fetch(`/api/search?q=${query}`).then(r => r.json()))
);

// 订阅
results$.subscribe(results => {
  renderResults(results);
});

12.3.3 自动补全示例

// 经典应用:搜索自动补全
const searchBox = document.getElementById('search');
const results = document.getElementById('results');

const search$ = fromEvent(searchBox, 'input').pipe(
  // 取输入值
  map(e => e.target.value.trim()),
  // 防抖 300ms
  debounceTime(300),
  // 至少 2 个字符
  filter(query => query.length >= 2),
  // 去重
  distinctUntilChanged(),
  // 发起请求(取消前一个未完成的)
  switchMap(query =>
    from(fetch(`/api/search?q=${encodeURIComponent(query)}`).then(r => r.json())).pipe(
      // 错误处理:返回空结果
      catchError(() => of([]))
    )
  )
);

search$.subscribe(data => {
  results.innerHTML = data.map(item =>
    `<div class="result">${item.title}</div>`
  ).join('');
});

12.4 流操作符

12.4.1 创建操作符

操作符说明RxJS
of从值创建of(1, 2, 3)
from从数组/可迭代创建from([1, 2, 3])
interval定时创建interval(1000)
fromEvent从事件创建fromEvent(el, 'click')
create自定义创建new Observable(subscriber => {...})

12.4.2 变换操作符

操作符说明
map转换每个值
flatMap / mergeMap展平内部 Observable
switchMap切换到新的内部 Observable(取消旧的)
concatMap顺序执行内部 Observable
scan累积归约
buffer收集值到数组

12.4.3 过滤操作符

操作符说明
filter根据谓词过滤
take取前 N 个值
skip跳过前 N 个值
distinctUntilChanged去重连续相同值
debounceTime防抖
throttleTime节流
first / last取第一个/最后一个

12.4.4 组合操作符

操作符说明
merge合并多个流
concat顺序连接流
combineLatest最新值组合
zip按索引配对
forkJoin等待所有完成
import { merge, combineLatest, zip } from 'rxjs';

// merge:任一流发出值都传播
const merged$ = merge(mouseClicks$, keyPresses$);

// combineLatest:任一流发出时,取其他流的最新值
const fullName$ = combineLatest([firstName$, lastName$]).pipe(
  map(([first, last]) => `${first} ${last}`)
);

// zip:严格配对
const paired$ = zip(names$, ages$).pipe(
  map(([name, age]) => ({ name, age }))
);

12.5 错误处理

import { of, EMPTY } from 'rxjs';
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';

// 基本错误处理
const safeRequest$ = fetch('/api/data').pipe(
  catchError(error => {
    console.error('Request failed:', error);
    return of({ data: [], error: error.message });
  })
);

// 重试
const resilientRequest$ = fetch('/api/data').pipe(
  retry(3)
);

// 指数退避重试
const backoffRequest$ = fetch('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((retryCount, error) => {
      if (retryCount >= 3) throw error;
      return retryCount + 1;
    }, 0),
    delay(retryCount => Math.pow(2, retryCount) * 1000)
  ))
);

// 错误恢复
const requestWithFallback$ = primaryRequest$.pipe(
  catchError(() => fallbackRequest$),
  catchError(() => of(defaultValue))
);

12.6 状态管理

12.6.1 Redux 的流视角

// Redux 本质上是一个流
import { Subject } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';

const createStore = (reducer, initialState) => {
  const actions$ = new Subject();
  const state$ = actions$.pipe(
    scan(reducer, initialState),
    shareReplay(1)
  );
  return {
    dispatch: (action) => actions$.next(action),
    select: (selector) => state$.pipe(map(selector)),
    state$,
  };
};

// 使用
const store = createStore(todoReducer, []);

store.select(state => state.filter(t => !t.completed)).subscribe(
  activeTodos => render(activeTodos)
);

store.dispatch({ type: 'ADD_TODO', text: 'Learn FRP' });

12.6.2 Elm Architecture

// Elm Architecture: Model → Update → View
const ElmApp = (init, update, view) => {
  const actions$ = new Subject();

  const model$ = actions$.pipe(
    scan((model, action) => update(model, action), init)
  );

  const vdom$ = model$.pipe(
    map(model => view(model, (action) => actions$.next(action)))
  );

  return { model$, vdom$ };
};

12.7 Haskell 中的 FRP

-- 使用 reflex 库
import Reflex

-- 基本 FRP
example :: Widget t m => m ()
example = do
  -- 创建事件
  rec textInput <- textInput def
      let keypressEvent = _textInput_keypress textInput

  -- 行为:累积按键次数
  count <- foldDyn (+) 0 (1 <$ keypressEvent)

  -- 显示
  display count

-- 动态文本
  dynText $ fmap show count

12.8 业务场景

12.8.1 实时仪表板

// 实时数据仪表板
const dashboard$ = combineLatest({
  cpu: interval(1000).pipe(switchMap(() => fetchMetric('cpu'))),
  memory: interval(2000).pipe(switchMap(() => fetchMetric('memory'))),
  requests: fromEventSource('/api/metrics/requests'),
  errors: fromEventSource('/api/metrics/errors').pipe(
    bufferTime(5000),
    filter(errors => errors.length > 0)
  )
}).pipe(
  // 状态转换
  scan((dashboard, metrics) => ({
    ...dashboard,
    ...metrics,
    lastUpdated: Date.now(),
    alerts: [
      ...dashboard.alerts,
      ...(metrics.cpu > 90 ? ['High CPU'] : []),
      ...(metrics.errors.length > 10 ? ['Error spike'] : []),
    ]
  }), { alerts: [] })
);

dashboard$.subscribe(renderDashboard);

12.8.2 表单验证

const formValidation$ = (form) => {
  const name$ = fromEvent(form.name, 'input').pipe(
    map(e => e.target.value),
    map(name => ({
      value: name,
      valid: name.length >= 2,
      error: name.length < 2 ? 'Name too short' : null
    }))
  );

  const email$ = fromEvent(form.email, 'input').pipe(
    map(e => e.target.value),
    map(email => ({
      value: email,
      valid: /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email),
      error: !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email) ? 'Invalid email' : null
    }))
  );

  const age$ = fromEvent(form.age, 'input').pipe(
    map(e => parseInt(e.target.value)),
    map(age => ({
      value: age,
      valid: age >= 18 && age <= 120,
      error: age < 18 ? 'Must be 18+' : age > 120 ? 'Invalid age' : null
    }))
  );

  return combineLatest([name$, email$, age$]).pipe(
    map(([name, email, age]) => ({
      fields: { name, email, age },
      valid: name.valid && email.valid && age.valid
    })),
    distinctUntilChanged((a, b) => a.valid === b.valid)
  );
};

12.9 注意事项

注意事项说明
内存泄漏忘记取消订阅会导致内存泄漏
调试困难流的异步性质使调试复杂
过度使用简单场景不需要 FRP
背压处理高速数据流需要背压策略
测试使用 TestScheduler 进行虚拟时间测试

12.10 小结

要点说明
FRP函数式 + 响应式,流是一等公民
Signal/Behavior连续时间变化的值
Observable离散事件流,支持丰富的操作符
操作符创建、变换、过滤、组合、错误处理
状态管理Elm Architecture、Redux 的流视角

扩展阅读

  1. RxJS 文档 — 官方文档
  2. The Introduction to Reactive Programming You’ve Been Missing — André Staltz
  3. Elm Architecture — Elm 官方教程
  4. Reflex FRP — Haskell FRP 库

下一章13 函数式设计模式