Source: message-bus.ts

/**
 * @author Michael Hasenstein <hasenstein@yahoo.com>
 * @copyright REFINIO GmbH 2017
 * @license CC-BY-NC-SA-2.5; portions MIT License
 * @version 0.0.1
 */

/**
 * This module implements a very simple message bus system for loose coupling of components. It
 * is only used for logging initially, but we keep it in case there is a good reason for loose
 * coupling, which may include communication between different instances. Messages are sent on a
 * "bus" and any other module can subscribe to messages of certain types.
 * The {@link logger.module:ts|logger} module is an example for a message consumer.
 * @module
 */

/**
 * Callback function definition for message bus event handlers.
 *
 * A handler is invoked synchronously by the bus for every message that matches the
 * subscription string it was registered with. Whatever the handler returns is ignored.
 *
 * Type definition:
 *
 * ```
 * (src: string, ...messages: unknown[]) => void
 * ```
 * @typedef {Function} MessageHandlerCb
 * @param {string} src - For handlers registered with `on`, the `id` of the
 * {@link MessageBusObj} through which the message was sent
 * @param {...*} messages - The arguments that were given to `send` after the message type,
 * passed through unchanged
 * @returns {undefined} Returns undefined
 */
export type MessageHandlerCb = (src: string, ...messages: unknown[]) => void;

/**
 * An API-object that gives access to the message bus on behalf of one message source.
 *
 * Only `send` is bound to the object's `id`: it stamps every outgoing message with that ID.
 * `on`, `once` and `remove` are the module's shared functions and operate on a single
 * module-wide listener registry, i.e. subscriptions made through one MessageBusObj are visible
 * to messages sent through any other MessageBusObj.
 *
 * ### Example
 *
 * ```
 * // Result is a MessageBusObj
 * import {createMessageBus} from './message-bus';
 * const MessageBus = createMessageBus('some-id-string');
 * ```
 * @global
 * @typedef {object} MessageBusObj
 * @property {string} id - The ID used on the message bus to identify messages sent through this
 * MessageBus object.
 * @property {function(string,MessageHandlerCb):void} on - `(type: string, fn: MessageHandlerCb) =>
 * void` &mdash; Register a callback for a given type of message
 * @property {function(string,MessageHandlerCb):void} once - `(type: string, fn:
 * MessageHandlerCb) => void` &mdash; Like `on` but the callback is removed after it is called
 * @property {function(string,MessageHandlerCb):void} remove - `(type: string, fn: MessageHandlerCb)
 * => void` &mdash; Remove the given callback function
 * @property {function(string,...*):void} send - `(type: string, ...messages: unknown[]) => void`
 * &mdash; Send a message of the given type. The arguments must match what potential recipients
 * of that type would expect.
 */
export interface MessageBusObj {
    // Source ID attached to every message sent through this object
    id: string;
    // Subscribe "fn" to "source:type" or to "type" (any source)
    on: (type: string, fn: MessageHandlerCb) => void;
    // Subscribe "fn" for a single delivery only
    once: (type: string, fn: MessageHandlerCb) => void;
    // Unsubscribe "fn" from the given subscription string
    remove: (type: string, fn: MessageHandlerCb) => void;
    // Deliver "messages" to the listeners of "<id>:<type>" and of "<type>"
    send: (type: string, ...messages: unknown[]) => void;
}

import {createError} from './errors';

// Module-wide listener registry: subscription string ("source:type" or just "type") => the set
// of callbacks registered for exactly that string. A Set iterates in insertion order, which
// determines the call order of the listeners of one subscription string.
const handlers = new Map<string, Set<MessageHandlerCb>>();
// IDs of all MessageBusObj objects handed out by createMessageBus(), in creation order.
const persistedMessageBusesIds: Array<string> = [];

/* ************************************************************************************
 *  EVENT SYSTEM
 * ************************************************************************************/

/**
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function.
 *
 * Registers `fn` as a listener for the given subscription string. Registering the very same
 * function a second time for the same subscription string is treated as an error: an `MB-ON1`
 * error is thrown and the existing registration stays untouched. The same function may be
 * registered under different subscription strings though, e.g. under "source:type" and under
 * "type".
 *
 * ## Event types (examples)
 *
 * Sent with `msg={Error}` from anywhere. Details are in the Error object.
 * `[any]:error`
 *
 * Debug messages
 * `[any]:info`
 *
 * Sent with msg={{type,hash}} whenever a new ONE microdata object has been saved to long-term
 * storage.
 * `storage:new-object`
 *
 * For example emitted by node-imap: `alert(<string>message)`: Emitted when the server issues an
 * alert (e.g. "the server is going down for maintenance")
 * `imap-mailbox-retriever:alert`
 *
 * Sent by an ImapMailboxRetriever instance after closing the IMAP connection
 * `imap-mailbox-retriever:end`
 * @instance
 * @param {string} type - String format "source:type" or only "type" for all handlers from any
 * source. Request types should end in "-request", but there is no code to check it. Handlers
 * registered for request types should return a Promise - this is not checked either.
 * @param {MessageHandlerCb} fn
 * @returns {undefined}
 * @throws {Error} Throws the error created for code `MB-ON1` if `fn` is already registered for
 * `type`
 */
function on(type: string, fn: MessageHandlerCb): void {
    const registered = handlers.get(type);

    // First listener for this exact subscription string ("source:type" or just "type"): start
    // a new Set for it. Every distinct subscription string has its own Set, so insertion order
    // is only known among listeners that subscribed with the exact same string.
    if (!(registered instanceof Set)) {
        handlers.set(type, new Set([fn]));
        return;
    }

    // A listener function can be registered only once per subscription string
    if (registered.has(fn)) {
        throw createError('MB-ON1', {type});
    }

    registered.add(fn);
}

/**
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function.
 *
 * Like "on", but the listener is unregistered right before its first (and only) invocation.
 * The callback receives exactly the arguments a listener registered with "on" would receive:
 * the ID of the message source followed by the messages.
 *
 * Each call registers a new internal wrapper function rather than `fn` itself. Consequences:
 *
 * - Calling "once" several times with the same callback does not raise the duplicate
 *   registration error of "on"; the callback is called once per registration.
 * - `remove(type, fn)` does not cancel a registration made with "once", because `fn` itself is
 *   never stored in the listener registry.
 * @see {@link message-bus.module:ts#on|on}
 * @instance
 * @param {string} type - String format "source:type" or only "type" for all handlers from any
 * source.
 * @param {MessageHandlerCb} fn
 * @returns {undefined}
 */
function once(type: string, fn: MessageHandlerCb): void {
    // The wrapper is what gets registered, so it must have the listener signature: the
    // source ID comes first and has to be handed on as such. Passing "type" in its place would
    // shift the real source ID into the message arguments.
    function g(src: string, ...messages: unknown[]): void {
        // Unregister first so that the wrapper is gone even if "fn" throws
        remove(type, g);
        fn(src, ...messages);
    }

    on(type, g);
}

/**
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function.
 *
 * Unregisters `fn` from the given subscription string. It is not an error if `fn` is not
 * registered for `type`; the call then does nothing. The subscription string must be exactly
 * the one that was used to register the listener.
 * @instance
 * @param {string} type - The subscription string ("source:type" or "type") used when the
 * listener was registered
 * @param {MessageHandlerCb} fn - The function object that was registered
 * @returns {undefined}
 */
function remove(type: string, fn: MessageHandlerCb): void {
    const registered = handlers.get(type);

    if (registered === undefined) {
        return;
    }

    registered.delete(fn);

    // Drop the registry entry together with its last listener. Otherwise every subscription
    // string ever used would leave an empty Set behind forever, which adds up when
    // subscription strings contain per-instance source IDs.
    if (registered.size === 0) {
        handlers.delete(type);
    }
}

/**
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function, where the `src`
 * argument is already filled in with the ID of that API-object.
 *
 * Listeners are called synchronously, one after the other. Listeners that subscribed to
 * "source:type" for this source come first, listeners that subscribed to just "type" follow;
 * within each group the order is the order of registration. A function registered under both
 * strings is called twice.
 *
 * The list of recipients is assembled before the first listener runs, so listeners that are
 * added or removed while a message is being delivered do not change who receives that message.
 *
 * Errors thrown by a listener are not caught: they propagate to the caller of `send` and the
 * remaining listeners are not called.
 * @instance
 * @param {string} src - ID of the message source, passed to each listener as first argument
 * @param {string} type
 * @param {...*} [messages]
 * @returns {undefined}
 */
function send(src: string, type: string, ...messages: readonly unknown[]): void {
    // Lookup order defines call order: source-specific subscriptions first, then the
    // subscriptions for this type of message regardless of its source.
    const subscriptionKeys = [src + ':' + type, type];
    const recipients: MessageHandlerCb[] = [];

    for (const key of subscriptionKeys) {
        const registered = handlers.get(key);

        if (registered instanceof Set) {
            for (const listener of registered) {
                recipients.push(listener);
            }
        }
    }

    // NOTE: Errors are deliberately not caught. If a subscriber function throws we let it
    // crash.
    for (const listener of recipients) {
        listener(src, ...messages);
    }
}

/*
 * Ask for a response ONE TIME. For example, ask for the contents of an object with hash XYZ.
 * The request is broadcast to all modules that registered for the type of request, and when a
 * module answers (first), the value is returned to the requester. This function supports the
 * loose coupling of ONE modules; the alternative is hard-coded dependencies where one module
 * holds a reference to specific modules that can handle the particular request.
 * Event handlers that register for this event type must return a Promise because, one,
 * requests usually are served by asynchronous functions, and two, the function making the
 * request wants a response specifically for that request, so a response should be attached to
 * the request, which Promises do very well and automatically.
 * @instance
 * @param {string} type type of request - by convention ends with "-request"
 * @param {...*} [args] Arguments to add to the request
 * @returns {Promise<*>}
 */

/*
 function request (type: string): Promise<any> {
 // Request types should end in "-request", but I won't add code to check it. Handlers
 // registered with on(...) should return a Promise - but I don't check that either.
 if (handlers.has(type)) {
 // Idea: if msg.flagAll then Promise.all(...), or arbitrary number, or "majority"
 // Interesting dilemma: Promise.race or Promise.any? Good arguments for either.
 // Depends on how consistent the responders are, is it _possible_ for one to succeed and
 // another one to fail? In that case "any" would be better.
 return Promise.race(
 Array.from(handlers.get(type).values())
 .map(([fn, fnArgs]) => fn(...fnArgs))
 );
 } else {
 return Promise.reject(
   createError('MB-REQ1 No handler found to respond to request of type $type'), {type}
 );
 }
 }
 */

/**
 * Creates a MessageBus object associated with the given ID string and records the ID in the
 * list returned by `messageBusesIds`.
 *
 * A trailing ".js" is stripped from `moduleId` so that a module file name can be passed in
 * directly. No uniqueness check is made: calling this function twice with the same string
 * yields two API-objects with the same ID, and the ID is recorded twice.
 * @static
 * @param {string} moduleId - A string used to identify the source of any messages sent through
 * the returned MessageBus object's methods.
 * @returns {MessageBusObj} An API-object that makes several functions available.
 * @throws {Error} Throws the error created for code `MB-CRMB1` if `moduleId` is `undefined`
 */
export function createMessageBus(moduleId: string): MessageBusObj {
    if (moduleId === undefined) {
        throw createError('MB-CRMB1');
    }

    const suffix = '.js';
    const id = moduleId.endsWith(suffix)
        ? moduleId.slice(0, moduleId.length - suffix.length)
        : moduleId;

    persistedMessageBusesIds.push(id);

    // The API-object: subscription management uses the shared module functions as they are,
    // only "send" is tied to this object's ID.
    return {
        id,
        on,
        once,
        remove,
        send: (type, ...messages): void => {
            send(id, type, ...messages);
        }
        // request
    };
}

// Per-module counters: base name of a module => the number that was appended to the ID of the
// most recently created MessageBus object for that module. Used by createWithRunningId() to
// give each instance of a non-singleton module its own ID string.
const runningIdStore = new Map<string, number>();

/**
 * Frontend to the {@link message-bus.module:ts.createMessageBus|createMessageBus} function which
 * adds a number to the given `moduleId` string that increases by one each time the same
 * `moduleId` string is used. This function is for creating {@link MessageBusObj} instances for
 * modules that are not singletons. The created ID that can be used to subscribe to messages
 * from this component will be of the form `${moduleId}-${number}`. Counting starts at 0: if
 * `moduleId` is `myModule`, the first call yields the ID `myModule-0` and the third call with
 * this same `moduleId` yields `myModule-2`.
 *
 * A trailing ".js" is stripped from `moduleId` before the counter is looked up and appended.
 * @static
 * @param {string} moduleId - A string used to identify the source of any messages sent through
 * the returned MessageBus object's methods. Internally a counter is appended.
 * @returns {MessageBusObj}
 * @throws {Error} Throws the error created for code `MB-CRMBID1` if `moduleId` is `undefined`
 */
export function createWithRunningId(moduleId: string): MessageBusObj {
    if (moduleId === undefined) {
        throw createError('MB-CRMBID1');
    }

    // The module name without a ".js" ending is the base name shared by all instances
    let name = moduleId;

    if (name.endsWith('.js')) {
        name = name.slice(0, -3);
    }

    // Number given to the previous instance with this base name, if there was one
    const previous = runningIdStore.get(name);

    // Start at 0, and start over at 0 instead of counting past Number.MAX_SAFE_INTEGER where
    // increments stop being exact. IDs repeat after such a wrap-around, but at 1000 new
    // instances per second that point is reached only after about 285,616 years:
    // Number.MAX_SAFE_INTEGER / (1000 * 60 * 60 * 24 * 365)
    const counter =
        previous === undefined || previous === Number.MAX_SAFE_INTEGER ? 0 : previous + 1;

    runningIdStore.set(name, counter);

    // Base name plus counter is the unique per-instance ID string
    return createMessageBus(`${name}-${counter}`);
}

/**
 * Returns the IDs of all MessageBus objects created so far, in creation order. The result is a
 * new array on every call, so modifying it does not affect the internal list. An ID appears
 * once for each MessageBus object that was created with it.
 * @static
 * @returns {string[]} A copy of the internal array of message bus IDs
 */
export function messageBusesIds(): string[] {
    return persistedMessageBusesIds.slice();
}