RxJS in React

Created by Jan Doležel

What is it?

  • Library for reactive programming using Observables
  • The Observer pattern done right

What?

  • Observable is a lazy Push collections of multiple values
  • An Observable is a Producer of multiple values, "pushing" them to Observers (Consumers)

What II?

Producer Consumer
Pull Passive
produces data when requested
Active
decides when data is requested
Push Active
produces data at its own pace
Passive
reacts to received data

Single Multiple
Pull Function Iterator
Push Promise Observable

Example


            import * as rxjs from 'rxjs'

            const foo = new rxjs.Observable(subscriber => {
              console.log('Hello')
              subscriber.next(42)
            })

            foo.subscribe(x => {
              console.log(x)
            })
            foo.subscribe(y => {
              console.log(y)
            })
          

Operators

map

                import * as rxjs from 'rxjs'

                const bar = new rxjs.Observable(subscriber => {
                  subscriber.next(1)
                  subscriber.next(2)
                  subscriber.next(3)
                })

                bar.pipe(
                  rxjs.operators.map((x) => x * 10),
                ).subscribe(x => {
                  console.log(x)
                })
              
concat

                import * as rxjs from 'rxjs'

                const bar1 = new rxjs.Observable(subscriber => {
                  subscriber.next(1)
                  subscriber.next(2)
                  subscriber.complete() // try to remove it
                })

                const bar2 = new rxjs.Observable(subscriber => {
                  subscriber.next(11)
                  subscriber.next(12)
                  subscriber.complete()
                })

                rxjs.concat(bar1, bar2).subscribe(x => {
                  console.log(x)
                })
              
take

              import * as rxjs from 'rxjs'

              rxjs.interval(1000).pipe(
                rxjs.operators.take(4),
              ).subscribe(x => {
                console.log(x)
              })
            
mergeMap

                import * as rxjs from 'rxjs'

                rxjs.interval(500).pipe(
                  rxjs.operators.take(4),
                  rxjs.operators.map((x) => x * 10),
                  rxjs.operators.mergeMap((x) =>
                    rxjs.interval(200).pipe(
                      rxjs.operators.take(5),
                      rxjs.operators.map((y) => x + y),
                    )
                  ),
                ).subscribe(x => {
                  console.log(x)
                })
              
switchMap

                import * as rxjs from 'rxjs'

                rxjs.interval(500).pipe(
                  rxjs.operators.take(4),
                  rxjs.operators.map((x) => x * 10),
                  rxjs.operators.switchMap((x) =>
                    rxjs.interval(200).pipe(
                      rxjs.operators.take(5),
                      rxjs.operators.map((y) => x + y),
                    )
                  ),
                ).subscribe(x => {
                  console.log(x)
                })
              
pipe of from filter map tap switchMap mergeMap mergeMapTo flatMap concat concatAll merge combineAll combineLatest catchError ...

WTF?

Interactive diagrams of Rx Observables

Operator Decision Tree

Learn RxJS

In React

Redux Middleware comparison

from Redux Middleware Comparison

  • Redux Thunk - just return promise, write everything on your own
  • Redux Saga - useful helper functions, generator functions ~ iterator like ~ imperative programming
  • Redux Loop - mixing reducers with async behaviour ¯\_(ツ)_/¯, write everything on your own
  • Redux Observable - wrapper around RxJS - functional, huuuge set of helper functions

Some useful libraries first

typesafe-actions


              import { createStandardAction, createAsyncAction } from 'typesafe-actions'
              const myAction = createStandardAction('myType')<TPayload>()
              const { request, success, failure } = createAsyncAction(
                'myRequestType',
                'mySuccessType',
                'myFailureType',
              )<TRequestPayload, TSuccessPayload, TFailurePayload>()
            

getType, isActionOf - typeguards

io-ts


              import * as t from 'io-ts'
              import { PathReporter } from 'io-ts/lib/PathReporter'
              import { fold } from 'fp-ts/lib/Either'

              export const Margins = t.type({
                top: t.number,
                bottom: t.number,
                left: t.number,
                right: t.number,
              })
              export type Margins = t.TypeOf<typeof Margins>

              export const createValidator = <T>(type: t.Type<T>) => (data: any): T => {
                const validation = type.decode(data)
                return fold(
                  () => {
                    throw new Error(PathReporter.report(validation).join('. '))
                  },
                  (value: T) => value,
                )(validation)
              }
            

Example


              import { createStore, compose, Store, applyMiddleware } from 'redux'
              import { createEpicMiddleware } from 'redux-observable'
              import { Action } from './actions'
              import { IState, masterReducer } from './reducers'
              import { epic } from './epics'

              const epicMiddleware = createEpicMiddleware<Action, Action, IState>()
              const configureStore = (initialState?: IState) => createStore(
                masterReducer,
                initialState,
                compose(
                  applyMiddleware(
                    epicMiddleware,
                  ),
                ),
              )
              export const store = configureStore()
              epicMiddleware.run(epic)
            

Example (actions.ts)


              import { ActionType, createStandardAction } from 'typesafe-actions'

              const SubAction = {
                addItem: createStandardAction('addItem')<string>()
              }

              // split by module, functionality, whatever...
              export const Action = {
                SubAction,
              }
              export type Action = ActionType<typeof Action>
            

Example (reducers.ts)


              import { Reducer, combineReducers } from 'redux'
              import { Action } from './actions'

              export interface IStateSlice {
                list: string[]
              }

              const initialState: IStateSlice = { list: [] }

              const addItem = (state: IStateSlice, item: string): IStateSlice => ({
                ...state,
                list: [...state.list, item],
              })

              const sliceReducer: Reducer<IStateSlice, Action> = (state = initialState, action) => {
                switch (action.type) {
                  case getType(Action.SubAction.addItem): return addItem(state, action.payload)
                  default: return state
                }
              }

              // split by module, functionality, whatever...
              export interface IState {
                stateSlice: IStateSlice
              }

              const masterReducer = combineReducers<IState, Action>({
                stateSlice: sliceReducer,
              })
            

Example (epics.ts)


              import { Epic, combineEpics } from 'redux-observable'
              import { isActionOf } from 'typesafe-actions'
              import { filter, tap, ignoreElements } from 'rxjs/operators'
              import { Action } from './actions'
              import { IState } from './reducers'

              const someEpic: Epic<Action, Action, IState> = ($action, store$) =>
                $action.pipe(
                  filter(isActionOf(Action.SubAction.addItem)),
                  tap((action) => console.log(action.payload)),
                  ignoreElements(),
                )

              export const epic = combineEpics(
                someEpic,
              )
            

Let's go async


              import { from, of } from 'rxjs'
              import { filter, switchMap, catchError } from 'rxjs/operators'

              const DataType = t.type({ payload: t.array(t.string) })
              type DataType = t.TypeOf<typeof DataType>
              const dataValidator = validate(DataType)

              const loadData = createAsyncAction(
                'loadData',
                'loadDataSuccess',
                'loadDataFailure'
              )<string, DataType, Error>()

              const someEpic: ($action) =>
                $action.pipe(
                  filter(isActionOf(loadData.request)),
                  switchMap(({ payload }) =>
                    from(fetch(`/api/${payload}`)).pipe(
                      mergeMap((response) => {
                        if (!response.ok) {
                          throw new Error('fetching error')
                        }
                        return response.json()
                      })
                      map(dataValidator),
                      map(loadData.success),
                      catchError((error: Error) => of(loadData.failure(error))),
                    ),
                  ),
                )
            

Let's see some more epic examples

Fire more actions on success

            import { from, of } from 'rxjs'
            import { filter, mergeMap, switchMap, map, catchError } from 'rxjs/operators'

            const someEpic: ($action) =>
              $action.pipe(
                filter(isActionOf(loadData.request)),
                switchMap(({ payload }) =>
                  from(fetch(`/api/${payload}`)).pipe(
                    map(dataValidator),
                    mergeMap((data) => [
                      loadData.success(data),
                      notification.add('Data loaded'),
                    ]),
                    catchError((error: Error) => of(loadData.failure(error))),
                  ),
                ),
              )
          
Canceling/Switching previous observables based on data in payload

            import { from, of } from 'rxjs'
            import { filter, groupBy, mergeMap, switchMap, map, catchError } from 'rxjs/operators'

            const getListing: Epic<Action, Action, IState> = ($action) =>
              $action.pipe(
                filter(isActionOf(Actions.Library.Resources.getListing.request)),
              
switchMap(({ payload: { resource } }) => from(fetch(`/api/${resource}`)).pipe( map(validateListing), map(Actions.Library.Resources.getListing.success), catchError((error: Error) => of(Actions.Library.Resources.getListing.failure(error.message))), ), ), ) groupBy(({ payload }) => payload.resource), mergeMap((group) => group.pipe( switchMap(({ payload: { resource } }) => from(fetch(`/api/${resource}`)).pipe( map(validateListing), map(Actions.Library.Resources.getListing.success), catchError((error: Error) => of(Actions.Library.Resources.getListing.failure(error.message))), ), ), )), )
Listen to action only after other acton occured

            import { filter, switchMap, mergeMap } from 'rxjs/operators'
            import { makeGetFontConfig } from './selectors'

            const getFontConfig = makeGetFontConfig()
            const loadFont: Epic<Action, Action, IState> = ($action, state$) =>
              $action.pipe(
                filter(isActionOf(Actions.Fonts.loadConfig.success))
                mergeMap(() =>
                  $action.pipe(
                    filter(isActionOf(Actions.Fonts.loadFont.request)),
                    switchMap(([{ payload }]) => {
                      const config = getFontConfig(store$.value, payload)
                      ...
                    }),
                  ),
                ),
              )
          
            import { filter, switchMap, withLatestFrom } from 'rxjs/operators'
            import { makeGetFontConfig } from './selectors'

            const loadFont: Epic = ($action, store$) =>
              $action.pipe(
                filter(isActionOf(Actions.Fonts.loadFont.request)),
                withLatestFrom($action.pipe(filter(isActionOf(Actions.Fonts.loadConfig.success)))),
                switchMap(([{ payload }]) =>{
                  const config = getFontConfig(store$.value, payload)
                  ...
                }),
              )
          
            import { combineLatest } from 'rxjs'
            import { filter, switchMap } from 'rxjs/operators'
            import { makeGetFontConfig } from './selectors'

            const getFontConfig = makeGetFontConfig()
            const loadFont: Epic<Action, Action, IState> = ($action, store$) =>
              combineLatest(
                $action.pipe(filter(isActionOf(Actions.Fonts.loadFont.request))),
                $action.pipe(filter(isActionOf(Actions.Fonts.loadConfig.success))),
              ).pipe(
                switchMap(([{ payload }]) => {
                  const config = getFontConfig(store$.value, payload)
                  ...
                }),
              )
          
Do something between two actions

            import { interval } from 'rxjs'
            import { filter, switchMap, map, takeUntil } from 'rxjs/operators'

            const periodicToken: Epic<Action, Action, IState> = ($action) =>
              $action.pipe(
                filter(isActionOf(Actions.User.login.success)),
                switchMap(({ payload }) =>
                  interval(Math.max(payload.jwt.expires_in * 1000 - 5000, 5000)).pipe(
                    map(() => Actions.User.refreshToken()),
                  ),
                ),
                takeUntil($action.pipe(filter(isActionOf(Actions.User.logout)))),
              )
          
Fire request and wait for success

            import { empty, merge } from 'rxjs'
            import { filter, switchMap, map, first } from 'rxjs/operators'
            import { getLogedInUser } from './selectors'

            const changeMyPassword: Epic<Action, Action, IState> = ($action, store$) =>
              $action.pipe(
                filter(isActionOf(Actions.User.changePassword)),
                switchMap(({ payload }) => {
                  const logedInUser = getLogedInUser(store$.value)
                  return merge(
                    of(Actions.Users.changeUserPassword.request({
                      ...payload,
                      username: logedInUser.username,
                    })),
                    $action.pipe(
                      filter(isActionOf(Actions.Users.changeUserPassword.success)),
                      filter(({ payload: username }) => username === logedInUser.username),
                      first(),
                      map(() => Actions.User.passwordChanged()),
                    ),
                  )
                }),
              )
          
Combining with Router 5 and splitting Observables

              import { Epic, ActionsObservable } from 'redux-observable'
              import { merge, of } from 'rxjs'
              import { filter, mergeMap, map, takeUntil } from 'rxjs/operators'
              import { actionTypes, actions } from 'redux-router5'

              import { IState } from 'root/reducer'
              import { Action as AppAction } from 'root/actions'
              import router from 'routing'

              type TransitionSuccess = ReturnType<typeof actions.transitionSuccess>
              type NavigateTo = ReturnType<typeof actions.navigateTo>

              type Router5Action = TransitionSuccess | NavigateTo
              export type Action = AppAction | Router5Action

              const isTransitionSuccess = (action: Action): action is TransitionSuccess =>
                action.type === actionTypes.TRANSITION_SUCCESS

              const isRouteEnter = (routeName: string) =>
                ({ payload: { previousRoute, route } }: TransitionSuccess): boolean =>
                  (previousRoute === null || !previousRoute.name.startsWith(routeName)) && route.name.startsWith(routeName)

              const isRouteLeave = (routeName: string) =>
                ({ payload: { previousRoute, route } }: TransitionSuccess): boolean =>
                  (previousRoute === null || previousRoute.name.startsWith(routeName)) && !route.name.startsWith(routeName)

              const $routeEnter = ($action: ActionsObservable<Action>, routeName: string) =>
                $action.pipe(
                  filter(isTransitionSuccess),
                  filter(isRouteEnter(routeName)),
                )

              const $routeLeave = ($action: ActionsObservable<Action>, routeName: string) =>
                $action.pipe(
                  filter(isTransitionSuccess),
                  filter(isRouteLeave(routeName)),
                )

              const $enteringRoute = ($action: ActionsObservable<Action>, routeName: string) => {
                const fromAction = $routeEnter($action, routeName)
                const alreadyActive = router.getState().name.startsWith(routeName)
                return alreadyActive ? merge(of(null), fromAction) : fromAction
              }

              export const inRoute = (routeName: string, epic: Epic<Action, Action, IState>): Epic<Action, Action, IState> =>
                ($action, state$, dependencies) => $enteringRoute($action, routeName).pipe(
                  mergeMap(() => epic($action, state$, dependencies)),
                  takeUntil($routeLeave($action, routeName)),
                )

              export const onRouteEnter = (routeName: string, action: Action | Action[]): Epic<Action, Action, IState> =>
                ($action) => $enteringRoute($action, routeName).pipe(
                  Array.isArray(action)
                    ? mergeMap(() => action)
                    : map(() => action),
                )

              export const onRouteLeave = (routeName: string, action: Action | Action[]): Epic<Action, Action, IState> =>
                ($action) => $routeLeave($action, routeName).pipe(
                  Array.isArray(action)
                    ? mergeMap(() => action)
                    : map(() => action),
                )
            
Usage

              import { onRouteEnter } from './routerEpics'

              const onEnter = onRouteEnter(
                RouteName.Designer,
                Actions.Designer.App.Element.resetTemplate(),
              )
              const onEnterAfterLogin: Epic<Action, Action, IState> = ($action, state$, dependencies) =>
                $action.pipe(
                  filter(isActionOf(Actions.User.login.success)),
                  mergeMap(() =>
                    onRouteEnter(RouteName.Workflow, [
                      Actions.Workflow.Components.loadComponents.request(),
                      Actions.Workflow.Printers.loadPrinters.request(),
                      Actions.Workflow.Workflows.loadWorkflows.request(),
                      Actions.Workflow.IOTypes.loadIOTypes.request(),
                      Actions.Workflow.MimeTypes.loadMimeTypes.request(),
                    ])($action, state$, dependencies),
                  ),
                  takeUntil($action.pipe(filter(isActionOf(Actions.User.logout)))),
                )
            
Usage II

              import { merge, of, from, interval } from 'rxjs'
              import { filter, map, flatMap, catchError, throttleTime } from 'rxjs/operators'
              import { inRoute } from './routerEpics'

              const validationWorker = new Worker('validationWorker.js')

              const validate = inRoute(RouteName.Designer, ($action, store$) =>
                merge(
                  $action.pipe(filter((action) => validationActions.includes(action.type))),
                  interval(VALIDATION_INTERVAL),
                ).pipe(
                  throttleTime(VALIDATION_THROTTLE, undefined, { leading: true, trailing: true }),
                  flatMap(() => from(new Promise<IInvalidElement[]>((resolve, reject) => {
                    const onMessage = (e: MessageEvent) => {
                      const invalidElements = e.data
                      validationWorker.removeEventListener('message', onMessage)
                      resolve(invalidElements)
                    }
                    setTimeout(
                      () => {
                        validationWorker.removeEventListener('message', onMessage)
                        reject(new Error('Timeout'))
                      },
                      VALIDATION_TIMEOUT,
                    )
                    validationWorker.addEventListener('message', onMessage)

                    const template = getTemplate(store$.value)
                    validationWorker.postMessage(template)
                  }))),
                  map(Actions.Designer.App.Validation.validate.success),
                  catchError((error: Error) =>
                    of(Actions.Designer.App.Validation.validate.failure(error))),
                ),
              )
            
Complex example

            const login: Epic<Action, Action, IState> = ($action) =>
              $action.pipe(
                filter(isActionOf(Actions.User.login.request)),
                switchMap(({ payload: { username, password } }) =>
                  loginRequest(username, password).pipe(
                    mergeMap((jwt) => merge(
                      of(Actions.User.getPermissions.request()),
                      of(Actions.User.getUserRoles.request(username)),
                      from(zip(
                        $action.pipe(filter(isActionOf(Actions.User.getUserRoles.success))),
                        $action.pipe(filter(isActionOf(Actions.User.getPermissions.success))),
                        ({ payload: tenantsRoles }) => tenantsRoles,
                      )).pipe(
                        first(),
                        mergeMap((tenantsRoles) =>
                          tenantsRoles.length > 1
                            ? $action.pipe(
                                filter(isActionOf(Actions.User.selectTenant)),
                                switchMap(({ payload: newTenantId }) =>
                                  newTenantId === jwt.tenantId
                                    ? of(loginSuccess(jwt, username))
                                    : from(loginRequest(username, password, newTenantId)).pipe(
                                        map((newJwt) => loginSuccess(newJwt, username)),
                                      ),
                                ),
                              )
                            : of(loginSuccess(jwt, username)),
                        ),
                      ),
                    )),
                    catchError((error: Error) =>
                      of(Actions.User.login.failure(new Error(`cannot login: ${error.message}`)))),
                  ),
                ),
              )
          

React example


              import React from 'react'
              import { Observable, Subject, Subscription, of, merge, from  } from 'rxjs'
              import { debounceTime, distinctUntilChanged, switchMap, startWith, map, mergeMap, catchError } from 'rxjs/operators'

              interface ITextToImageProps  {
                text: string
              }

              interface ITextToImageState {
                text: string
                image?: string
                error?: string
              }

              const arrayBufferToBase64 = (buffer: ArrayBuffer) => {
                const bytes: number[] = [].slice.call(new Uint8Array(buffer))
                const binary = bytes.reduce((str, b) => `${str}${String.fromCharCode(b)}`, '')
                return window.btoa(binary)
              }

              class TextToImage extends React.Component<ITextToImageProps, ITextToImageState> {
                public state: ITextToImageState = {
                  text: this.props.text,
                }

                private text$ = new Subject<string>()
                private state$: Observable<Pick<ITextToImageState, 'image' | 'error'>> =
                  this.text$.pipe(
                    startWith(this.state.text),
                    debounceTime(500),
                    distinctUntilChanged(),
                    switchMap((text) => {
                      const reset = of({ image: undefined, error: undefined })
                      return text.length > 0
                        ? merge(
                            reset,
                            from(fetch(`/toImage/${text}`)).pipe(
                              mergeMap((response) => response.arrayBuffer()),
                              map((buffer) => ({
                                image: `data:image/jpeg;base64,${arrayBufferToBase64(buffer)}`,
                              })),
                              catchError((error) => of({ error: error.message })),
                            ),
                          )
                        : reset
                    }),
                  )
                private stateUpdates$?: Subscription

                public componentDidMount() {
                  this.stateUpdates$ = this.state$.subscribe(this.setState.bind(this))
                }

                public componentWillUnmount() {
                  if (this.stateUpdates$ !== undefined) {
                    this.stateUpdates$.unsubscribe()
                  }
                }

                public render() {
                  const { text, image, error } = this.state
                  return (
                    <>
                      <input value={text} onChange={this.setText} />
                      {(error !== undefined || image !== undefined)
                          ? (
                            <div>
                              {error === undefined
                                ? <img src={image} />
                                : error
                              }
                            </div>
                          )
                          : null
                        }
                    </>
                  )
                }

                private setText = (ev: React.ChangeEvent<HTMLInputElement>) => {
                  const text = ev.target.value
                  this.setState({ text })
                  this.text$.next(text)
                }
              }

              export default TextToImage
            

That's all folks

Happy coding! :)