Source

core/multiplex.ts

/**
 * @module core
 */

import { isFunction, isPromise } from "@thi.ng/checks"

import { CMD_SUB$, CMD_ARGS, CMD_RESO, CMD_ERRO, CMD_SRC$, CMD_WORK } from "@-0/keys"

import { stringify_type, xKeyError, key_index_err, diff_keys } from "@-0/utils"

import { pubsub } from "@thi.ng/rstream"
import { map } from "@thi.ng/transducers"

/**
 * User-land event dispatch stream
 *
 * This stream is directly exposed to users. Any one-off
 * Commands `next`ed into this stream are sent to the
 * `command$` stream. Arrays of Commands (Tasks) are sent to
 * the `task$` stream.
 *
 * TODO: add examples,`beforeunload` event handler within #4
 *    (orphan): SEE https://youtu.be/QQukWZcIptM and enable
 *    ctx.run.cancel() via external or internal events
 *    (e.g., popstate / { sub$:  "cancel" })
 *
 */
export const run$ = pubsub({
  topic: x => !!x[CMD_SUB$],
  id: "run$_stream",
  equiv: (x, y) => x === y || y === "_TRACE_STREAM"
})

/**
 *
 * Primary user-land _READ_ stream. For attaching handlers
 * for responding to emmitted Commands
 */
export const out$ = pubsub({
  topic: x => x[CMD_SUB$],
  id: "out$_stream",
  equiv: (x, y) => x === y || y === "_TRACE_STREAM"
})

/**
 *
 * Primary fork/bisect stream for indivual commands.
 * attached to a `pubsub` stemming from this stream. The
 * `topic` function used to alert downstream handlers is a
 * simple lookup of the `sub$` key of the command
 */
export const command$ = run$.subscribeTopic(
  true,
  {
    next: x => out$.next(x),
    error: console.warn
  },
  { id: "command$_stream" }
)

const err_str = "Spooling Interupted" // <- add doc link to error strings

const nosub$_err = (c, i) =>
  console.warn(`
  🔥 No sub$ included for a Command with a primitive for 'args'. 
  🔥 Ergo, nothing was done with this Command: 
  
  ${JSON.stringify(c)}
  
  ${key_index_err(c, i)}
  
  Hope that helps!
  `)

/**
 *
 * Handles Collections (array) of Commands ("Tasks") which
 * require _ordered_ choreography and/or have a dependency
 * on some (a)sync data produced by a user interaction.
 *
 * ### Subtasks:
 *
 * Subtasks are the way you compose tasks. Insert a Task and
 * the spool will unpack it in place (super -> sub
 * order preserved) A Subtask must be defined as a unary
 * function that accepts an accumulator object and returns a
 * Task, e.g.:
 *
 * #### PSEUDO
 * ```js
 * // { C: Command }
 * // ( { A: Accumulator }: Object ) => [{C},{C}]: Subtask
 * let someSubtask = ({A}) => [{C}, {C}, ({A})=>[{C},{C}], ...]
 * ```
 *
 * #### Example
 * ```js
 * // subtask example:
 * let subtask1 = acc => [
 *  { sub$: "acc"
 *  , args: { data: acc.data } },
 *  { sub$: "route"
 *  , args: { route: { href: acc.href } } }
 * ]
 *
 * // task
 * let task = [
 *  { args: { href: "https://my.io/todos" } }, // acc init
 *  { sub$: "fetch"
 *  , args: ({ href }) => fetch(href).then(r => r.json())
 *  , erro: (acc, err) => ({ sub$: "cancel", args: err })
 *  , reso: (acc, res) => ({ data: res }) },
 *  acc => subtask1(acc), // subtask reference
 *  { sub$: "FLIP" , args: "done" }
 * ]
 * ```
 * ### Ad-hoc stream injection Example
 *
 * ```js
 * import { stream } from "@thi.ng/rstream"
 * import { map, comp } from "@thi.ng/transducers"
 *
 * // ad-hoc stream
 * let login = stream().subscribe(comp(
 *  map(x => console.log("login ->", x)),
 *  map(({ token }) => loginToMyAuth(token))
 * ))
 *
 * // subtask
 * let subtask_login = ({ token }) => [
 *  { sub$: login // <- stream
 *  , args: () => ({ token }) } // <- use acc
 * ]
 *
 * // task
 * let task = [
 *  // no sub$, just pass data
 *  { args: { href: "https://my.io/auth" } },
 *  { sub$: login , args: () => "logging in..." },
 *  { sub$: "AUTH"
 *  , args: ({ href }) => fetch(href).then(r => r.json())
 *  , erro: (acc, err) => ({ sub$: "cancel", args: err })
 *  , reso: (acc, res) => ({ token: res }) },
 *  acc => subtask_login(acc),
 *  { sub$: login , args: () => "log in success" }
 * ]
 * ```
 *
 **/
export const multiplex = task_array =>
  task_array.reduce(async (a, c, i) => {
    const acc = await a
    // console.log("ACCUMULATOR =>", acc)
    if (isFunction(c)) {
      try {
        const recur = c(acc)
        // ensures accumulator is preserved between stacks
        recur.unshift({ [CMD_ARGS]: acc })
        return multiplex(recur)
      } catch (e) {
        console.warn(err_str, e)
        return
      }
    }
    const sub$ = c[CMD_SUB$]
    const args = c[CMD_ARGS]
    const erro = c[CMD_ERRO]
    const reso = c[CMD_RESO]
    // const _source$ = c[source$]
    // const _handler = c[handler]
    const knowns = [CMD_SUB$, CMD_ARGS, CMD_RESO, CMD_ERRO, CMD_SRC$, CMD_WORK]
    const [unknowns] = diff_keys(knowns, c)

    if (unknowns.length > 0) throw new Error(xKeyError(err_str, c, unknowns, sub$, i))
    const arg_type = stringify_type(args)

    let result = args

    /* RESOLVING ARGS */
    if (arg_type !== "PROMISE" && reso) {
      /**
       * If some signature needs to deal with both Promises
       * and non-Promises, non-Promises are wrapped in a
       * Promise to "lift" them into the proper context for
       * handling
       */
      result = Promise.resolve(args)
    }
    if (args !== Object(args) && !sub$) {
      nosub$_err(c, i)
      return acc
    }
    if (arg_type === "PROMISE") {
      // result = await discardable(args).catch(e => e)
      result = await args.catch(e => e)
    }
    if (arg_type === "THUNK") {
      // if thunk, dispatch to ad-hoc stream, return acc
      // as-is âš  this command will not be waited on
      result = args()
      console.log(`dispatching to ad-hoc stream: ${sub$.id}`)
      sub$.next(result)
      return acc
    }
    // if function, call it with acc and resolve any Promises
    if (arg_type === "FUNCTION") {
      let temp = args(acc)
      result = isPromise(temp) ? await temp.catch(e => e) : temp
    }
    // if object, send the Command as-is and spread into acc
    if (arg_type === "OBJECT") {
      if (!sub$) return { ...acc, ...args }
      command$.next(c)
      return { ...acc, ...args }
    }

    /* RESULT HANDLERS */
    // TODO: 🤔 think harder about the reso/erro handling
    if (reso) {
      // promise rejection handler
      if (erro && result instanceof Error) {
        let error = erro(acc, result)
        if (error.sub$) return command$.next(error)
        console.warn(err_str, "Promise rejected:", result)
        return acc
      }
      // resovled promise handler
      if (!(result instanceof Error)) {
        let resolved = reso(acc, result)
        // resolved promise with no sub$ key -> spread
        // resolved value into acc
        if (resolved.sub$) command$.next(resolved)
        else if (!sub$) return { ...acc, ...resolved }
        result = resolved
      }
      console.warn(`no 'erro' (Error handler) set for ${c}`)
    }
    // no sub$ key & not a promise -> just spread into acc
    if (!reso && !sub$) return { ...acc, ...result }

    // error, but no error handler
    if (result instanceof Error) {
      console.warn(err_str, result)
      return acc
    }
    if (result !== Object(result)) {
      if (!sub$) {
        nosub$_err(c, i)
        return acc
      }
      // if the final result is primitive, you can't refer
      // to this value in proceeding Commands -> send the
      // Command as-is, return acc as-is.
      command$.next({ [CMD_SUB$]: sub$, [CMD_ARGS]: result })
      return acc
    }
    // if the result has made it this far, send it along
    // console.log(`${sub$} made it through`)
    command$.next({ [CMD_SUB$]: sub$, [CMD_ARGS]: result })
    return { ...acc, ...result }
  }, Promise.resolve({}))

/**
 *
 * Task stream that handles Arrays of Commands. Dispatches
 * to `multiplex`er (the heart of `spule`)
 *
 */
export const task$ = run$.subscribeTopic(
  false,
  {
    next: multiplex,
    error: console.warn
  },
  { id: "task$_stream" }
)