函数式编程艺术 / 12 函数式响应式编程
12 函数式响应式编程
“FRP 把事件流当作一等值来处理——你可以像操作数组一样操作事件。”
12.1 概述
函数式响应式编程(Functional Reactive Programming, FRP) 将时间维度引入函数式编程,用流(Stream)和信号(Signal)来表示随时间变化的值。
12.1.1 命令式 vs 响应式
| 方面 | 命令式 | 响应式 |
|---|
| 数据流 | 手动更新 | 自动传播 |
| 状态管理 | 可变变量 | 不可变流 |
| 事件处理 | 回调函数 | 声明式流 |
| 并发 | 手动管理 | 操作符处理 |
12.1.2 FRP 的两大学派
| 学派 | 代表 | 特点 |
|---|
| 经典 FRP | Fran, Elm (早期) | 连续时间信号/行为 |
| 反应式编程 | RxJS, Reactor | 离散事件流 |
12.2 信号与行为(Signals/Behaviors)
12.2.1 经典 FRP 模型
行为(Behavior) 是随时间连续变化的值:
Behavior a = Time → a
事件(Event) 是离散发生的值:
Event a = [(Time, a)]
Elm 风格:
-- 行为:鼠标位置随时间变化
mousePos : Behavior (Int, Int)
mousePos = Mouse.position
-- 事件:鼠标点击
clicks : Event ()
clicks = Mouse.clicks
-- 组合:点击时的鼠标位置
clickPositions : Event (Int, Int)
clickPositions = sampleOn clicks mousePos
-- 转换
countClicks : Behavior Int
countClicks = foldp (\_ acc -> acc + 1) 0 clicks
12.3 响应式流(Reactive Streams)
12.3.1 Observable 模型
Observable 是最流行的响应式流实现。
核心概念:
| 概念 | 说明 |
|---|
| Observable | 可观察的数据流,发出多个值 |
| Observer | 消费 Observable 发出的值 |
| Subscription | 连接 Observable 和 Observer |
| Operator | 变换、组合流的函数 |
12.3.2 RxJS 示例
import { fromEvent, interval, merge, combineLatest } from 'rxjs';
import { map, filter, scan, debounceTime, switchMap, takeUntil } from 'rxjs/operators';
// 基本流
const numbers$ = from([1, 2, 3, 4, 5]);
// 事件流
const clicks$ = fromEvent(document, 'click');
const keys$ = fromEvent(document, 'keydown');
// 变换
const positions$ = clicks$.pipe(
map(e => ({ x: e.clientX, y: e.clientY }))
);
// 过滤
const enterKeys$ = keys$.pipe(
filter(e => e.key === 'Enter')
);
// 累积
const clickCount$ = clicks$.pipe(
scan((count, _) => count + 1, 0)
);
// 防抖
const searchInput$ = fromEvent(searchBox, 'input').pipe(
debounceTime(300),
map(e => e.target.value),
filter(query => query.length > 2)
);
// 搜索请求(取消前一个)
const results$ = searchInput$.pipe(
switchMap(query => fetch(`/api/search?q=${query}`).then(r => r.json()))
);
// 订阅
results$.subscribe(results => {
renderResults(results);
});
12.3.3 自动补全示例
// 经典应用:搜索自动补全
const searchBox = document.getElementById('search');
const results = document.getElementById('results');
const search$ = fromEvent(searchBox, 'input').pipe(
// 取输入值
map(e => e.target.value.trim()),
// 防抖 300ms
debounceTime(300),
// 至少 2 个字符
filter(query => query.length >= 2),
// 去重
distinctUntilChanged(),
// 发起请求(取消前一个未完成的)
switchMap(query =>
from(fetch(`/api/search?q=${encodeURIComponent(query)}`).then(r => r.json())).pipe(
// 错误处理:返回空结果
catchError(() => of([]))
)
)
);
search$.subscribe(data => {
results.innerHTML = data.map(item =>
`<div class="result">${item.title}</div>`
).join('');
});
12.4 流操作符
12.4.1 创建操作符
| 操作符 | 说明 | RxJS |
|---|
of | 从值创建 | of(1, 2, 3) |
from | 从数组/可迭代创建 | from([1, 2, 3]) |
interval | 定时创建 | interval(1000) |
fromEvent | 从事件创建 | fromEvent(el, 'click') |
create | 自定义创建 | new Observable(subscriber => {...}) |
12.4.2 变换操作符
| 操作符 | 说明 |
|---|
map | 转换每个值 |
flatMap / mergeMap | 展平内部 Observable |
switchMap | 切换到新的内部 Observable(取消旧的) |
concatMap | 顺序执行内部 Observable |
scan | 累积归约 |
buffer | 收集值到数组 |
12.4.3 过滤操作符
| 操作符 | 说明 |
|---|
filter | 根据谓词过滤 |
take | 取前 N 个值 |
skip | 跳过前 N 个值 |
distinctUntilChanged | 去重连续相同值 |
debounceTime | 防抖 |
throttleTime | 节流 |
first / last | 取第一个/最后一个 |
12.4.4 组合操作符
| 操作符 | 说明 |
|---|
merge | 合并多个流 |
concat | 顺序连接流 |
combineLatest | 最新值组合 |
zip | 按索引配对 |
forkJoin | 等待所有完成 |
import { merge, combineLatest, zip } from 'rxjs';
// merge:任一流发出值都传播
const merged$ = merge(mouseClicks$, keyPresses$);
// combineLatest:任一流发出时,取其他流的最新值
const fullName$ = combineLatest([firstName$, lastName$]).pipe(
map(([first, last]) => `${first} ${last}`)
);
// zip:严格配对
const paired$ = zip(names$, ages$).pipe(
map(([name, age]) => ({ name, age }))
);
12.5 错误处理
import { of, EMPTY } from 'rxjs';
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';
// 基本错误处理
const safeRequest$ = fetch('/api/data').pipe(
catchError(error => {
console.error('Request failed:', error);
return of({ data: [], error: error.message });
})
);
// 重试
const resilientRequest$ = fetch('/api/data').pipe(
retry(3)
);
// 指数退避重试
const backoffRequest$ = fetch('/api/data').pipe(
retryWhen(errors => errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) throw error;
return retryCount + 1;
}, 0),
delay(retryCount => Math.pow(2, retryCount) * 1000)
))
);
// 错误恢复
const requestWithFallback$ = primaryRequest$.pipe(
catchError(() => fallbackRequest$),
catchError(() => of(defaultValue))
);
12.6 状态管理
12.6.1 Redux 的流视角
// Redux 本质上是一个流
import { Subject } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';
const createStore = (reducer, initialState) => {
const actions$ = new Subject();
const state$ = actions$.pipe(
scan(reducer, initialState),
shareReplay(1)
);
return {
dispatch: (action) => actions$.next(action),
select: (selector) => state$.pipe(map(selector)),
state$,
};
};
// 使用
const store = createStore(todoReducer, []);
store.select(state => state.filter(t => !t.completed)).subscribe(
activeTodos => render(activeTodos)
);
store.dispatch({ type: 'ADD_TODO', text: 'Learn FRP' });
12.6.2 Elm Architecture
// Elm Architecture: Model → Update → View
const ElmApp = (init, update, view) => {
const actions$ = new Subject();
const model$ = actions$.pipe(
scan((model, action) => update(model, action), init)
);
const vdom$ = model$.pipe(
map(model => view(model, (action) => actions$.next(action)))
);
return { model$, vdom$ };
};
12.7 Haskell 中的 FRP
-- 使用 reflex 库
import Reflex
-- 基本 FRP
example :: Widget t m => m ()
example = do
-- 创建事件
rec textInput <- textInput def
let keypressEvent = _textInput_keypress textInput
-- 行为:累积按键次数
count <- foldDyn (+) 0 (1 <$ keypressEvent)
-- 显示
display count
-- 动态文本
dynText $ fmap show count
12.8 业务场景
12.8.1 实时仪表板
// 实时数据仪表板
const dashboard$ = combineLatest({
cpu: interval(1000).pipe(switchMap(() => fetchMetric('cpu'))),
memory: interval(2000).pipe(switchMap(() => fetchMetric('memory'))),
requests: fromEventSource('/api/metrics/requests'),
errors: fromEventSource('/api/metrics/errors').pipe(
bufferTime(5000),
filter(errors => errors.length > 0)
)
}).pipe(
// 状态转换
scan((dashboard, metrics) => ({
...dashboard,
...metrics,
lastUpdated: Date.now(),
alerts: [
...dashboard.alerts,
...(metrics.cpu > 90 ? ['High CPU'] : []),
...(metrics.errors.length > 10 ? ['Error spike'] : []),
]
}), { alerts: [] })
);
dashboard$.subscribe(renderDashboard);
12.8.2 表单验证
const formValidation$ = (form) => {
const name$ = fromEvent(form.name, 'input').pipe(
map(e => e.target.value),
map(name => ({
value: name,
valid: name.length >= 2,
error: name.length < 2 ? 'Name too short' : null
}))
);
const email$ = fromEvent(form.email, 'input').pipe(
map(e => e.target.value),
map(email => ({
value: email,
valid: /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email),
error: !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email) ? 'Invalid email' : null
}))
);
const age$ = fromEvent(form.age, 'input').pipe(
map(e => parseInt(e.target.value)),
map(age => ({
value: age,
valid: age >= 18 && age <= 120,
error: age < 18 ? 'Must be 18+' : age > 120 ? 'Invalid age' : null
}))
);
return combineLatest([name$, email$, age$]).pipe(
map(([name, email, age]) => ({
fields: { name, email, age },
valid: name.valid && email.valid && age.valid
})),
distinctUntilChanged((a, b) => a.valid === b.valid)
);
};
12.9 注意事项
| 注意事项 | 说明 |
|---|
| 内存泄漏 | 忘记取消订阅会导致内存泄漏 |
| 调试困难 | 流的异步性质使调试复杂 |
| 过度使用 | 简单场景不需要 FRP |
| 背压处理 | 高速数据流需要背压策略 |
| 测试 | 使用 TestScheduler 进行虚拟时间测试 |
12.10 小结
| 要点 | 说明 |
|---|
| FRP | 函数式 + 响应式,流是一等公民 |
| Signal/Behavior | 连续时间变化的值 |
| Observable | 离散事件流,支持丰富的操作符 |
| 操作符 | 创建、变换、过滤、组合、错误处理 |
| 状态管理 | Elm Architecture、Redux 的流视角 |
扩展阅读
- RxJS 文档 — 官方文档
- The Introduction to Reactive Programming You’ve Been Missing — André Staltz
- Elm Architecture — Elm 官方教程
- Reflex FRP — Haskell FRP 库
下一章:13 函数式设计模式