Created by Jan Doležel
| | Producer | Consumer |
|---|---|---|
| Pull | Passive: produces data when requested | Active: decides when data is requested |
| Push | Active: produces data at its own pace | Passive: reacts to received data |
| | Single | Multiple |
|---|---|---|
| Pull | Function | Iterator |
| Push | Promise | Observable |
import * as rxjs from 'rxjs'
// The producer function below runs once for every subscription, so each of the
// two subscribers triggers its own 'Hello' log and receives its own 42.
const foo = new rxjs.Observable((observer) => {
  console.log('Hello')
  observer.next(42)
})

// First subscription.
foo.subscribe((value) => console.log(value))

// Second, independent subscription.
foo.subscribe((value) => console.log(value))
import * as rxjs from 'rxjs'
// Emits 1, 2 and 3 synchronously to every subscriber (never completes).
// NOTE(review): `rxjs.operators` is assumed to be available on the `rxjs`
// namespace in the environment these demos run in - confirm.
const bar = new rxjs.Observable((observer) => {
  for (const value of [1, 2, 3]) {
    observer.next(value)
  }
})

// `map` transforms every emitted value before it reaches the subscriber: 10, 20, 30.
bar
  .pipe(rxjs.operators.map((value) => value * 10))
  .subscribe((value) => console.log(value))
import * as rxjs from 'rxjs'
// First source: 1, 2, then completion.
const bar1 = new rxjs.Observable((observer) => {
  for (const value of [1, 2]) {
    observer.next(value)
  }
  observer.complete() // try to remove it
})

// Second source: 11, 12, then completion.
const bar2 = new rxjs.Observable((observer) => {
  for (const value of [11, 12]) {
    observer.next(value)
  }
  observer.complete()
})

// `concat` subscribes to bar2 only after bar1 has completed.
rxjs.concat(bar1, bar2).subscribe((value) => console.log(value))
import * as rxjs from 'rxjs'
// Logs a counter once per second and stops after four values (0..3).
rxjs.interval(1000)
  .pipe(rxjs.operators.take(4))
  .subscribe((tick) => console.log(tick))
import * as rxjs from 'rxjs'
// Outer stream: 0, 10, 20, 30 (one value every 500 ms).
// For each outer value an inner 200 ms interval emits base+0 .. base+4.
// `mergeMap` keeps every inner stream alive, so their outputs interleave.
rxjs.interval(500).pipe(
  rxjs.operators.take(4),
  rxjs.operators.map((tick) => tick * 10),
  rxjs.operators.mergeMap((base) => {
    const inner$ = rxjs.interval(200).pipe(
      rxjs.operators.take(5),
      rxjs.operators.map((offset) => base + offset),
    )
    return inner$
  }),
).subscribe((value) => console.log(value))
import * as rxjs from 'rxjs'
// Same pipeline as the mergeMap demo, but `switchMap` unsubscribes from the
// previous inner stream as soon as the outer stream emits again, so only the
// inner stream of the last outer value gets to emit all five of its values.
rxjs.interval(500).pipe(
  rxjs.operators.take(4),
  rxjs.operators.map((tick) => tick * 10),
  rxjs.operators.switchMap((base) => {
    const inner$ = rxjs.interval(200).pipe(
      rxjs.operators.take(5),
      rxjs.operators.map((offset) => base + offset),
    )
    return inner$
  }),
).subscribe((value) => console.log(value))
redux-observable
from Redux Middleware Comparison
typesafe-actions
import { createStandardAction, createAsyncAction } from 'typesafe-actions'
// Action creator for actions of type 'myType'; TPayload stands for the payload type.
const myAction = createStandardAction('myType')<TPayload>()
// Three related action creators (request / success / failure), each with its
// own action type string and its own payload type, in the same order.
const { request, success, failure } = createAsyncAction(
'myRequestType',
'mySuccessType',
'myFailureType',
)<TRequestPayload, TSuccessPayload, TFailurePayload>()
getType, isActionOf - typeguards
io-ts
import * as t from 'io-ts'
import { PathReporter } from 'io-ts/lib/PathReporter'
import { fold } from 'fp-ts/lib/Either'
// Runtime codec for an object with four numeric properties.
export const Margins = t.type({
top: t.number,
bottom: t.number,
left: t.number,
right: t.number,
})
// Static type derived from the codec, so the two cannot drift apart.
export type Margins = t.TypeOf<typeof Margins>
/**
 * Builds a validating function for an io-ts codec.
 *
 * @param type - codec the data is decoded with
 * @returns a function that decodes `data` and returns the decoded value;
 *   it throws an `Error` whose message is the PathReporter report joined
 *   with '. ' when decoding fails
 */
export const createValidator = <T>(type: t.Type<T>) => (data: unknown): T => {
  // The input is untrusted by definition, so it is typed `unknown`, not `any`.
  const validation = type.decode(data)
  return fold(
    () => {
      throw new Error(PathReporter.report(validation).join('. '))
    },
    (value: T) => value,
  )(validation)
}
import { createStore, compose, Store, applyMiddleware } from 'redux'
import { createEpicMiddleware } from 'redux-observable'
import { Action } from './actions'
import { IState, masterReducer } from './reducers'
import { epic } from './epics'
// Created before the store so the root epic can be started once the store exists.
const epicMiddleware = createEpicMiddleware<Action, Action, IState>()

/** Creates the Redux store with the epic middleware installed. */
const configureStore = (initialState?: IState) => {
  const enhancer = compose(
    applyMiddleware(epicMiddleware),
  )
  return createStore(masterReducer, initialState, enhancer)
}

export const store = configureStore()

// The root epic is run only after the middleware has been attached to the store.
epicMiddleware.run(epic)
import { ActionType, createStandardAction } from 'typesafe-actions'
// Action creators of one sub-module; `addItem` carries a string payload.
const SubAction = {
addItem: createStandardAction('addItem')<string>()
}
// split by module, functionality, whatever...
// Root map of all action creators.
export const Action = {
SubAction,
}
// Union type of every action the creators above can produce.
export type Action = ActionType<typeof Action>
import { Reducer, combineReducers } from 'redux'
// `getType` is used by the reducer below and was missing from the imports.
import { getType } from 'typesafe-actions'
import { Action } from './actions'

/** State owned by the slice reducer. */
export interface IStateSlice {
  list: string[]
}

const initialState: IStateSlice = { list: [] }

/** Returns a new slice with `item` appended; the previous state is not mutated. */
const addItem = (state: IStateSlice, item: string): IStateSlice => ({
  ...state,
  list: [...state.list, item],
})

/** Reducer of the slice: handles addItem, returns the state unchanged otherwise. */
const sliceReducer: Reducer<IStateSlice, Action> = (state = initialState, action) => {
  switch (action.type) {
    case getType(Action.SubAction.addItem): return addItem(state, action.payload)
    default: return state
  }
}

// split by module, functionality, whatever...
/** Shape of the whole store state. */
export interface IState {
  stateSlice: IStateSlice
}

// Exported because the store setup imports `masterReducer` from this module.
export const masterReducer = combineReducers<IState, Action>({
  stateSlice: sliceReducer,
})
import { Epic, combineEpics } from 'redux-observable'
import { isActionOf } from 'typesafe-actions'
import { filter, tap, ignoreElements } from 'rxjs/operators'
import { Action } from './actions'
import { IState } from './reducers'
/** Logs the payload of every addItem action; it never emits actions itself. */
const someEpic: Epic<Action, Action, IState> = ($action, store$) => {
  const addItem$ = $action.pipe(filter(isActionOf(Action.SubAction.addItem)))
  return addItem$.pipe(
    tap(({ payload }) => console.log(payload)),
    ignoreElements(),
  )
}

// Root epic handed to the epic middleware.
export const epic = combineEpics(someEpic)
import { from, of } from 'rxjs'
// `mergeMap` and `map` are used below and were missing from this import.
import { filter, switchMap, mergeMap, map, catchError } from 'rxjs/operators'

// Expected shape of the server response.
const DataType = t.type({ payload: t.array(t.string) })
type DataType = t.TypeOf<typeof DataType>
// Uses the `createValidator` factory defined with the io-ts helpers.
const dataValidator = createValidator(DataType)

const loadData = createAsyncAction(
  'loadData',
  'loadDataSuccess',
  'loadDataFailure',
)<string, DataType, Error>()

/**
 * Loads data for every loadData.request action.
 * A newer request cancels the one in flight (switchMap). The response is
 * checked, parsed and validated, then wrapped in loadData.success; any error
 * on the way becomes loadData.failure without terminating the epic, because
 * catchError sits on the inner observable.
 */
const someEpic: Epic<Action, Action, IState> = ($action) =>
  $action.pipe(
    filter(isActionOf(loadData.request)),
    switchMap(({ payload }) =>
      from(fetch(`/api/${payload}`)).pipe(
        mergeMap((response) => {
          if (!response.ok) {
            throw new Error('fetching error')
          }
          return response.json()
        }),
        map(dataValidator),
        map(loadData.success),
        catchError((error: Error) => of(loadData.failure(error))),
      ),
    ),
  )
import { from, of } from 'rxjs'
import { filter, mergeMap, switchMap, map, catchError } from 'rxjs/operators'
/**
 * Same loading epic, but a successful load emits two actions: the success
 * action and a notification. Returning an array from `mergeMap` emits each
 * element as a separate action.
 */
const someEpic: Epic<Action, Action, IState> = ($action) =>
  $action.pipe(
    filter(isActionOf(loadData.request)),
    switchMap(({ payload }) =>
      from(fetch(`/api/${payload}`)).pipe(
        // The validator needs the parsed body, not the Response object.
        mergeMap((response) => {
          if (!response.ok) {
            throw new Error('fetching error')
          }
          return response.json()
        }),
        map(dataValidator),
        mergeMap((data) => [
          loadData.success(data),
          notification.add('Data loaded'),
        ]),
        catchError((error: Error) => of(loadData.failure(error))),
      ),
    ),
  )
import { from, of } from 'rxjs'
import { filter, groupBy, mergeMap, switchMap, map, catchError } from 'rxjs/operators'
/**
 * Fetches a listing for every getListing.request action. Because of
 * `switchMap`, a new request cancels the pending one regardless of which
 * resource either of them asked for.
 */
const getListing: Epic<Action, Action, IState> = ($action) => {
  const listing = Actions.Library.Resources.getListing
  const request$ = $action.pipe(filter(isActionOf(listing.request)))

  return request$.pipe(
    switchMap((action) => {
      const { resource } = action.payload
      return from(fetch(`/api/${resource}`)).pipe(
        map(validateListing),
        map(listing.success),
        catchError((error: Error) => of(listing.failure(error.message))),
      )
    }),
  )
}
// Fragment: an alternative tail for the operator chain of the getListing epic
// (the enclosing `$action.pipe(filter(...),` header is not part of this excerpt).
// Requests are first split into one group per payload.resource ...
groupBy(({ payload }) => payload.resource),
// ... all groups are processed concurrently ...
mergeMap((group) => group.pipe(
// ... and inside a group a newer request replaces the one in flight, so
// requests for different resources no longer cancel each other.
switchMap(({ payload: { resource } }) =>
from(fetch(`/api/${resource}`)).pipe(
map(validateListing),
map(Actions.Library.Resources.getListing.success),
// Errors are turned into a failure action carrying the error message.
catchError((error: Error) =>
of(Actions.Library.Resources.getListing.failure(error.message))),
),
),
)),
)
// Three alternative implementations of the same epic ("load a font once the
// font configuration is available"). They are variants of one definition and
// are not meant to live in the same module.

// Variant 1: start listening for loadFont requests after the config has loaded.
import { filter, switchMap, mergeMap } from 'rxjs/operators'
import { makeGetFontConfig } from './selectors'

const getFontConfig = makeGetFontConfig()

const loadFont: Epic<Action, Action, IState> = ($action, store$) =>
  $action.pipe(
    filter(isActionOf(Actions.Fonts.loadConfig.success)),
    mergeMap(() =>
      $action.pipe(
        filter(isActionOf(Actions.Fonts.loadFont.request)),
        // A single action arrives here (not a tuple), so it is destructured as an object.
        switchMap(({ payload }) => {
          const config = getFontConfig(store$.value, payload)
          // ... (rest of the handler is elided in the original)
        }),
      ),
    ),
  )

// Variant 2: pair every loadFont request with the latest loadConfig.success.
import { filter, switchMap, withLatestFrom } from 'rxjs/operators'
import { makeGetFontConfig } from './selectors'

const getFontConfig = makeGetFontConfig()

const loadFont: Epic<Action, Action, IState> = ($action, store$) =>
  $action.pipe(
    filter(isActionOf(Actions.Fonts.loadFont.request)),
    withLatestFrom($action.pipe(filter(isActionOf(Actions.Fonts.loadConfig.success)))),
    // Emits [request, configSuccess] tuples; only the request payload is needed.
    switchMap(([{ payload }]) => {
      const config = getFontConfig(store$.value, payload)
      // ... (rest of the handler is elided in the original)
    }),
  )

// Variant 3: combine the latest request with the latest loadConfig.success.
import { combineLatest } from 'rxjs'
import { filter, switchMap } from 'rxjs/operators'
import { makeGetFontConfig } from './selectors'

const getFontConfig = makeGetFontConfig()

const loadFont: Epic<Action, Action, IState> = ($action, store$) =>
  combineLatest(
    $action.pipe(filter(isActionOf(Actions.Fonts.loadFont.request))),
    $action.pipe(filter(isActionOf(Actions.Fonts.loadConfig.success))),
  ).pipe(
    switchMap(([{ payload }]) => {
      const config = getFontConfig(store$.value, payload)
      // ... (rest of the handler is elided in the original)
    }),
  )
import { interval } from 'rxjs'
import { filter, switchMap, map, takeUntil } from 'rxjs/operators'
/**
 * After every successful login, periodically emits refreshToken until logout.
 *
 * The period is 5 seconds shorter than the token lifetime
 * (`expires_in` * 1000 ms), but never less than 5 seconds.
 */
const periodicToken: Epic<Action, Action, IState> = ($action) =>
  $action.pipe(
    filter(isActionOf(Actions.User.login.success)),
    switchMap(({ payload }) =>
      interval(Math.max(payload.jwt.expires_in * 1000 - 5000, 5000)).pipe(
        map(() => Actions.User.refreshToken()),
        // takeUntil must stop only this timer. On the outer stream it would
        // complete the whole epic on the first logout, and later logins would
        // never start a refresh timer again.
        takeUntil($action.pipe(filter(isActionOf(Actions.User.logout)))),
      ),
    ),
  )
// `of` is used below and was missing from this import.
import { empty, merge, of } from 'rxjs'
import { filter, switchMap, map, first } from 'rxjs/operators'
import { getLogedInUser } from './selectors'

/**
 * Changes the password of the currently logged-in user.
 *
 * For every changePassword action it emits a changeUserPassword.request with
 * the current username filled in, then waits for the first matching
 * changeUserPassword.success for that username and emits passwordChanged.
 */
const changeMyPassword: Epic<Action, Action, IState> = ($action, store$) =>
  $action.pipe(
    filter(isActionOf(Actions.User.changePassword)),
    switchMap(({ payload }) => {
      const logedInUser = getLogedInUser(store$.value)
      return merge(
        of(Actions.Users.changeUserPassword.request({
          ...payload,
          username: logedInUser.username,
        })),
        $action.pipe(
          filter(isActionOf(Actions.Users.changeUserPassword.success)),
          // Ignore password changes of other users.
          filter(({ payload: username }) => username === logedInUser.username),
          first(),
          map(() => Actions.User.passwordChanged()),
        ),
      )
    }),
  )
import { Epic, ActionsObservable } from 'redux-observable'
import { merge, of } from 'rxjs'
import { filter, mergeMap, map, takeUntil } from 'rxjs/operators'
import { actionTypes, actions } from 'redux-router5'
import { IState } from 'root/reducer'
import { Action as AppAction } from 'root/actions'
import router from 'routing'
// Router action shapes, derived from the return types of redux-router5's action creators.
type TransitionSuccess = ReturnType<typeof actions.transitionSuccess>
type NavigateTo = ReturnType<typeof actions.navigateTo>
// The router actions used by the epics in this module.
type Router5Action = TransitionSuccess | NavigateTo
// Application actions widened with the router actions above.
export type Action = AppAction | Router5Action
/** Type guard: true when `action` has the router's TRANSITION_SUCCESS type. */
const isTransitionSuccess = (action: Action): action is TransitionSuccess => {
  const { type } = action
  return type === actionTypes.TRANSITION_SUCCESS
}
/**
 * Builds a predicate telling whether a transition enters `routeName`:
 * the new route name starts with `routeName` while the previous route is
 * either missing (null) or has a name that does not start with it.
 */
const isRouteEnter = (routeName: string) =>
  ({ payload }: TransitionSuccess): boolean => {
    const { previousRoute, route } = payload
    const cameFromOutside =
      previousRoute === null || !previousRoute.name.startsWith(routeName)
    return cameFromOutside && route.name.startsWith(routeName)
  }
/**
 * Builds a predicate telling whether a transition leaves `routeName`:
 * the previous route exists and its name starts with `routeName`, while the
 * new route name does not.
 *
 * A transition without a previous route (null) is never a leave, because no
 * route was active before it.
 */
const isRouteLeave = (routeName: string) =>
  ({ payload: { previousRoute, route } }: TransitionSuccess): boolean =>
    previousRoute !== null
    && previousRoute.name.startsWith(routeName)
    && !route.name.startsWith(routeName)
/** Stream of successful transitions that enter `routeName`. */
const $routeEnter = ($action: ActionsObservable<Action>, routeName: string) => {
  const entersRoute = isRouteEnter(routeName)
  return $action.pipe(filter(isTransitionSuccess), filter(entersRoute))
}
/** Stream of successful transitions that leave `routeName`. */
const $routeLeave = ($action: ActionsObservable<Action>, routeName: string) => {
  const leavesRoute = isRouteLeave(routeName)
  return $action.pipe(filter(isTransitionSuccess), filter(leavesRoute))
}
/**
 * Like $routeEnter, but when the router's current state already is inside
 * `routeName` at call time, a `null` marker is emitted as well so that
 * consumers react without waiting for a transition.
 */
const $enteringRoute = ($action: ActionsObservable<Action>, routeName: string) => {
  const enter$ = $routeEnter($action, routeName)
  if (router.getState().name.startsWith(routeName)) {
    return merge(of(null), enter$)
  }
  return enter$
}
/**
 * Wraps `epic` so that it is active only while the application is inside
 * `routeName`.
 *
 * @param routeName - route name prefix that activates the epic
 * @param epic - epic to run while the route is active
 * @returns an epic that starts `epic` on every route enter and stops it on
 *   the following route leave
 */
export const inRoute = (routeName: string, epic: Epic<Action, Action, IState>): Epic<Action, Action, IState> =>
  ($action, state$, dependencies) => $enteringRoute($action, routeName).pipe(
    mergeMap(() => epic($action, state$, dependencies).pipe(
      // Only the running instance is stopped. With takeUntil on the outer
      // stream the wrapper would complete on the first leave and the epic
      // would not start again when the route is re-entered.
      takeUntil($routeLeave($action, routeName)),
    )),
  )
/**
 * Creates an epic that emits the given action (or every action of the given
 * array, in order) each time `routeName` is entered.
 */
export const onRouteEnter = (routeName: string, action: Action | Action[]): Epic<Action, Action, IState> =>
  ($action) => {
    // Treat the single action as a one-element list so one operator covers both cases.
    const toEmit = Array.isArray(action) ? action : [action]
    return $enteringRoute($action, routeName).pipe(
      mergeMap(() => toEmit),
    )
  }
/**
 * Creates an epic that emits the given action (or every action of the given
 * array, in order) each time `routeName` is left.
 */
export const onRouteLeave = (routeName: string, action: Action | Action[]): Epic<Action, Action, IState> =>
  ($action) => {
    // Treat the single action as a one-element list so one operator covers both cases.
    const toEmit = Array.isArray(action) ? action : [action]
    return $routeLeave($action, routeName).pipe(
      mergeMap(() => toEmit),
    )
  }
import { onRouteEnter } from './routerEpics'
// Epic that emits the resetTemplate action whenever the Designer route is entered.
// The action object is created once, here, and reused on every enter.
const onEnter = onRouteEnter(
RouteName.Designer,
Actions.Designer.App.Element.resetTemplate(),
)
/**
 * While a user is logged in, requests all workflow-related data every time
 * the Workflow route is entered.
 */
const onEnterAfterLogin: Epic<Action, Action, IState> = ($action, state$, dependencies) =>
  $action.pipe(
    filter(isActionOf(Actions.User.login.success)),
    mergeMap(() =>
      onRouteEnter(RouteName.Workflow, [
        Actions.Workflow.Components.loadComponents.request(),
        Actions.Workflow.Printers.loadPrinters.request(),
        Actions.Workflow.Workflows.loadWorkflows.request(),
        Actions.Workflow.IOTypes.loadIOTypes.request(),
        Actions.Workflow.MimeTypes.loadMimeTypes.request(),
      ])($action, state$, dependencies).pipe(
        // Stop only the per-login route listener. On the outer stream
        // takeUntil would complete the epic on the first logout and a later
        // login would not re-arm it.
        takeUntil($action.pipe(filter(isActionOf(Actions.User.logout)))),
      ),
    ),
  )
import { merge, of, from, interval } from 'rxjs'
import { filter, map, flatMap, catchError, throttleTime } from 'rxjs/operators'
import { inRoute } from './routerEpics'
// Single worker shared by all validation runs.
const validationWorker = new Worker('validationWorker.js')

/**
 * Sends `template` to the validation worker and resolves with the data of
 * the next message the worker posts; rejects with Error('Timeout') when no
 * message arrives within VALIDATION_TIMEOUT.
 */
const runValidation = (template: ReturnType<typeof getTemplate>): Promise<IInvalidElement[]> =>
  new Promise<IInvalidElement[]>((resolve, reject) => {
    // Release both the listener and the timer, whichever path finishes first.
    const cleanUp = () => {
      clearTimeout(timer)
      validationWorker.removeEventListener('message', onMessage)
    }
    const onMessage = (e: MessageEvent) => {
      cleanUp()
      resolve(e.data)
    }
    const timer = setTimeout(
      () => {
        cleanUp()
        reject(new Error('Timeout'))
      },
      VALIDATION_TIMEOUT,
    )
    validationWorker.addEventListener('message', onMessage)
    validationWorker.postMessage(template)
  })

/**
 * While inside the Designer route, validates the current template after every
 * validation-relevant action and also every VALIDATION_INTERVAL, throttled to
 * VALIDATION_THROTTLE. Emits validate.success with the worker's result or
 * validate.failure with the error.
 */
const validate = inRoute(RouteName.Designer, ($action, store$) =>
  merge(
    $action.pipe(filter((action) => validationActions.includes(action.type))),
    interval(VALIDATION_INTERVAL),
  ).pipe(
    throttleTime(VALIDATION_THROTTLE, undefined, { leading: true, trailing: true }),
    flatMap(() =>
      from(runValidation(getTemplate(store$.value))).pipe(
        map(Actions.Designer.App.Validation.validate.success),
        // Errors are handled per run. On the outer stream catchError would
        // end the whole epic after the first failed validation.
        catchError((error: Error) =>
          of(Actions.Designer.App.Validation.validate.failure(error))),
      ),
    ),
  ),
)
// Login flow. For every login.request action (a newer request replaces the
// one in progress because of the outer switchMap):
//  1. loginRequest(username, password) is called;
//  2. once it yields a JWT, getPermissions.request and getUserRoles.request
//     are emitted;
//  3. the epic waits for one getUserRoles.success / getPermissions.success
//     pair and takes the payload of the getUserRoles.success action;
//  4. if that payload has more than one entry, it waits for selectTenant
//     actions, otherwise the login succeeds immediately with the first JWT;
//  5. any error raised inside this flow becomes a login.failure action.
const login: Epic<Action, Action, IState> = ($action) =>
$action.pipe(
filter(isActionOf(Actions.User.login.request)),
switchMap(({ payload: { username, password } }) =>
loginRequest(username, password).pipe(
mergeMap((jwt) => merge(
of(Actions.User.getPermissions.request()),
of(Actions.User.getUserRoles.request(username)),
// zip pairs the two success actions; its result selector keeps only the
// payload of the first one (getUserRoles.success).
from(zip(
$action.pipe(filter(isActionOf(Actions.User.getUserRoles.success))),
$action.pipe(filter(isActionOf(Actions.User.getPermissions.success))),
({ payload: tenantsRoles }) => tenantsRoles,
)).pipe(
// Only the first pair is of interest.
first(),
mergeMap((tenantsRoles) =>
tenantsRoles.length > 1
? $action.pipe(
filter(isActionOf(Actions.User.selectTenant)),
// Selecting the tenant the JWT already belongs to needs no new request;
// any other tenant triggers a second loginRequest for that tenant.
switchMap(({ payload: newTenantId }) =>
newTenantId === jwt.tenantId
? of(loginSuccess(jwt, username))
: from(loginRequest(username, password, newTenantId)).pipe(
map((newJwt) => loginSuccess(newJwt, username)),
),
),
)
: of(loginSuccess(jwt, username)),
),
),
)),
// Placed on the inner observable, so a failed login does not end the epic.
catchError((error: Error) =>
of(Actions.User.login.failure(new Error(`cannot login: ${error.message}`)))),
),
),
)
import React from 'react'
import { Observable, Subject, Subscription, of, merge, from } from 'rxjs'
import { debounceTime, distinctUntilChanged, switchMap, startWith, map, mergeMap, catchError } from 'rxjs/operators'
// Props of the TextToImage component.
interface ITextToImageProps {
// Text the component's state is initialised with.
text: string
}
// State of the TextToImage component.
interface ITextToImageState {
// Current content of the input field.
text: string
// Value for the `src` of the rendered image, when one is available.
image?: string
// Error message rendered instead of the image, when one is set.
error?: string
}
/**
 * Encodes the bytes of `buffer` as a base64 string.
 * Every byte is first mapped to the character with the same char code, and
 * the resulting binary string is passed to `window.btoa`.
 */
const arrayBufferToBase64 = (buffer: ArrayBuffer) => {
  const chars = Array.from(new Uint8Array(buffer), (byte) => String.fromCharCode(byte))
  return window.btoa(chars.join(''))
}
/**
 * Text input that shows the image the server generates for the typed text.
 *
 * Typed text is pushed into an RxJS subject; the derived stream debounces it,
 * drops repeated values, cancels outdated requests (switchMap) and produces
 * partial state updates ({ image } or { error }) that are applied with
 * setState while the component is mounted.
 */
class TextToImage extends React.Component<ITextToImageProps, ITextToImageState> {
  public state: ITextToImageState = {
    text: this.props.text,
  }

  // Texts typed by the user.
  private text$ = new Subject<string>()

  // Partial state updates derived from the typed text.
  private state$: Observable<Pick<ITextToImageState, 'image' | 'error'>> =
    this.text$.pipe(
      // Also render the image for the initial text.
      startWith(this.state.text),
      debounceTime(500),
      distinctUntilChanged(),
      switchMap((text) => {
        // Clears the previous image/error before a new result arrives.
        const reset = of({ image: undefined, error: undefined })
        return text.length > 0
          ? merge(
            reset,
            // The text is user input: encode it so characters such as '/',
            // '?' or '#' cannot change the request path.
            from(fetch(`/toImage/${encodeURIComponent(text)}`)).pipe(
              mergeMap((response) => {
                // Do not render the body of an error response as an image.
                if (!response.ok) {
                  throw new Error(`Image request failed: ${response.status} ${response.statusText}`)
                }
                return response.arrayBuffer()
              }),
              map((buffer) => ({
                image: `data:image/jpeg;base64,${arrayBufferToBase64(buffer)}`,
              })),
              catchError((error: Error) => of({ error: error.message })),
            ),
          )
          : reset
      }),
    )

  private stateUpdates$?: Subscription

  public componentDidMount() {
    this.stateUpdates$ = this.state$.subscribe(this.setState.bind(this))
  }

  public componentWillUnmount() {
    // Unsubscribing also cancels a pending debounce and an in-flight request stream.
    if (this.stateUpdates$ !== undefined) {
      this.stateUpdates$.unsubscribe()
    }
  }

  public render() {
    const { text, image, error } = this.state
    return (
      <>
        <input value={text} onChange={this.setText} />
        {(error !== undefined || image !== undefined)
          ? (
            <div>
              {error === undefined
                ? <img src={image} alt={text} />
                : error
              }
            </div>
          )
          : null
        }
      </>
    )
  }

  // Keeps the input controlled and feeds the new text into the stream.
  private setText = (ev: React.ChangeEvent<HTMLInputElement>) => {
    const text = ev.target.value
    this.setState({ text })
    this.text$.next(text)
  }
}

export default TextToImage
Happy coding! :)