/**
* @author Michael Hasenstein <hasenstein@yahoo.com>
* @copyright REFINIO GmbH 2017
* @license CC-BY-NC-SA-2.5; portions MIT License
* @version 0.0.1
*/
/**
* This module implements a very simple message bus system for loose coupling of components. It
* is only used for logging initially, but we keep it in case there is a good reason for loose
* coupling, which may include communication between different instances. Messages are sent on a
* "bus" and any other module can subscribe to messages of certain types.
* The {@link logger.module:ts|logger} module is an example for a message consumer.
* @module
*/
/**
* Callback function definition for message bus event handlers.
*
* Handlers registered with `on` are invoked by `send` with the ID of the sending
* {@link MessageBusObj} as first argument, followed by all message arguments that were passed
* to `send`. The return value of a handler is ignored.
*
* Type definition:
*
* ```
* (src: string, ...messages: unknown[]) => void
* ```
* @typedef {Function} MessageHandlerCb
* @param {string} src - ID of the message bus object the message was sent through
* @param {...*} messages - The message arguments; their meaning depends on the message type
* @returns {undefined} Returns undefined
*/
export type MessageHandlerCb = (src: string, ...messages: unknown[]) => void;
/**
* An API-object that provides several functions bound to a context established by the function
* that created the API-object.
*
* ### Example
*
* ```
* // Result is a MessageBusObj
* import {createMessageBus} from './message-bus';
* const MessageBus = createMessageBus('some-id-string');
* ```
* @global
* @typedef {object} MessageBusObj
* @property {string} id - The ID used on the message bus to identify messages sent through this
* MessageBus object.
* @property {function(string,MessageHandlerCb):void} on - `(type: string, fn: MessageHandlerCb) =>
* void` — Register a callback for a given type of message
* @property {function(string,MessageHandlerCb):void} once - `(type: string, fn:
* MessageHandlerCb) => void` — Like `on` but the callback is removed after it is called
* @property {function(string,MessageHandlerCb):void} remove - `(type: string, fn: MessageHandlerCb)
* => void` — Remove the given callback function
* @property {function(string,...*):void} send - `(type: string, ...messages: unknown[]) => void`
* — Send a message of the given type. The arguments must match what potential recipients
* of that type would expect.
*/
export interface MessageBusObj {
// Source ID reported to handlers as their first argument for messages sent through this object
id: string;
// Subscribe `fn` to messages of `type` ("source:type" or just "type")
on: (type: string, fn: MessageHandlerCb) => void;
// Subscribe `fn` for a single delivery only
once: (type: string, fn: MessageHandlerCb) => void;
// Unsubscribe `fn` from messages of `type`
remove: (type: string, fn: MessageHandlerCb) => void;
// Send a message of `type`; the sender is identified by `id`
send: (type: string, ...messages: unknown[]) => void;
}
import {createError} from './errors';
// Registry of all subscriptions, shared by every MessageBusObj created by this module.
// Key: the exact string used to subscribe ("source:type" or just "type").
// Value: the handler functions registered for that key, iterated in insertion order.
const handlers = new Map<string, Set<MessageHandlerCb>>();
// IDs of the message bus objects created by this module, in creation order.
const persistedMessageBusesIds: string[] = [];
/* ************************************************************************************
* EVENT SYSTEM
* ************************************************************************************/
/**
 * Registers a callback function for a given type of message.
 *
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function.
 *
 * Registering the exact same callback function a second time for the exact same `type` string
 * is an error: the call throws (error code `MB-ON1`) and the existing registration is left
 * untouched. The same function may still be registered under different `type` strings, for
 * example once as "source:type" and once as plain "type".
 *
 * ## Event types (examples)
 *
 * Sent with `msg={Error}` from anywhere. Details are in the Error object.
 * `[any]:error`
 *
 * Debug messages
 * `[any]:info`
 *
 * Sent with msg={{type,hash}} whenever a new ONE microdata object has been saved to long-term
 * storage.
 * `storage:new-object`
 *
 * For example emitted by node-imap: `alert(<string>message)`: Emitted when the server issues an
 * alert (e.g. "the server is going down for maintenance")
 * `imap-mailbox-retriever:alert`
 *
 * Sent by an ImapMailboxRetriever instance after closing the IMAP connection
 * `imap-mailbox-retriever:end`
 * @instance
 * @param {string} type - String format "source:type" to receive messages from one specific
 * source only, or just "type" to receive them from any source. By convention request types end
 * in "-request" and their handlers return a Promise; neither convention is checked here.
 * @param {MessageHandlerCb} fn - The function to call for each matching message
 * @returns {undefined}
 * @throws The error created by `createError('MB-ON1', {type})` if `fn` is already registered
 * for `type`
 */
function on(type: string, fn: MessageHandlerCb): void {
    const registered = handlers.get(type);

    // First subscription for this exact key: create its Set. Subscriptions are kept per exact
    // key string, so "source:type" and "type" live in separate Sets. Insertion order is
    // therefore only defined among handlers subscribed with the very same string (a Set
    // iterates in insertion order).
    if (registered === undefined) {
        handlers.set(type, new Set<MessageHandlerCb>([fn]));
        return;
    }

    // A handler function can be registered only once per key; a duplicate is rejected.
    if (registered.has(fn)) {
        throw createError('MB-ON1', {type});
    }

    registered.add(fn);
}
/**
 * Registers a callback function that is called for the next matching message only and is then
 * unsubscribed automatically.
 *
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function.
 *
 * The callback receives exactly the same arguments a handler registered with "on" would
 * receive: the ID of the sender first, followed by the message arguments.
 *
 * Every call creates a new internal wrapper function, and it is the wrapper that gets
 * registered. Consequences:
 *
 * - Calling "once" several times with the same callback function never triggers the duplicate
 *   check of "on"; the callback is then called as many times as "once" was called to register
 *   it (albeit just once per registration).
 * - A callback registered through "once" cannot be unsubscribed with `remove(type, fn)`, because
 *   `fn` itself is not what is stored in the registry.
 * @see {@link message-bus.module:ts#on|on}
 * @instance
 * @param {string} type - String format "source:type" or only "type" for all handlers from any
 * source.
 * @param {MessageHandlerCb} fn - The function to call once
 * @returns {undefined}
 */
function once(type: string, fn: MessageHandlerCb): void {
    // The wrapper has the regular handler signature: send() passes the sender ID as first
    // argument. It must be forwarded as-is, otherwise the callback would see shifted arguments.
    function oneShot(src: string, ...messages: unknown[]): void {
        // Unsubscribe before calling fn so that the subscription is gone even if fn throws or
        // sends another message of the same type itself.
        remove(type, oneShot);
        fn(src, ...messages);
    }

    on(type, oneShot);
}
/**
 * Unsubscribes a callback function that was registered with "on" for the given type.
 *
 * This function is available on the {@link MessageBusObj} API-object returned by the
 * {@link message-bus.module:ts.createMessageBus|createMessageBus} function.
 *
 * The `type` string must be exactly the one used when subscribing. Nothing happens (and no
 * error is reported) if there is no subscription for `type` or if `fn` is not registered for it.
 * @instance
 * @param {string} type - The exact string that was used to register the callback
 * @param {MessageHandlerCb} fn - The callback function to remove
 * @returns {undefined}
 */
function remove(type: string, fn: MessageHandlerCb): void {
    const registered = handlers.get(type);

    if (registered === undefined) {
        return;
    }

    registered.delete(fn);

    // Drop the registry entry once its last handler is gone. Otherwise keys of short-lived
    // subscriptions (for example "source:type" keys with per-instance source IDs) would pile up
    // in the module-level Map for the lifetime of the process.
    if (registered.size === 0) {
        handlers.delete(type);
    }
}
/**
 * Delivers a message to all handlers subscribed to it. On the {@link MessageBusObj} API-object
 * returned by the {@link message-bus.module:ts.createMessageBus|createMessageBus} function this
 * is exposed as `send(type, ...messages)` with `src` pre-filled with the object's ID.
 *
 * Listeners are called synchronously. Handlers subscribed for this specific source
 * ("source:type") are called first, in their insertion order, followed by the handlers
 * subscribed for the type regardless of source ("type"), in their insertion order. The list of
 * handlers to call is assembled before the first one is invoked, so subscriptions added or
 * removed by a handler do not affect the delivery that is in progress.
 *
 * Errors thrown by a handler are not caught: they propagate to the caller and the remaining
 * handlers are not called.
 * @instance
 * @param {string} src - ID of the sender; passed to every handler as its first argument
 * @param {string} type - Type of the message
 * @param {...*} [messages] - Arguments passed on to every handler after `src`
 * @returns {undefined}
 */
function send(src: string, type: string, ...messages: readonly unknown[]): void {
    const recipients: MessageHandlerCb[] = [];

    // Source-specific subscriptions come first, then the subscriptions for any source.
    for (const key of [`${src}:${type}`, type]) {
        const registered = handlers.get(key);

        if (registered !== undefined) {
            recipients.push(...registered);
        }
    }

    // NOTE: Errors are deliberately not caught. If a subscriber function throws we let it crash.
    recipients.forEach(recipient => recipient(src, ...messages));
}
/*
* NOTE: The implementation of this function is currently disabled (commented out below).
*
* Ask for a response ONE TIME. For example, ask for the contents of an object with hash XYZ.
* The request is broadcast to all modules that registered for the type of request, and when a
* module answers (first), the value is returned to the requester. This function supports the
* loose coupling of ONE modules; the alternative is hard-coded dependencies where one module
* holds a reference to specific modules that can handle the particular request.
* Event handlers that register for this event type must return a Promise because, one, requests
* usually are served by asynchronous functions, and two, the function making the request wants
* a response specifically for that request, so a response should be attached to the request,
* which Promises do very well and automatically.
* @instance
* @param {string} type type of request - by convention ends with "-request"
* @param {...*} [args] Arguments to add to the request
* @returns {Promise<*>}
*/
/*
function request (type: string): Promise<any> {
// Request types should end in "-request", but I won't add code to check it. Handlers
// registered with on(...) should return a Promise - but I don't check that either.
if (handlers.has(type)) {
// Idea: if msg.flagAll then Promise.all(...), or arbitrary number, or "majority"
// Interesting dilemma: Promise.race or Promise.any? Good arguments for either.
// Depends on how consistent the responders are, is it _possible_ for one to succeed and
// another one to fail? In that case "any" would be better.
return Promise.race(
Array.from(handlers.get(type).values())
.map(([fn, fnArgs]) => fn(...fnArgs))
);
} else {
return Promise.reject(
createError('MB-REQ1 No handler found to respond to request of type $type'), {type}
);
}
}
*/
/**
 * Creates a MessageBus object associated with the given ID string.
 *
 * A trailing ".js" is stripped from `moduleId` to form the ID. The ID is recorded in the list
 * returned by `messageBusesIds()`. All MessageBus objects share the same subscription registry:
 * `on`, `once` and `remove` are the same functions on every object, only `send` is bound to the
 * ID of the individual object.
 * @static
 * @param {string} moduleId - A string used to identify the source of any messages sent through
 * the returned MessageBus object's methods.
 * @returns {MessageBusObj} An API-object that makes several functions available.
 * @throws The error created by `createError('MB-CRMB1')` if `moduleId` is `undefined`
 */
export function createMessageBus(moduleId: string): MessageBusObj {
    if (moduleId === undefined) {
        throw createError('MB-CRMB1');
    }

    // Module file names are accepted as IDs: remove a ".js" ending
    let id = moduleId;

    if (id.endsWith('.js')) {
        id = id.slice(0, -3);
    }

    persistedMessageBusesIds.push(id);

    // The API-object to access this module's methods ("request" is currently not provided)
    return {
        id,
        on,
        once,
        remove,
        send: (type, ...messages): void => {
            send(id, type, ...messages);
        }
    };
}
// Per-module counters: maps a module's base name to the number that was appended to the ID of
// the most recently created MessageBus object for that module. This is what makes the ID strings
// unique for modules that can have more than one instance using the message bus.
const runningIdStore = new Map<string, number>();
/**
 * Frontend to the {@link message-bus.module:ts.createMessageBus|createMessageBus} function for
 * modules that are not singletons. It appends a per-module counter to the given `moduleId`
 * string so that each instance gets its own {@link MessageBusObj} ID.
 *
 * The ID that can be used to subscribe to messages from the component has the form
 * `${moduleId}-${number}` (a trailing ".js" of `moduleId` is removed first). Counting starts at
 * 0 and increases by one each time the same `moduleId` is used: for `myModule` the first call
 * yields `myModule-0`, the third call `myModule-2`.
 * @static
 * @param {string} moduleId - A string used to identify the source of any messages sent through
 * the returned MessageBus object's methods. Internally a counter is appended.
 * @returns {MessageBusObj}
 * @throws The error created by `createError('MB-CRMBID1')` if `moduleId` is `undefined`
 */
export function createWithRunningId(moduleId: string): MessageBusObj {
    if (moduleId === undefined) {
        throw createError('MB-CRMBID1');
    }

    // The module name without a ".js" ending is the base name shared by all instances
    let baseName = moduleId;

    if (baseName.endsWith('.js')) {
        baseName = baseName.slice(0, -3);
    }

    // The first instance gets 0, every further one the previous number plus one. When
    // Number.MAX_SAFE_INTEGER has been reached counting restarts at 0, which avoids non-integer
    // counters at the price of reusing IDs. At 1000 new IDs per second that happens about every
    // 285,616 years: Number.MAX_SAFE_INTEGER / (1000 * 60 * 60 * 24 * 365)
    const previous = runningIdStore.get(baseName);
    const current =
        previous === undefined || previous === Number.MAX_SAFE_INTEGER ? 0 : previous + 1;

    runningIdStore.set(baseName, current);

    // Return an API-object to access this module's methods
    return createMessageBus(`${baseName}-${current}`);
}
/**
 * Returns the IDs of the message bus objects created by this module, in creation order.
 *
 * The result is a copy of the internal array: changing it has no effect on this module.
 * @static
 * @returns {string[]} A new array with the ID strings
 */
export function messageBusesIds(): string[] {
    return persistedMessageBusesIds.slice();
}