"Developing with asynchronous data streams"
Observable: "Event Producer"
Observer: "Event Observer"
Cold: "Emits the same events for each new subscriber"
Hot: "Emits events even if there are no subscribers"
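The cold/hot distinction can be sketched without the library. The following is a minimal illustration only, assuming hand-rolled stand-ins (`coldNumbers` and `HotSource` are hypothetical names, not RxJS classes): the cold producer reruns for every subscriber, while the hot source emits regardless of who is listening, so late subscribers miss earlier values.

```typescript
// Hand-rolled stand-ins for illustration only; these are NOT the RxJS classes.
type Listener<T> = (value: T) => void;

// Cold: the producer runs once per subscriber, so everyone sees the same events.
function coldNumbers(listener: Listener<number>): void {
  [1, 2, 3].forEach(n => listener(n));
}

// Hot: one shared producer that emits whether or not anyone is listening.
class HotSource<T> {
  private listeners: Listener<T>[] = [];
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }
  emit(value: T): void {
    this.listeners.forEach(l => l(value));
  }
}

const coldA: number[] = [];
const coldB: number[] = [];
coldNumbers(n => coldA.push(n));
coldNumbers(n => coldB.push(n)); // a later subscriber still gets 1, 2, 3

const hot = new HotSource<number>();
const hotA: number[] = [];
const hotB: number[] = [];
hot.emit(1);                      // nobody subscribed yet: the value is lost
hot.subscribe(n => hotA.push(n));
hot.emit(2);
hot.subscribe(n => hotB.push(n)); // late subscriber missed 1 and 2
hot.emit(3);

console.log(coldA, coldB); // both contain 1, 2, 3
console.log(hotA, hotB);   // hotA contains 2, 3; hotB contains only 3
```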
Interactive diagrams of Rx Observables
www.rxmarbles.com
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';
const obs = of(1, 2, 3, 4, 5)
  // Pipe the values through a chain of operators
  .pipe(
    filter(n => n % 2 !== 0), // keep the odd numbers: 1, 3, 5
    map(n => n * n)           // square them: 1, 9, 25
  );
// Subscribe to get values
obs.subscribe({
  next: x => console.log(x),
  error: err => console.error("An error occurred", err),
  complete: () => console.log("Observable completed")
});
Notification Type | Description |
---|---|
next | Required. A handler for each delivered value. Called zero or more times after execution starts. |
error | Optional. A handler for an error notification. An error halts execution of the observable instance. |
complete | Optional. A handler for the execution-complete notification. No further values are delivered to the next handler after completion. |
Typical use case: an HTTP request returns a value that is needed for a subsequent HTTP request
Example: the route URL includes an ID that we need as a parameter for an HTTP call
The switchMap operator also cancels the previous in-flight request when a new value arrives
ngOnInit() {
  this.hero$ = this.route.paramMap.pipe(
    switchMap((params: ParamMap) =>
      this.service.getHero(params.get('id')))
  );
}
import { Injectable } from '@angular/core';
import { Observable, BehaviorSubject } from 'rxjs';
@Injectable({
  providedIn: 'root'
})
export class UserService {
  private users: string[] = [];
  private usersSubject = new BehaviorSubject<string[]>(this.users);
  // Expose a read-only stream so consumers cannot call next() themselves
  get user$(): Observable<string[]> {
    return this.usersSubject.asObservable();
  }
  addUser(userName: string): void {
    // Keep the local list in sync; otherwise each call would drop earlier users
    this.users = [...this.users, userName];
    this.usersSubject.next(this.users);
  }
}
Observables are not executed until a consumer subscribes
Each subscribe() call executes the defined behavior once; calling subscribe() again starts a new execution.
// declare a publishing operation
const observable = new Observable((observer) => { /* subscriber_fn */ });
// initiate execution
observable.subscribe(() => {
  // observer handles notifications
});
Promises execute immediately, and just once
The computation of the result is initiated when the promise is created
There is no way to restart work
// initiate execution
const promise = new Promise((resolve, reject) => { /* executor_fn */ });
// handle return value
promise.then((value) => {
  // handle result here
});
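The difference in execution timing can be observed directly. This is a minimal sketch, assuming a plain function (`lazy`, a hypothetical stand-in, not an RxJS observable) to model "nothing runs until someone subscribes"; the promise side uses only the standard `Promise` constructor.

```typescript
const log: string[] = [];

// Eager: the executor runs as soon as the promise is constructed.
const promise = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(42);
});
log.push('promise created');
promise.then(v => console.log('promise resolved with', v)); // runs later, asynchronously

// Lazy stand-in for an observable (hypothetical, not RxJS):
// defining it does nothing; each call starts the work from scratch.
const lazy = (next: (v: number) => void): void => {
  log.push('lazy producer ran');
  next(42);
};
log.push('lazy source defined');
lazy(v => log.push(`first run got ${v}`));
lazy(v => log.push(`second run got ${v}`));

console.log(log);
```

The log shows the promise executor running before the line after the constructor, while the lazy producer runs only when called, and once per call.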
Observables differentiate between transformation functions, such as map, and subscription
Only subscription activates the subscriber function to start computing the values
// nothing happens until subscribe() is called
observable.pipe(map((v) => 2 * v));
Promises do not differentiate between the last .then clauses (equivalent to subscription) and intermediate .then clauses (equivalent to map).
promise.then((v) => 2*v);
Observable subscriptions are cancellable
Unsubscribing removes the listener from receiving further values, and notifies the subscriber function to cancel work.
const sub: Subscription = obs.subscribe(...);
sub.unsubscribe();
Promises are not cancellable
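What unsubscribing does on both sides can be sketched without the library. This is an illustration under stated assumptions: `subscribe`, `SubscriberFn`, and `producer` are hypothetical helpers written for this example, not the RxJS API. The subscriber function returns a teardown callback, and unsubscribing both stops delivery to the listener and invokes that teardown.

```typescript
// Hypothetical stand-in for an observable; NOT the RxJS implementation.
type Teardown = () => void;
type SubscriberFn<T> = (next: (value: T) => void) => Teardown;

function subscribe<T>(subscriberFn: SubscriberFn<T>, listener: (value: T) => void) {
  let active = true;
  const teardown = subscriberFn(value => {
    if (active) listener(value); // values arriving after unsubscribe are dropped
  });
  return {
    unsubscribe(): void {
      active = false; // 1. the listener receives nothing further
      teardown();     // 2. the producer is told to cancel its work
    },
  };
}

// A producer driven by hand so the example needs no timers.
const producer = { emit: (_n: number): void => {}, stopped: false };
const source: SubscriberFn<number> = next => {
  producer.emit = next;
  return () => { producer.stopped = true; };
};

const received: number[] = [];
const sub = subscribe(source, n => received.push(n));
producer.emit(1);
producer.emit(2);
sub.unsubscribe();
producer.emit(3); // ignored: the subscription is closed

console.log(received, producer.stopped); // received holds 1 and 2; stopped is true
```

A standard promise has no counterpart to the returned `unsubscribe()`: once created, its work cannot be stopped through the promise itself.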
Observable execution errors are delivered to the subscriber's error handler, and the subscriber automatically unsubscribes from the observable.
obs.subscribe(() => {
throw Error('my error');
});
import { of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, catchError, retry } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);
apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
Promises push errors to the child promises.
promise.then(() => {
throw Error('my error');
});
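To make "errors are pushed to the child promises" concrete, here is a small sketch using only standard promises. The error thrown in the first handler rejects the promise returned by that `.then`, skips the following `.then`, and surfaces in the first `.catch` further down the chain.

```typescript
const outcome: Promise<string> = Promise.resolve(1)
  .then(() => {
    throw new Error('my error'); // rejects the promise returned by this then()
  })
  .then(() => 'never reached')   // skipped: its parent promise rejected
  .catch((err: Error) => `caught: ${err.message}`);

outcome.then(message => console.log(message)); // logs "caught: my error"
```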
import { fromEvent } from 'rxjs';
// Setup
const clicks$ = fromEvent(buttonEl, 'click');
// Begin listening
const subscription = clicks$.subscribe(e => console.log('Clicked', e));
// Stop listening
subscription.unsubscribe();
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
const searchBox = document.getElementById('search-box') as HTMLInputElement;
const typeahead = fromEvent(searchBox, 'input').pipe(
  // The event target is the input element; read its current text
  map(e => (e.target as HTMLInputElement).value),
  filter(text => text.length > 2), // ignore very short inputs
  debounceTime(10),                // wait for a pause in typing
  distinctUntilChanged(),          // skip if the text did not change
  switchMap(() => ajax('/api/endpoint')) // drop the previous request
);
typeahead.subscribe(data => {
  // Handle the data from the API
});
Angular's HttpClient returns observables from its HTTP method calls
Thanks to TypeScript's type annotations, it is often clear whether a variable is an observable
A trailing "$" is often used to mark observables:
stopwatchValue: number;
stopwatchValue$: Observable<number>;
Router and HTTP need no manual unsubscribe
For other observables we can use:
export class UnsubscribeComponent implements OnInit, OnDestroy {
  message: string;
  subscription: Subscription | undefined;
  constructor(private upperCaseService: UpperCaseService) {}
  ngOnInit() {
    this.subscription = this.upperCaseService.getUpperCaseMessage()
      .subscribe((message: string) => this.message = message);
  }
  ngOnDestroy(): void {
    // Release the subscription when the component is destroyed
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}
export class TakeUntilComponent implements OnInit, OnDestroy {
  message: string;
  private unsubscribe$ = new Subject<void>();
  constructor(private upperCaseService: UpperCaseService) {}
  ngOnInit() {
    this.upperCaseService.getUpperCaseMessage()
      .pipe(takeUntil(this.unsubscribe$)) // ends the stream once unsubscribe$ emits
      .subscribe((message: string) => this.message = message);
  }
  ngOnDestroy(): void {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
  }
}
The async pipe subscribes to an observable or promise and returns the latest value it has emitted
Angular automatically unsubscribes when the component is destroyed
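@Component({
  selector: 'async-observable-pipe',
  template: `Time: {{ time | async }}`
})
export class AsyncObservablePipeComponent {
  time = new Observable<string>(observer => {
    const id = setInterval(() => observer.next(new Date().toString()), 1000);
    // Teardown: stop the timer when the async pipe unsubscribes
    return () => clearInterval(id);
  });
}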