↑
RxJS
What is RxJS?
- RxJS is a JavaScript library for working with data that changes over time.
Example: clicks, keyboard events, HTTP responses, timers.
- It uses the idea of
Observable streams.
An Observable is like a pipe that keeps sending values.
- You can listen to these values using
subscribe().
When new data comes, your code runs automatically.
Why use RxJS?
- It helps you handle async tasks in a clean way.
Example: chain HTTP requests without “callback hell”.
- It can combine many events into one stream.
Example: filter user typing + wait 300ms + send search request.
- It is used heavily in Angular, so learning it is very helpful.
Key Concepts
- Observable: A stream of values over time
(like a river sending values one by one).
- Observer: An object that watches the stream.
It reacts when the stream sends data.
- Subscription: A connection between you and the Observable.
You can stop it by calling
unsubscribe().
- Operators: Functions that change the stream.
Example:
map(), filter(), debounceTime().
Basic Example
- Create an
Observable that sends 1, 2, 3:
import { of } from 'rxjs';
const numbers$ = of(1, 2, 3);
numbers$.subscribe(value => {
console.log(value);
});
of() creates a stream with fixed values.
subscribe() prints each value when it arrives.
Example with Operators
- Use
map() to change each value before it reaches you:
import { of, map } from 'rxjs';
const numbers$ = of(1, 2, 3)
.pipe(
map(n => n * 10)
);
numbers$.subscribe(result => {
console.log(result); // 10, 20, 30
});
pipe() lets you add operators step by step.
- Operators do not change the original stream,
they create a new stream with new values.
Does of() emit immediately?
of(1, 2, 3) creates a stream that sends 1, then 2, then 3 as soon as you subscribe.
- It is a "cold"
Observable: each subscriber gets the same values from the start.
- So yes — your code will print three values right after
subscribe() runs.
import { of } from 'rxjs';
const numbers$ = of(1, 2, 3);
numbers$.subscribe(v => console.log(v)); // 1, 2, 3 (sync, one after another)
How do I add values later and trigger subscribe()?
- Normal Observables (like from
of()) are "read-only" from the outside.
You cannot push new values into them after creation.
- To push values yourself, use a
Subject (or BehaviorSubject, ReplaySubject).
They have a next(value) method that emits to all subscribers.
- Or build your own Observable with
new Observable() and call observer.next(...) inside.
Add values with Subject (most common)
Subject starts empty. You push values using next().
import { Subject } from 'rxjs';
const clicks$ = new Subject<number>();
// Listen
clicks$.subscribe(v => console.log('A:', v));
clicks$.subscribe(v => console.log('B:', v));
// Emit (later, anywhere you hold the subject)
clicks$.next(1); // A: 1, B: 1
clicks$.next(2); // A: 2, B: 2
- Use it for event streams: buttons, sockets, custom signals.
Keep latest value with BehaviorSubject
BehaviorSubject needs an initial value and always remembers the latest value.
import { BehaviorSubject } from 'rxjs';
const count$ = new BehaviorSubject<number>(0); // initial value 0
count$.subscribe(v => console.log('First:', v)); // First: 0
count$.next(1);
count$.next(2);
count$.subscribe(v => console.log('Second (late):', v)); // Second (late): 2 (gets latest immediately)
- Great for app state: user, theme, cart, form values.
Build your own Observable (custom producer)
- Use
new Observable() when you want control over how values are produced.
import { Observable } from 'rxjs';
const ticker$ = new Observable<number>((observer) => {
let n = 0;
const id = setInterval(() => {
observer.next(n++); // push next number every 500ms
if (n === 5) observer.complete(); // finish after 5 values
}, 500);
// cleanup on unsubscribe
return () => clearInterval(id);
});
const sub = ticker$.subscribe({
next: v => console.log('tick', v),
complete: () => console.log('done'),
});
// Later, if needed: sub.unsubscribe();
- Good when wrapping
setInterval, DOM events, or 3rd-party APIs.
Cold vs Hot (quick idea)
- Cold (e.g.,
of(), from(), HTTP):
each subscriber gets the same sequence from the start.
- Hot (e.g.,
Subject, DOM events):
values happen regardless of subscribers,
late subscribers may miss older values.
BehaviorSubject is hot, but it replays the latest value to new subscribers.
When emissions are not immediate
- Sources like
interval(), timer(), HTTP calls, or user events emit later (async), not immediately.
import { interval, take } from 'rxjs';
interval(1000)
.pipe(take(3))
.subscribe(v => console.log(v)); // prints 0, 1, 2 — one per second
- So: some streams are sync (
of()), some are async (timers, HTTP, UI events).
Why does Angular HTTP use cold Observables?
- Angular’s
HttpClient.get(...) returns a cold Observable.
This means: the request is sent only when you subscribe.
- Each subscription triggers a new HTTP call.
This is good for one-off requests (like loading a page) and for retry logic.
- Why cold?
So your code is predictable: you control when the request happens and you can easily cancel it by unsubscribing.
import { HttpClient } from '@angular/common/http';import { shareReplay } from 'rxjs';
class UserService {
constructor(private http: HttpClient) {}
// Each subscribe triggers a NEW request (cold)
loadUsers() {
return this.http.get('/api/users');
}
// If you want to SHARE one request with many subscribers: (First subscriber sends request, later subscribers reuse the result.)
loadUsersShared() {
return this.http.get('/api/users')
.pipe(
shareReplay(1) // cache the last value for new subscribers
);
}
}
- Tip: If you see duplicate HTTP calls, you likely subscribed more than once.
Use
shareReplay(1) if you want to share one result.
Why do forms often use BehaviorSubject?
- Forms need to keep the current state: values, errors, dirty/touched flags.
New listeners (like a new component) must get the latest state right away.
BehaviorSubject is perfect here: it always has a current value and gives it to new subscribers immediately.
- Angular’s
FormControl.valueChanges is an Observable of values over time.
If you keep a mirror of form state yourself, a BehaviorSubject makes it simple.
import { BehaviorSubject } from 'rxjs';
type Profile = { name: string; email: string };
class ProfileStore {
// start with an initial value
private readonly _state = new BehaviorSubject<Profile>({ name: '', email: '' });
// expose as Observable for read-only access
state$ = this._state.asObservable();
// update from anywhere (e.g., form handlers)
update(patch: Partial<Profile>) {
const current = this._state.value; // latest value
this._state.next({ ...current, ...patch });
}
}
// somewhere in your component:
// store.state$.subscribe(profile => render(profile));
// form.valueChanges.subscribe(val => store.update(val));
- Result: Any subscriber (new or old) immediately sees the current form state and any future changes.