Skip to content

Commit af3af82

Browse files
committed
feat(rx): add flattening operators with default error handling strategies
1 parent 9a7213a commit af3af82

File tree

10 files changed

+146
-0
lines changed

10 files changed

+146
-0
lines changed

libs/rx/operators/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,33 @@ A set of powerful RxJS operators for building reactive Angular applications.
1010

1111
## Operators
1212

13+
### Flattening operators
14+
15+
#### `rxSwitchMap`
16+
17+
`switchMap`-operator from `RxJs` with a default error handling strategy.
18+
19+
Strategies:
20+
tbd
21+
22+
#### `rxMergeMap`
23+
`mergeMap`-operator from `RxJs` with a default error handling strategy.
24+
25+
Strategies:
26+
tbd
27+
28+
#### `rxConcatMap`
29+
`concatMap`-operator from `RxJs` with a default error handling strategy.
30+
31+
Strategies:
32+
tbd
33+
#### `rxExhaustMap`
34+
`exhaustMap`-operator from `RxJs` with a default error handling strategy.
35+
36+
Strategies:
37+
tbd
38+
39+
1340
### Filter operators
1441

1542
- `rxFilterNull`: Filters out nullish values

libs/rx/operators/src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,9 @@ export {rxWrap} from './lib/rx-wrap.operator';
66

77
export * from './lib/creational/rx-source';
88
export * from './lib/rx-pluck';
9+
10+
11+
export * from './lib/flattening/rx-switchmap';
12+
export * from './lib/flattening/rx-concatmap';
13+
export * from './lib/flattening/rx-mergemap';
14+
export * from './lib/flattening/rx-exhaustmap';

libs/rx/operators/src/lib/flattening/rx-concatmap.spec.ts

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import {catchError, concatMap, Observable, of, OperatorFunction} from "rxjs";
2+
3+
/**
4+
* RxJs concatMap operator with error handling
5+
*/
6+
export function rxConcatmap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
7+
return (source: Observable<T>) => source.pipe(
8+
concatMap((value, index) => project(value, index).pipe(
9+
catchError(error => of(error))
10+
)),
11+
);
12+
13+
}
14+

libs/rx/operators/src/lib/flattening/rx-exhaustmap.spec.ts

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import {catchError, exhaustMap, Observable, of, OperatorFunction} from "rxjs";
2+
3+
/**
4+
* RxJs exhaustMap operator with error handling
5+
*/
6+
export function rxExhaustMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
7+
return (source: Observable<T>) => source.pipe(
8+
exhaustMap((value, index) => project(value, index).pipe(
9+
catchError(error => of(error))
10+
)),
11+
);
12+
13+
}
14+

libs/rx/operators/src/lib/flattening/rx-mergemap.spec.ts

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import {catchError, mergeMap, Observable, of, OperatorFunction} from "rxjs";
2+
3+
/**
4+
* RxJs mergeMap operator with error handling
5+
*/
6+
export function rxMergeMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
7+
return (source: Observable<T>) => source.pipe(
8+
mergeMap((value, index) => project(value, index).pipe(
9+
catchError(error => of(error))
10+
)),
11+
);
12+
13+
}
14+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// create marble tests for rxSwitchMap operator
2+
3+
4+
import {rxSwitchMap} from "./rx-switchmap";
5+
import {TestScheduler} from "rxjs/internal/testing/TestScheduler";
6+
import {observableMatcher} from "@angular-kit/test-helpers";
7+
import {map} from "rxjs";
8+
9+
describe('rxSwitchMap', () => {
10+
11+
let testScheduler: TestScheduler;
12+
13+
beforeEach(() => {
14+
testScheduler = new TestScheduler(observableMatcher);
15+
});
16+
17+
it('should map-and-flatten each item to an Observable', () => {
18+
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
19+
const e1 = hot(' --1-----3--5-------|');
20+
const e1subs = ' ^------------------!';
21+
const e2 = cold(' x-x-x| ', { x: 10 });
22+
// x-x-x|
23+
// x-x-x|
24+
const expected = ' --x-x-x-y-yz-z-z---|';
25+
const values = { x: 10, y: 30, z: 50 };
26+
27+
const result = e1.pipe(rxSwitchMap((x) => e2.pipe(map((i) => i * +x))));
28+
29+
expectObservable(result).toBe(expected, values);
30+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
31+
});
32+
});
33+
34+
// todo
35+
/*describe('default error handling', () => {
36+
it('should catch and return the raised error', () => {
37+
testScheduler.run(({ hot, cold, expectObservable }) => {
38+
const e1 = hot(' --1-1--|');
39+
const e2 = cold(' y-x--| ', { x: 10, y: 1 });
40+
// x-x-x|
41+
// x-x-x|
42+
const expected = ' --e-x--|';
43+
const values = { x: 10, e: 'error' };
44+
45+
const result = e1.pipe(rxSwitchMap((x) => e2.pipe(map((i) => {
46+
if(i === 2) {
47+
throw ('error');
48+
}
49+
return i * +x
50+
}))));
51+
52+
expectObservable(result).toBe(expected, values);
53+
});
54+
});
55+
});*/
56+
57+
})
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import {catchError, Observable, of, OperatorFunction, switchMap} from "rxjs";
2+
3+
/**
4+
* RxJs switchMap operator with error handling
5+
*/
6+
export function rxSwitchMap<T, R>(project: (value: T, index: number) => Observable<R>): OperatorFunction<T, R> {
7+
return (source: Observable<T>) => source.pipe(
8+
switchMap((value, index) => project(value, index).pipe(
9+
catchError(error => of(error))
10+
)),
11+
);
12+
13+
}
14+

0 commit comments

Comments
 (0)