Source: chum-exporter.ts

/**
 * @author Michael Hasenstein <hasenstein@yahoo.com>
 * @copyright REFINIO GmbH 2018
 * @license CC-BY-NC-SA-2.5; portions MIT License
 * @version 0.0.1
 */

/**
 * @private
 * @module
 */

/**
 * The {@link chum-exporter.module:ts.create|Chum exporter module's create function} accepts a
 * single options object parameter of this type.
 * @private
 * @typedef {object} ChumExporterOptions
 * @property {WebsocketPromisifierAPI} WebsocketObj - Websocket-promisifier API-object
 * @property {function(SHA256Hash):void} logSentObject - Log transfers of ONE objects
 * @property {function(SHA256IdHash):void} logSentIdObject - Log transfers of ONE ID objects
 * @property {function(SHA256Hash):void} logSentBlob - Log BLOB transfers
 * @property {function(SHA256Hash):void} logSentClob - Log CLOB transfers
 * @property {function(Error):void} logError - Log errors
 * @property {boolean} localKeepRunningSetting - Keep running (and exporting) even after a full
 * synchronization is done
 * @property {SHA256IdHash} remotePersonId - The ID hash of the owner of the remote instance the
 * chum is exchanging data with. Required: the exporter throws if it is undefined.
 */
export interface ChumExporterOptions {
    WebsocketObj: WebsocketPromisifierAPI;
    logSentObject: (hash: SHA256Hash) => void;
    logSentIdObject: (hash: SHA256IdHash) => void;
    logSentBlob: (hash: SHA256Hash<BLOB>) => void;
    logSentClob: (hash: SHA256Hash<CLOB>) => void;
    logError: (error: Error) => void;
    localKeepRunningSetting: boolean;
    remotePersonId: SHA256IdHash<Person>;
}

/**
 * Statistics collected by the exporter over the lifetime of one chum connection. The exporter
 * resolves with this object when it ends.
 * @private
 * @typedef {object} ExporterStats
 * @property {number} nrOfAccessible - How many unique hashes were registered as accessible to
 * the remote instance while walking all objects it may access (expected to be concrete, i.e.
 * non-ID, object hashes - TODO confirm, the registering function also accepts ID hashes)
 * @property {number} nrOfAcknowledged - How many of those hashes the remote instance reported
 * as "processed". At the end of a complete exchange this should equal `nrOfAccessible`.
 * @property {Set<SHA256Hash|SHA256IdHash>} sentWaitingForImporterAck - Hashes of objects that
 * the remote instance actually requested and that we sent. Each hash leaves this Set once the
 * remote importer acknowledges that it stored the object.
 */
export interface ExporterStats {
    nrOfAccessible: number;
    nrOfAcknowledged: number;
    sentWaitingForImporterAck: Set<SHA256IdHash | SHA256Hash<HashTypes>>;
}

import {MESSAGE_TYPES} from './chum-base';
import {createFindAccessibleObjects} from './chum-exporter-find-objects';
import {
    createAcknowledgeFn,
    createRunFindAccessibleObjectsFn,
    createSendBlobFn,
    createSendCrdtMetaObjectFn,
    createSendIdObjectFn,
    createSendObjectFn
} from './chum-exporter-service-functions';
import {createError} from './errors';
import {createMessageBus} from './message-bus';
import type {BLOB, CLOB, HashTypes, Person} from './recipes';
import type {AnyFunction} from './util/function';
import {createNeverFailAsyncErrorWrapper, createRethrowingAsyncErrorWrapper} from './util/function';
import {createTrackingPromise} from './util/promise';
import type {SHA256Hash, SHA256IdHash} from './util/type-checks';
import type {WebsocketPromisifierAPI} from './websocket-promisifier';

// Message bus for this module. createChumExporter() sends its 'log' and 'debug' messages
// through it; the ID 'chum-exporter' presumably identifies this module as the message source.
const MessageBus = createMessageBus('chum-exporter');

/**
 * Function that creates a Chum exporter instance.
 *
 * ## End conditions
 *
 * 1. Authentication failure
 * 2. Loss of connection (connection error state does not matter)
 * 3. A full synchronization means there is nothing left to do - if keepRunning === false
 * 4. Any error ends the exporter (incl. errors in detached functions)
 *
 * The exporter instance also needs to be aware of some decoupled functions:
 *
 * 1. **Service functions** called by websocket-promisifier. Their caller has only very limited
 *    error handling: All errors are caught and the remote instance only receives a generic
 *    error message. However, it is this module (chum-exporter) that ultimately controls the
 *    services, even if it is not involved in the actual service function invocation. If there
 *    is an error we probably want to stop the exporter since we are in an undefined state that
 *    should be fixed.
 *
 * 2. **Finding accessible hashes** is a service function run in response to a request from the
 *    remote instance, but it only checks the arguments received over the network and then spawns a
 *    decoupled function which runs independent of and for much longer than the service request
 *    function, and therefore has no parent that errors could bubble up to automatically. This
 *    function consists of two parts:
 *    - Find the directly accessible root-level objects, i.e. the ones where an
 *      {@link Access|Access object} exists that points to it granting access for a given
 *      user and/or group,
 *    - For each root-level object iterate over the graph created by its hash links.
 *
 * 3. **Sending accessible hashes events** to the remote instance's importer is another
 *    function, decoupled from the one described above.
 *
 * Every service this function registers on `WebsocketObj` is removed again when the exporter
 * ends, regardless of which end condition ended it.
 *
 * @static
 * @async
 * @param {ChumExporterOptions} options
 * @returns {Promise<ExporterStats>} Resolves with the exporter statistics once an end condition
 * is reached. Its `sentWaitingForImporterAck` Set contains the hashes of objects we sent but did
 * not receive an acknowledgement for, so we don't know if the recipient was able to
 * successfully add them to their storage.
 * @throws {Error} Throws error `CE-CCE1` if `options.remotePersonId` is undefined. Rejects when
 * the internal exporter tracking promise is rejected, i.e. on an error reported by a service
 * function or by the detached "find accessible objects" iterator (end condition #4).
 */
export async function createChumExporter(options: ChumExporterOptions): Promise<ExporterStats> {
    const {
        WebsocketObj,
        logSentObject,
        logSentIdObject,
        logSentBlob,
        logSentClob,
        logError,
        localKeepRunningSetting,
        remotePersonId
    } = options;

    if (remotePersonId === undefined) {
        throw createError('CE-CCE1');
    }

    const exporterStats: ExporterStats = {
        sentWaitingForImporterAck: new Set(),
        nrOfAccessible: 0,
        nrOfAcknowledged: 0
    };

    // Used to wait for the end condition of the exporter. All stop conditions for the exporter
    // are realized through this promise.
    const exporterTracker = createTrackingPromise<void>();

    WebsocketObj.promise
        .finally(() => exporterTracker.resolve())
        // WebsocketObj promise failure is handled elsewhere, but we need to handle
        // rejections of the new Promise created by the finally() method
        .catch(ignore => undefined);

    // Track the end condition of the decoupled "find accessible objects" object graph iterator
    // with a promise. It fits better into our async/await based code than callbacks. The
    // function starts another decoupled function for sending "accessible objects" events to the
    // remote instance, its errors are reported here too.
    const findObjectsTracker = createTrackingPromise<void>();

    // End condition #4 - The iterator errors are exporter errors
    findObjectsTracker.promise.catch((err: Error): void => {
        logError(err);
        exporterTracker.reject(err);
    });

    // TWO IN ONE (multipurpose data structure)
    // Declared in this module because it is used by several functions (services). Keys are
    // object hashes, values are boolean, i.e. true or false.
    // 1. The key alone is for the actual core function, to determine if a given hash is
    // accessible (Map's map.has(hash) function is used).
    // 2. The boolean value is set to false initially and indicates whether the remote
    // instance acknowledged processing this hash.
    const accessibleHashes: Map<SHA256Hash<HashTypes> | SHA256IdHash, boolean> = new Map();

    // Returns a boolean indicating whether the hash was added
    function addAccessible(hash: SHA256Hash<HashTypes> | SHA256IdHash): boolean {
        // Duplicates are possible because the object graph iterator only prevents duplicates
        // within each graph, but a new iterator is created for each Access object, and the
        // object graphs they point to, may overlap
        if (accessibleHashes.has(hash)) {
            return false; // Hash was not added
        }

        accessibleHashes.set(hash, false);
        exporterStats.nrOfAccessible += 1;
        return true; // Hash was added
    }

    // undefined: not accessible, false: accessible but not yet acknowledged, true: acknowledged
    function getAccessibleHashStatus(hash: SHA256Hash<HashTypes> | SHA256IdHash): void | boolean {
        return accessibleHashes.get(hash);
    }

    function addUnacknowledged(hash: SHA256Hash<HashTypes> | SHA256IdHash): void {
        exporterStats.sentWaitingForImporterAck.add(hash);
    }

    let iteratorDone = false;
    findObjectsTracker.promise
        .finally(() => {
            iteratorDone = true;
        })
        // findObjectsTracker promise failure is handled elsewhere, but we need to handle
        // rejections of the new Promise created by the finally() method
        .catch(ignore => undefined);

    // The chum-exporter-iterator function collecting accessible hashes and sending them to the
    // remote importer wants to know about acknowledgements and the resulting sizes of the two
    // structures for accessible hashes and for unacknowledged (files were actually sent)
    // hashes.
    const onAckSubscribers: Set<(a: number, b: number) => void> = new Set();

    function subscribeToOnAcknowledgeMsg(fn: (a: number, b: number) => void): void {
        onAckSubscribers.add(fn);
    }

    function onAcknowledgeMsg(hashesAcknowledgedByRemote: Array<SHA256Hash<HashTypes>>): void {
        MessageBus.send(
            'log',
            `[${WebsocketObj.connId}] onAcknowledgeMsg: ACKed ${
                hashesAcknowledgedByRemote.length
            } hashes - ${hashesAcknowledgedByRemote.join(', ')}`
        );

        for (const hash of hashesAcknowledgedByRemote) {
            exporterStats.sentWaitingForImporterAck.delete(hash);

            // Only the first ACK for a hash we registered as accessible counts. Without this
            // check, a repeated ACK, or one for a hash we never reported, would inflate
            // nrOfAcknowledged. The end-condition equality below could then hold too early or
            // never. It would also insert an arbitrary remote-supplied hash into the
            // accessible-hashes map.
            if (accessibleHashes.get(hash) === false) {
                accessibleHashes.set(hash, true);
                exporterStats.nrOfAcknowledged += 1;
            }
        }

        // Inform subscribers about how many entries there still are
        onAckSubscribers.forEach(fn =>
            fn(
                exporterStats.nrOfAccessible - exporterStats.nrOfAcknowledged,
                exporterStats.sentWaitingForImporterAck.size
            )
        );

        // End condition #3
        // 1. We stop when a full synchronization has been achieved (keepRunning is false)
        // 2. Storage iteration over all accessible root nodes is finished and all accessible
        //    hashes have been reported to the remote instance via events (iteratorDone)
        // 3. All accessible hashes sent to the remote importer (the hashes, not the files)
        //    were acknowledged by the remote instance (whether transferred or not needed)
        if (
            !localKeepRunningSetting &&
            iteratorDone &&
            exporterStats.nrOfAccessible === exporterStats.nrOfAcknowledged
        ) {
            exporterTracker.resolve();
        }
    }

    // End condition #4
    // Callback for all functions not called from here, but which are exporter functions. It is
    // like a human organisational structure where you are responsible for employees but the
    // work they do is for and controlled by somebody else. We give them this function to also
    // report any problems to us. In our context they work for the customer, i.e. the remote
    // instance sending requests to us. We want to remain in control of them overall.
    function onServiceError(err: Error): void {
        // No logError() here because the rejection surfaces at the end of this function, which
        // "await"s the exporterTracker promise and therefore rejects with this error
        exporterTracker.reject(err);
    }

    // Create an exception-intercepting wrapper function for functions we manage but that
    // are invoked by the remote instance through the websocket-promisifier connection. The
    // actual caller only gets a generic error message, the place where the error must be
    // handled is here, however.
    const serviceFnErrorWrapper = createRethrowingAsyncErrorWrapper(onServiceError);

    function onIteratorError(err: Error): void {
        logError(err);
        // Rejecting findObjectsTracker triggers its rejection handler set up above, which
        // rejects exporterTracker. The iterator is also given exporterTracker, so it can stop
        // itself.
        // NOTE(review): if findObjectsTracker is still pending, that handler logs this error a
        // second time.
        findObjectsTracker.reject(err);
    }

    // A second error wrapper function that prevents exceptions from being thrown. This is
    // for fully detached functions started by service functions, whose exceptions would
    // result in "Uncaught error" or an "Unhandled promise rejection" Javascript runtime error.
    const iteratorErrorWrapper = createNeverFailAsyncErrorWrapper(onIteratorError);

    // The main part of the MESSAGE_TYPES.ACCESSIBLE_OBJECTS_EVENTS_REQUEST event handler
    // runs decoupled from the actual event handler through setTimeout. Since that means the
    // service function loses control over it, its errors are routed to onIteratorError
    // instead of back to the service caller.
    const detachedFindAccessibleObjFn = iteratorErrorWrapper(
        createFindAccessibleObjects(
            WebsocketObj,
            remotePersonId,
            addAccessible,
            subscribeToOnAcknowledgeMsg,
            exporterTracker,
            findObjectsTracker,
            logError,
            WebsocketObj.connId
        )
    );

    const services: Array<[number, AnyFunction]> = [
        [
            MESSAGE_TYPES.ACCESSIBLE_OBJECTS_EVENTS_REQUEST,
            createRunFindAccessibleObjectsFn(
                detachedFindAccessibleObjFn,
                localKeepRunningSetting,
                WebsocketObj.connId
            )
        ],
        [
            MESSAGE_TYPES.GET_OBJECT,
            createSendObjectFn(
                logSentObject,
                getAccessibleHashStatus,
                addUnacknowledged,
                WebsocketObj.connId
            )
        ],
        [
            MESSAGE_TYPES.GET_ID_OBJECT,
            createSendIdObjectFn(
                logSentIdObject,
                getAccessibleHashStatus,
                addUnacknowledged,
                WebsocketObj.connId
            )
        ],
        [
            MESSAGE_TYPES.GET_BLOB,
            createSendBlobFn(
                logSentBlob,
                logSentClob,
                getAccessibleHashStatus,
                addUnacknowledged,
                WebsocketObj.connId
            )
        ],
        [
            MESSAGE_TYPES.GET_CRDT_META_OBJECT,
            createSendCrdtMetaObjectFn(
                logSentObject,
                getAccessibleHashStatus,
                addUnacknowledged,
                WebsocketObj.connId
            )
        ],
        [
            MESSAGE_TYPES.ACKNOWLEDGE,
            createAcknowledgeFn(onAcknowledgeMsg, WebsocketObj.connId)
        ]
    ];

    services.forEach(([id, fn]) => WebsocketObj.addService(id, serviceFnErrorWrapper(fn)));

    // STEP #2
    // This tracking promise lets us wait for one of the exporter's end conditions.
    // End conditions #3 and #4 - Success: full sync. done, Failure: Error
    try {
        MessageBus.send('log', `[${WebsocketObj.connId}] AWAIT EXPORTER PROMISE`);

        await Promise.race([
            // End condition #2 - This also is the main end condition when keepRunning
            // Ignore websocket errors - the parent logs them
            WebsocketObj.promise.catch(ignore => undefined),
            exporterTracker.promise
        ]);
    } finally {
        MessageBus.send('log', `[${WebsocketObj.connId}] AWAIT EXPORTER PROMISE - DONE`);

        // Remove exactly the services registered above, derived from the same array so that
        // none of them (e.g. GET_ID_OBJECT) can be left behind on the connection
        services.forEach(([id]) => WebsocketObj.removeService(id));
    }

    MessageBus.send(
        'debug',
        `[${WebsocketObj.connId}] ` +
            'END OF EXPORTER, statistics:\n' +
            '  nrOfAccessible: ' +
            exporterStats.nrOfAccessible +
            '\n' +
            '  nrOfAcknowledged: ' +
            exporterStats.nrOfAcknowledged +
            '\n' +
            '  sentWaitingForImporterAck: ' +
            JSON.stringify(Array.from(exporterStats.sentWaitingForImporterAck))
    );

    return exporterStats;
}