/**
* @author Michael Hasenstein <hasenstein@yahoo.com>
* @copyright REFINIO GmbH 2017
* @license CC-BY-NC-SA-2.5; portions MIT License
* @version 0.0.1
*/
/**
* This module provides an API for a websocket connection. Services provided by one side can be
* called by the other through a promisified cross-network function (service) call, and use of a
* communication server as middleman so that clients that themselves cannot provide a network
* service can act as service providers through the middleman.
*
* Receiving events from the communication partner(s) is possible by providing an
* event-message receiving service on the client's side.
*
* ONE.core uses this module for its {@link chum-sync.ts|chum-sync} module. The entire module
* can be used by apps using the one.core library on both sides to provide cross-client
* communication.
* @module
*/
/**
* The API assumes that all communication goes through a communication server that forwards
* messages between clients that usually could not connect directly to one another. Client code
* receives the API-object of this type when the other client has connected to the same
* communication server, so that an end-to-end connection exists with the comm-server in the
* middle as forwarding agent.
*
* Use `send()` to send requests, use `close()` to end the connection. The final close status
* consisting of code and reason from the WebSocket's CloseEvent (see
* {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}) are available
* directly through the `promise`. If the CloseEvent was the result of calling
* `close()` as either a resolved promise if the code was 1000 (`NORMAL`), or a rejected
* promise with an Error object with code and reason in the Error's `message` string property.
*
* Note that none of the API functions need to be bound (they don't use `this`).
* @typedef {object} WebsocketPromisifierAPI
* @property {Promise<{code:number,reason:string}>} promise - Promise tracking the websocket state
* @property {number} connId - The ID of the underlying `Connection`
* @property {function(number,AnyFunction):void} addService
* @property {function(number):void} removeService
* @property {function():void} clearServices
* @property {function(string,string,number=):Promise<undefined>} connect
* @property {function(number,...*):Promise<*>} send - 1st param: Service ID, rest: params for
* service function
* @property {function(string):void} close - Initiate connection shutdown, results are in
* `promise. The optional close reason string must be no longer than 123 bytes (encoded in
* UTF-8). If it is longer, one.models' WebSocketPlugin close() function will shorten it
* automatically.
* @property {WebsocketStatistics} stats - Read-only access to sent/received bytes and nr.
* of requests statistics
*/
export interface WebsocketPromisifierAPI {
promise: Promise<WebsocketStatistics>;
connId: number;
addService: (id: number, fn: AnyFunction) => void;
removeService: (id: number) => void;
clearServices: () => void;
send: (type: number, ...args: readonly unknown[]) => Promise<unknown>;
close: (reason?: string) => void;
stats: Readonly<WebsocketStatistics>;
}
export interface MsgResponseObject {
responseId: number;
type: 'data';
data: unknown;
}
export interface MsgResponseStreamObject {
responseId: number;
type: 'stream';
chunk: string;
encoding: undefined | 'base64' | 'utf8';
}
export interface MsgResponseStreamEndObject {
responseId: number;
type: 'stream-end';
}
export interface MsgResponseStreamErrorObject {
responseId: number;
type: 'stream-error';
error: string;
}
export interface MsgResponseErrorObject {
responseId: number;
type: 'error';
error: string;
}
export type MsgResponseTypes =
| MsgResponseObject
| MsgResponseStreamObject
| MsgResponseStreamEndObject
| MsgResponseStreamErrorObject
| MsgResponseErrorObject;
export interface MsgRequestObject {
requestId: number;
type: 'request';
serviceId: number;
args?: undefined | readonly unknown[];
}
export interface MsgRequestErrorObject {
requestId: number;
type: 'write-stream-error';
}
export type MsgRequestTypes = MsgRequestObject | MsgRequestErrorObject;
export type FindServiceByChannelAndId = (id: number) => void | AnyFunction;
export interface EncryptedConnectionInterface {
id: number;
close(reason?: string): void;
send(message: Uint8Array | string): void;
bufferedAmount: number;
state: {
onEnterState: {
listen(listener: (state: 'connecting' | 'open' | 'closed') => void): () => void;
};
};
onMessage: {
listen(listener: (message: Uint8Array | string) => void): () => void;
};
}
export type RequestData = [
serviceId: number,
resolve: PromiseResolveCb<any>,
reject: PromiseRejectCb
];
import {createError} from './errors';
import {createMessageBus} from './message-bus';
import type {WebsocketStatistics} from './recipes';
import type {AnyFunction} from './util/function';
import {createReadonlyTrackingObj} from './util/object';
import type {PromiseRejectCb, PromiseResolveCb} from './util/promise';
import {createTrackingPromise} from './util/promise';
import type {ElementType} from './util/type-checks';
import {isFunction, isInteger, isObject, isString, isSymbol} from './util/type-checks-basic';
import type {WebsocketRequestHandler} from './websocket-request-handler';
import {createRequestHandler} from './websocket-request-handler';
import type {WebsocketResponseHandler} from './websocket-response-handler';
import {createResponseHandler} from './websocket-response-handler';
const MessageBus = createMessageBus('websocket-promisifier');
/**
* When sending requests to the remote client undefined values in JSON-stringified arguments
* that are undefined are represented by this string constant. JSON.stringify would turn
* undefined into null. The string is chosen in the hope that there is no such string as
* (complete) value in the data.
* Also see {@link encodeUnstringifiableValues} and {@link decodeUnstringifiableValues}
* @private
* @type {'$__undefined$'}
*/
const UNDEFINED_STR = '$__undefined$';
/**
* ### Utility function for JSON.stringify
*
* Used to encode special values for the JSON string that we sent over a websocket connection.
*
* The encode/decode utility functions used with JSON.stringify and JSON.parse, respectively,
* only encode (and decode) special values that we actually need. We definitely need to
* support `undefined`, because when calling a remote service - a remote function - that is a
* plausible value for a parameter for a function. The arguments are items in an array, and
* JSON.stringify changes `undefined` to `null` in an array context. This would have undesirable
* consequences, since an `undefined` function parameter often is incompatible with setting
* the same parameter to `null`.
*
* Functions as parameters: While obviously a common thing in JS code, sending a function to the
* remote site for execution is not supported.
*
* Symbols: Unsupported, since it is hard to see the value of sending a symbol across the net.
*
* Special numeric values: For now we simply forbid sending NaN or Infinity since it's hard to
* see a use case.
*
* However, the function does not silently swallow unsupported values but throws an error when
* it encounters one.
* @private
* @param {string} key - Ignored/unused
* @param {*} value - Any value
* @returns {*} Returns the input value <i>except</i> for when the input value is `undefined`,
* in which case a replacement string is returned
* @throws {Error} Throws an `Error` if a value is of an unsupported type
*/
function encodeUnstringifiableValues<T extends unknown>(
key: string,
value: T
): T | typeof UNDEFINED_STR {
// Support "undefined" so that optional parameters using variables with a possible value of
// "undefined" can be used as arguments when sending requests
if (value === undefined) {
return UNDEFINED_STR;
}
if (
isSymbol(value) ||
isFunction(value) ||
value === Infinity ||
value === -Infinity ||
Number.isNaN(value as any)
) {
throw createError('WSP-ESV', {
key,
value: isFunction(value) ? '[function]' : String(value),
typeofValue: typeof value
});
}
return value;
}
/**
* ### Utility function for JSON.parse
*
* Used to decode special values for the JSON string that we received over a websocket connection.
*
* This function decodes values found while parsing JSON encoded on another machine. Also see
* {@link encodeUnstringifiableValues}.
* @private
* @param {string} _key - Ignored/unused
* @param {*} value
* @returns {*} Returns the decoded value or if no decoding was necessary the value itself
*/
function decodeUnstringifiableValues<T extends unknown>(_key: string, value: T): undefined | T {
if (value === UNDEFINED_STR) {
return undefined;
}
return value;
}
const ResponseTypes: Set<MsgResponseTypes['type']> = new Set([
'data',
'stream',
'stream-end',
'stream-error',
'error'
] as const);
/**
* @private
* @param {*} thing
* @returns {boolean}
*/
function isMsgResponse(thing: unknown): thing is MsgResponseTypes {
return isObject(thing) && ResponseTypes.has(thing.type) && isInteger(thing.responseId);
}
// const RequestTypes: Set<MsgRequestTypes['type']> = new Set([
// 'request',
// 'write-stream-error'
// ] as const);
/**
* @private
* @param {*} thing
* @returns {boolean}
*/
function isMsgRequest(thing: unknown): thing is MsgRequestTypes {
return (
isObject(thing) &&
((thing.type === 'request' && isInteger(thing.requestId) && isInteger(thing.serviceId)) ||
thing.type === 'write-stream-error')
);
}
/**
* Two types of messages: binary or JSON. The former always are chunks of files streamed to us
* in response to a request, which is why we can relay such response types to the
* websocket-response-handler module right away. JSON messages however can be
* - Requests to us
* - Responses to our requests
* - Client-errors encountered when saving stream chunks it requested (the only kind of client
* errors we need to learn about, to stop the stream and not waste bandwidth for data the
* client cannot handle anymore)
* - Messages from the communication server
* @private
* @param {WebsocketMessageResponseHandler} requestHandler
* @param {WebsocketMessageResponseHandler} responseHandler
* @param {WebsocketStatistics} statistics
* @returns {WebsocketMessageResponseHandler} Returns a {@link JsonMessageHandlerFn} function
*/
function createJsonMessageHandler(
requestHandler: WebsocketRequestHandler['requestMsgHandler'],
responseHandler: WebsocketResponseHandler['jsonResponseMsgHandler'],
statistics: WebsocketStatistics
): (json: string) => void {
return (json: string): void => {
// In the error cases below there is no way to find which request may have lead to the
// response, so we can only make a note, but we cannot find a specific request whose promise
// we could reject as failed. Such an error always is an error of the connection as a whole.
// UNKNOWN NETWORK DATA
let msg;
// POLICY DECISION: Ignore erroneous or unparsable messages
try {
msg = JSON.parse(json, decodeUnstringifiableValues);
} catch (ignored) {
statistics.requestsReceivedInvalid += 1;
return MessageBus.send('alert', 'Received unparsable message: ' + json);
}
if (!isObject(msg)) {
statistics.requestsReceivedInvalid += 1;
return MessageBus.send('alert', 'Received message is invalid:' + json);
}
// Asynchronous - but no use "await"-ing the result because there is nobody there waiting
// for it apart from the communication partner that sent the request (in a different
// environment)
if (isMsgRequest(msg)) {
// Requests to us, and client-side stream error notifications
statistics.requestsReceivedTotal += 1;
requestHandler(msg);
} else if (isMsgResponse(msg)) {
// Responses to requests we sent (incl. to the communication server)
responseHandler(msg);
} else {
statistics.requestsReceivedInvalid += 1;
MessageBus.send('alert', 'Received message is invalid:' + json);
}
};
}
/**
* @static
* @param {EncryptedConnectionInterface} connection
* @returns {WebsocketPromisifierAPI} Returns a {@link WebsocketPromisifierAPI} object
*/
export function createWebsocketPromisifier(
connection: EncryptedConnectionInterface
): WebsocketPromisifierAPI {
// These are the services we make available to remote ONE instances.
const services: Map<number, AnyFunction> = new Map();
// For progress reports to the code using the websocket (reported once per second). The
// number of sent bytes is obtained by intercepting calls to connection.send(), the number of
// received bytes is obtained in connection.onMessage.
const statistics: WebsocketStatistics = {
requestsSentTotal: 0,
// Counted in createJsonMessageHandler (it's always JSON, and onMessage could get things
// other than requests)
requestsReceivedTotal: 0,
requestsReceivedInvalid: 0
};
function sendObj(data: MsgRequestTypes | MsgResponseTypes): void {
const str = JSON.stringify(data, encodeUnstringifiableValues);
return connection.send(str);
}
function sendBuf(data: ArrayBuffer): void {
return connection.send(new Uint8Array(data));
}
// 1. For internal use: To find a service
function getService(id: number): void | AnyFunction {
return services.get(id);
}
// API FUNCTIONS
// 2. Exported for external use: To control services
function addService(id: number, fn: AnyFunction): void {
services.set(id, fn);
}
function removeService(id: number): void {
services.delete(id);
}
function clearServices(): void {
services.clear();
}
// The key, a number, is a numeric request ID. The value is the request's service ID number
// and a pair of functions, the send-request's promise's resolve() and reject() functions,
// respectively. The promise is created when a request is sent. It can be resolved when a
// message with the same request ID is received (but the property will be called
// "responseId" to show it is a response to one of our requests). It can be rejected if
// communication ends, either because of an error or because it is shut down, so that we
// don't leave promises hanging.
// The target service ID is included for debugging and error messages, to be able to tell
// what kind of request was sent.
const requests: Map<number, RequestData> = new Map();
function send(serviceId: number, ...args: readonly unknown[]): Promise<unknown> {
return new Promise((resolve, reject) => {
if (!isInteger(serviceId) || serviceId <= 0) {
return reject(
createError('WSP-SN3', {
serviceId,
typeOfType: typeof serviceId
})
);
}
const requestId = statistics.requestsSentTotal;
// These messages will be passed on to the actual communication partner even
// though we send it to the communication server
sendObj({
requestId,
type: 'request',
serviceId,
args
} as MsgRequestObject);
statistics.requestsSentTotal += 1;
// The promise is resolved when the onmessage handler receives a message (a
// response) with this ID. It is rejected if there is a communication error,
// which would prevent the promise from ever being resolved.
// NOTE: In response messages this request ID will be in property "responseId"
requests.set(requestId, [serviceId, resolve, reject]);
});
}
function close(reason?: string): void {
connection.close(reason);
}
// CONNECTION EVENT HANDLING
function onMsgHandler(data: Uint8Array | string): void {
try {
if (data instanceof Uint8Array) {
// These messages always are responses to our requests for files - if
// they are sent from a Buffer (we use base64 strings for binary file
// transfer due to limitations of the React Native platform)
binaryResponseMsgHandler(
data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength)
);
} else if (isString(data)) {
// These messages can be responses to requests we sent, or requests sent
// to us, or parts of base64-encoded file streams that we receive
handleJsonMsg(data);
}
} catch (err) {
MessageBus.send('error', err);
// No details are sent across the wire
connection.close('Internal Error');
}
}
// The promise created here is for the entire websocket connection.
const wsTracker = createTrackingPromise<WebsocketStatistics>();
function onConnectionStateChange(
state: ElementType<
Parameters<
ElementType<
Parameters<EncryptedConnectionInterface['state']['onEnterState']['listen']>
>
>
>
): void {
if (state !== 'closed') {
return;
}
// This is a purely internal cleanup canceling any open read and write streams
// and is independent of rejection of all open requests above
cancelReadStreams();
cancelWriteStreams();
if (requests.size > 0) {
const error = createError('WSP-ONCL0');
const serviceIds: number[] = [];
// This always is a rejected promise regardless of websocket close code since we
// don't have the result that was requested
requests.forEach(([serviceId, _resolveFn, rejectFn], _requestId) => {
serviceIds.push(serviceId);
rejectFn(error);
});
wsTracker.reject(
createError('WSP-ONCL', {
nr: requests.size,
serviceIds
})
);
requests.clear();
return;
}
wsTracker.resolve(statistics);
}
const sunsubscribeOnEnterState = connection.state.onEnterState.listen(onConnectionStateChange);
const unsubscribeOnMessage = connection.onMessage.listen(onMsgHandler);
wsTracker.promise.finally(() => {
sunsubscribeOnEnterState();
unsubscribeOnMessage();
});
// Create response handler functions for messages we receive in response to our
// requests to the other side:
const {
// Responses to requests for files (chunk parts of binary or BASE64 streams)
binaryResponseMsgHandler,
// JSON responses (everything that is not a binary message)
jsonResponseMsgHandler,
// Helper: Cancel all open write file streams for unfinished requests for files
cancelWriteStreams
} = createResponseHandler(requests, sendObj);
const {requestMsgHandler, cancelReadStreams} = createRequestHandler(
getService,
connection,
sendObj,
sendBuf
);
// The message handler function creation has to happen outside the "onmessage" event
// handler using them. We don't want to create a new handler function each time an
// event occurs.
const handleJsonMsg = createJsonMessageHandler(
requestMsgHandler,
jsonResponseMsgHandler,
statistics
);
// The API-object is used to resolve the current promise of connectAndRegisterServices()
// with - as soon as the communication server sends a response to our request #0 sent by
// the WebSocket "onopen" event handler below.
return {
promise: wsTracker.promise,
connId: connection.id,
addService,
removeService,
clearServices,
send,
close,
stats: createReadonlyTrackingObj<WebsocketStatistics>(statistics)
};
}