RxJS


What is RxJS?

  1. RxJS is a JavaScript library for working with data that changes over time.

    Examples: clicks, keyboard events, HTTP responses, timers.



  2. It uses the idea of Observable streams.

    An Observable is like a pipe that keeps sending values.



  3. You can listen to these values using subscribe(). When new data comes, your code runs automatically.



Why use RxJS?

  1. It helps you handle async tasks in a clean way. Example: chain HTTP requests without “callback hell”.


  2. It can combine many events into one stream. Example: filter what the user types, wait for a 300ms pause, then send one search request.


  3. Angular uses it heavily (HttpClient, reactive forms, the router), so it is worth learning if you work with Angular.
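
The search example from item 2 might look roughly like the sketch below. The names typed$ and requests and the /api/search URL are made up for illustration. A VirtualTimeScheduler is passed to debounceTime only so the sketch runs instantly; in a real page you would leave that argument out and feed typed$ from an input element. Importing operators straight from 'rxjs' assumes RxJS 7.2 or later.

```typescript
import { Subject, VirtualTimeScheduler, debounceTime, filter, map } from 'rxjs';

// Virtual clock so the sketch finishes immediately instead of waiting on real timers.
const scheduler = new VirtualTimeScheduler();

const typed$ = new Subject<string>(); // hypothetical: would be fed by keystrokes
const requests: string[] = [];        // stands in for the HTTP calls we would send

typed$
  .pipe(
    map(text => text.trim()),
    filter(text => text.length >= 2), // ignore very short queries
    debounceTime(300, scheduler),     // wait for a 300ms pause in typing
  )
  .subscribe(query => requests.push(`/api/search?q=${query}`));

// Simulate typing "rx", "rxj", "rxjs" 100ms apart, followed by a pause.
scheduler.schedule(() => typed$.next('rx'), 0);
scheduler.schedule(() => typed$.next('rxj'), 100);
scheduler.schedule(() => typed$.next('rxjs'), 200);
scheduler.flush(); // run all scheduled virtual-time work

console.log(requests); // ['/api/search?q=rxjs']
```

Three keystrokes produce a single request, because only the last value survives the 300ms quiet period.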



Key Concepts

  1. Observable: A stream of values over time (like a river sending values one by one).


  2. Observer: An object that watches the stream. It has next, error, and complete callbacks that run when the stream sends a value, fails, or finishes.


  3. Subscription: A connection between you and the Observable. You can stop it by calling unsubscribe().


  4. Operators: Functions that take a stream and return a new, transformed stream. Example: map(), filter(), debounceTime().
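
A small sketch that puts the four concepts side by side. A Subject is used as the Observable only so values can be pushed by hand; the variable names are illustrative.

```typescript
import { Observer, Subject, Subscription, filter, map } from 'rxjs';

const received: string[] = [];

// Observable: the stream of values.
const numbers$ = new Subject<number>();

// Operators: each one returns a new stream.
const evenLabels$ = numbers$.pipe(
  filter(n => n % 2 === 0),
  map(n => `even: ${n}`),
);

// Observer: an object with callbacks for the three kinds of notification.
const observer: Observer<string> = {
  next: value => received.push(value),
  error: err => console.error(err),
  complete: () => received.push('complete'),
};

// Subscription: the connection; keep it so you can stop listening later.
const subscription: Subscription = evenLabels$.subscribe(observer);

numbers$.next(1); // filtered out (odd)
numbers$.next(2); // received
subscription.unsubscribe();
numbers$.next(4); // ignored, nobody is listening any more

console.log(received); // ['even: 2']
```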



Basic Example

  1. Create an Observable that sends 1, 2, 3:

  2. import { of } from 'rxjs';
    
    const numbers$ = of(1, 2, 3);
    
    numbers$.subscribe(value => {
      console.log(value);
    });
    


  3. of() creates a stream with fixed values.


  4. subscribe() starts the stream, and the callback you pass to it prints each value as it arrives.



Example with Operators

  1. Use map() to change each value before it reaches you:

  2. import { of, map } from 'rxjs'; // RxJS 7.2+; in older versions import map from 'rxjs/operators'
    
    const numbers$ = of(1, 2, 3)
      .pipe(
        map(n => n * 10)
      );
    
    numbers$.subscribe(result => {
      console.log(result); // 10, 20, 30
    });
    


  3. pipe() lets you add operators step by step.


  4. Operators do not change the original stream; they return a new stream with the transformed values.
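
A quick way to check that the original stream is untouched is to subscribe to both it and the piped result. This is only a sketch; source$ and scaled$ are illustrative names.

```typescript
import { of, map } from 'rxjs';

const source$ = of(1, 2, 3);
const scaled$ = source$.pipe(map(n => n * 10)); // a new Observable; source$ is unchanged

const fromSource: number[] = [];
const fromScaled: number[] = [];

source$.subscribe(v => fromSource.push(v));
scaled$.subscribe(v => fromScaled.push(v));

console.log(fromSource); // [1, 2, 3]
console.log(fromScaled); // [10, 20, 30]
```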



Does of() emit immediately?

  1. of(1, 2, 3) creates a stream that sends 1, then 2, then 3 as soon as you subscribe.


  2. It is a "cold" Observable: each subscriber gets the same values from the start.


  3. So yes: your code prints all three values synchronously, before subscribe() returns.

  4. import { of } from 'rxjs';
    
    const numbers$ = of(1, 2, 3);
    
    numbers$.subscribe(v => console.log(v)); // 1, 2, 3 (sync, one after another)
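
To see the synchronous behavior for yourself, record markers before and after the subscribe() call. This sketch collects everything in a log array (an illustrative name) so the order is easy to inspect.

```typescript
import { of } from 'rxjs';

const log: string[] = [];
const numbers$ = of(1, 2, 3);

log.push('before subscribe');
numbers$.subscribe(v => log.push(`value: ${v}`));
log.push('after subscribe'); // only reached once all three values were delivered

console.log(log);
// ['before subscribe', 'value: 1', 'value: 2', 'value: 3', 'after subscribe']
```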
    



How do I add values later and trigger subscribe()?

  1. Normal Observables (like from of()) are "read-only" from the outside.

    You cannot push new values into them after creation.



  2. To push values yourself, use a Subject (or BehaviorSubject, ReplaySubject).

    They have a next(value) method that emits to all subscribers.



  3. Or build your own Observable with new Observable() and call observer.next(...) inside.



Add values with Subject (most common)

  1. Subject starts empty. You push values using next().

  2. import { Subject } from 'rxjs';
    
    const clicks$ = new Subject<number>();
    
    // Listen
    clicks$.subscribe(v => console.log('A:', v));
    clicks$.subscribe(v => console.log('B:', v));
    
    // Emit (later, anywhere you hold the subject)
    clicks$.next(1);  // A: 1, B: 1
    clicks$.next(2);  // A: 2, B: 2
    


  3. Use it for event streams: buttons, sockets, custom signals.



Keep latest value with BehaviorSubject

  1. BehaviorSubject needs an initial value and always remembers the latest value.

  2. import { BehaviorSubject } from 'rxjs';
    
    const count$ = new BehaviorSubject<number>(0); // initial value 0
    
    count$.subscribe(v => console.log('First:', v)); // First: 0
    
    count$.next(1);
    count$.next(2);
    
    count$.subscribe(v => console.log('Second (late):', v)); // Second (late): 2 (gets latest immediately)
    


  3. Great for app state: user, theme, cart, form values.



Build your own Observable (custom producer)

  1. Use new Observable() when you want control over how values are produced.

  2. import { Observable } from 'rxjs';
    
    const ticker$ = new Observable<number>((observer) => {
        let n = 0;
        const id = setInterval(() => {
            observer.next(n++);                 // push next number every 500ms
            if (n === 5) observer.complete();   // finish after 5 values
        }, 500);
    
        // cleanup on unsubscribe
        return () => clearInterval(id);
    });
    
    const sub = ticker$.subscribe({
        next: v => console.log('tick', v),
        complete: () => console.log('done'),
    });
    
    // Later, if needed: sub.unsubscribe();
    


  3. Good when wrapping setInterval, DOM events, or 3rd-party APIs.



Cold vs Hot (quick idea)

  1. Cold (e.g., of(), from(), HTTP): each subscriber gets the same sequence from the start.


  2. Hot (e.g., Subject, DOM events): values are produced regardless of subscribers, so late subscribers may miss earlier values.


  3. BehaviorSubject is hot, but it replays the latest value to new subscribers.
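
The three cases can be compared side by side with an early and a late subscriber. A minimal sketch, with illustrative names:

```typescript
import { BehaviorSubject, Subject, of } from 'rxjs';

const log: string[] = [];

// Cold: every subscriber gets the full sequence from the start.
const cold$ = of('a', 'b');
cold$.subscribe(v => log.push(`cold early: ${v}`));
cold$.subscribe(v => log.push(`cold late: ${v}`));

// Hot: a late subscriber only sees what is emitted after it subscribes.
const hot$ = new Subject<string>();
hot$.subscribe(v => log.push(`hot early: ${v}`));
hot$.next('a');
hot$.subscribe(v => log.push(`hot late: ${v}`));
hot$.next('b');

// BehaviorSubject: hot, but a late subscriber first receives the latest value.
const state$ = new BehaviorSubject<string>('a');
state$.next('b');
state$.subscribe(v => log.push(`state late: ${v}`));

console.log(log);
// cold early: a, cold early: b, cold late: a, cold late: b,
// hot early: a, hot early: b, hot late: b, state late: b
```

The late hot subscriber never sees 'a', while the late BehaviorSubject subscriber gets 'b' immediately.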



When emissions are not immediate

  1. Sources like interval(), timer(), HTTP calls, or user events emit later (async), not immediately.

  2. import { interval, take } from 'rxjs';
    
    interval(1000)
        .pipe(take(3))
        .subscribe(v => console.log(v)); // prints 0, 1, 2 — one per second
    


  3. So: some streams are sync (of()), some are async (timers, HTTP, UI events).



Why does Angular HTTP use cold Observables?

  1. Angular’s HttpClient.get(...) returns a cold Observable. This means: the request is sent only when you subscribe.


  2. Each subscription triggers a new HTTP call. This is good for one-off requests (like loading a page) and for retry logic.


  3. Why cold? So your code is predictable: you control when the request happens and you can easily cancel it by unsubscribing.

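  4. import { HttpClient } from '@angular/common/http';
    import { Observable, shareReplay } from 'rxjs';
    
    class UserService {
        // Cached shared stream, created on first use
        private users$?: Observable<unknown>;
    
        constructor(private http: HttpClient) {}
    
        // Each subscribe triggers a NEW request (cold)
        loadUsers() {
            return this.http.get('/api/users');
        }
    
        // To SHARE one request with many subscribers, build the pipeline once
        // and return the same Observable every time: the first subscriber
        // sends the request, later subscribers reuse the result.
        // (Calling pipe(shareReplay(1)) anew on every call would not share anything.)
        loadUsersShared() {
            this.users$ ??= this.http.get('/api/users')
                .pipe(
                    shareReplay(1) // cache the last value for new subscribers
                );
            return this.users$;
        }
    }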
    


  5. Tip: If you see duplicate HTTP calls, you likely subscribed more than once. To share one result, apply shareReplay(1) once and have every subscriber use that same Observable.
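
The duplicate-call behavior can be reproduced without Angular. In this sketch, fakeUsersRequest$ is a hypothetical stand-in for http.get('/api/users'): it counts how many times the "request" runs, which is once per subscription.

```typescript
import { Observable, shareReplay } from 'rxjs';

let requestCount = 0;

// Hypothetical stand-in for an HTTP call: the work happens inside the
// subscribe function, so it runs again for every subscription.
const fakeUsersRequest$ = new Observable<string[]>(subscriber => {
  requestCount++;
  subscriber.next(['Ada', 'Linus']);
  subscriber.complete();
});

// Cold: two subscriptions, two "requests".
fakeUsersRequest$.subscribe();
fakeUsersRequest$.subscribe();
const countWithoutSharing = requestCount;

// Shared: both subscribers use the SAME piped Observable, so only one more "request".
const shared$ = fakeUsersRequest$.pipe(shareReplay(1));
const results: string[][] = [];
shared$.subscribe(users => results.push(users));
shared$.subscribe(users => results.push(users));
const countWithSharing = requestCount - countWithoutSharing;

console.log(countWithoutSharing, countWithSharing, results.length); // 2 1 2
```

Both shared subscribers still receive the user list; the second one gets the cached copy.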



Why do forms often use BehaviorSubject?

  1. Forms need to keep the current state: values, errors, dirty/touched flags. New listeners (like a new component) must get the latest state right away.


  2. BehaviorSubject is perfect here: it always has a current value and gives it to new subscribers immediately.


  3. Angular’s FormControl.valueChanges is an Observable of values over time. If you keep a mirror of form state yourself, a BehaviorSubject makes it simple.

  4. import { BehaviorSubject } from 'rxjs';
    
    type Profile = { name: string; email: string };
    
    class ProfileStore {
      // start with an initial value
      private readonly _state = new BehaviorSubject<Profile>({ name: '', email: '' });
    
      // expose as Observable for read-only access
      state$ = this._state.asObservable();
    
      // update from anywhere (e.g., form handlers)
      update(patch: Partial<Profile>) {
        const current = this._state.value;     // latest value
        this._state.next({ ...current, ...patch });
      }
    }
    
    // somewhere in your component:
    // store.state$.subscribe(profile => render(profile));
    // form.valueChanges.subscribe(val => store.update(val));
    


  5. Result: Any subscriber (new or old) immediately sees the current form state and any future changes.