- forkJoin
- 签名:
forkJoin(...args, selector : function): Observable
- 签名:
- 当所有 observables 完成时,发出每个 observable 的最新值。
- 为什么使用
forkJoin? - 示例
- 示例 1: Observables 再不同的时间间隔后完成
- 示例 2: 发起任意多个请求
- 示例 3: 在外部处理错误
- 示例 4: 当某个内部 observable 报错时得到成功结果
- 示例 5: Angular 中的 forkJoin
- 为什么使用
- 其他资源
forkJoin
签名: forkJoin(...args, selector : function): Observable
当所有 observables 完成时,发出每个 observable 的最新值。
如果你想要多个 observables 按发出顺序相对应的值的组合,试试 zip!
如果内部 observable 不完成的话,forkJoin 永远不会发出值!
为什么使用 forkJoin?
当有一组 observables,但你只关心每个 observable 最后发出的值时,此操作符是最适合的。此操作符的一个常见用例是在页面加载(或其他事件)时你希望发起多个请求,并在所有请求都响应后再采取行动。它可能与 Promise.all 的使用方式类似。
注意,如果任意作用于 forkJoin 的内部 observable 报错的话,对于那些在内部 observable 上没有正确 catch 错误,从而导致完成的 observable,你将丢失它们的值 (参见示例 4)。如果你只关心所有内部 observables 是否成功完成的话,可以在外部捕获错误。
还需要注意的是如果 observable 发出的项多于一个的话,并且你只关心前一个发出的话,那么 forkJoin 并非正确的选择。在这种情况下,应该选择像 combineLatest 或 zip 这样的操作符。

示例
示例 1: Observables 再不同的时间间隔后完成
( StackBlitz |
jsBin |
jsFiddle )
import { delay, take } from 'rxjs/operators';import { forkJoin } from 'rxjs/observable/forkJoin';import { of } from 'rxjs/observable/of';import { interval } from 'rxjs/observable/interval';const myPromise = val =>new Promise(resolve =>setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000));/*当所有 observables 完成时,将每个 observable的最新值作为数组发出*/const example = forkJoin(// 立即发出 'Hello'of('Hello'),// 1秒后发出 'World'of('World').pipe(delay(1000)),// 1秒后发出0interval(1000).pipe(take(1)),// 以1秒的时间间隔发出0和1interval(1000).pipe(take(2)),// 5秒后解析 'Promise Resolved' 的 promisemyPromise('RESULT'));//输出: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]const subscribe = example.subscribe(val => console.log(val));
示例 2: 发起任意多个请求
( StackBlitz |
jsBin |
jsFiddle )
import { mergeMap } from 'rxjs/operators';import { forkJoin } from 'rxjs/observable/forkJoin';import { of } from 'rxjs/observable/of';const myPromise = val =>new Promise(resolve =>setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000));const source = of([1, 2, 3, 4, 5]);// 发出数组的全部5个结果const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));/*输出:["Promise Resolved: 1","Promise Resolved: 2","Promise Resolved: 3","Promise Resolved: 4","Promise Resolved: 5"]*/const subscribe = example.subscribe(val => console.log(val));
示例 3: 在外部处理错误
( StackBlitz |
jsBin |
jsFiddle )
import { delay, catchError } from 'rxjs/operators';import { forkJoin } from 'rxjs/observable/forkJoin';import { of } from 'rxjs/observable/of';import { _throw } from 'rxjs/observable/throw';/*当所有 observables 完成时,将每个 observable的最新值作为数组发出*/const example = forkJoin(// 立即发出 'Hello'of('Hello'),// 1秒后发出 'World'of('World').pipe(delay(1000)),// 抛出错误_throw('This will error')).pipe(catchError(error => of(error)));// 输出: 'This will Error'const subscribe = example.subscribe(val => console.log(val));
示例 4: 当某个内部 observable 报错时得到成功结果
( StackBlitz |
jsBin |
jsFiddle )
import { delay, catchError } from 'rxjs/operators';import { forkJoin } from 'rxjs/observable/forkJoin';import { of } from 'rxjs/observable/of';import { _throw } from 'rxjs/observable/throw';/*当所有 observables 完成时,将每个 observable的最新值作为数组发出*/const example = forkJoin(// 立即发出 'Hello'of('Hello'),// 1秒后发出 'World'of('World').pipe(delay(1000)),// 抛出错误_throw('This will error').pipe(catchError(error => of(error))));// 输出: ["Hello", "World", "This will error"]const subscribe = example.subscribe(val => console.log(val));
示例 5: Angular 中的 forkJoin
( plunker )
@Injectable()export class MyService {makeRequest(value: string, delayDuration: number) {// 模拟 http 请求return of(`Complete: ${value}`).pipe(delay(delayDuration));}}@Component({selector: 'my-app',template: `<div><h2>forkJoin Example</h2><ul><li> {{propOne}} </li><li> {{propTwo}} </li><li> {{propThree}} </li></ul></div>`,})export class App {public propOne: string;public propTwo: string;public propThree: string;constructor(private _myService: MyService) {}ngOnInit() {// 使用不同的延迟模拟3个请求forkJoin(this._myService.makeRequest('Request One', 2000),this._myService.makeRequest('Request Two', 1000)this._myService.makeRequest('Request Three', 3000)).subscribe(([res1, res2, res3]) => {this.propOne = res1;this.propTwo = res2;this.propThree = res3;});}}
其他资源
- forkJoin
- 官方文档
源码: https://github.com/ReactiveX/rxjs/blob/master/src/observable/ForkJoinObservable.ts
