Source: chum-sync.ts

/**
 * @author Michael Hasenstein <hasenstein@yahoo.com>
 * @copyright REFINIO GmbH & Maximilian Wisgickl 2018
 * @license CC-BY-NC-SA-2.5; portions MIT License
 * @version 0.0.1
 */

/**
 * This module connects two ONE instances and synchronizes (transfers) all objects accessible
 * by one instance to the other. The exchange is bidirectional. When the synchronization is
 * finished a {@link Chum|Chum object} is written by both sides with a log of the exchange.
 *
 * ## Limitations
 *
 * Changes to group memberships performed while a Chum is active are ignored. The Chum uses the
 * group memberships it finds when the exchange is initialized.
 *
 * Writes to storage while a synchronization is running may or may not influence it, depending
 * on mode and on timing. For example, in mode "keepRunning = false" (see below) the list of all
 * {@link Access|Access objects} accessible by the given person (by ID hash) is calculated
 * at the start. If an Access object is for an ID hash, i.e. for all versions of a versioned
 * object, any new versions written while the synchronization is running *may* be
 * transferred (or not), depending on whether the object was written before or after that
 * particular object was transferred. In mode "keepRunning = true" such a version will be
 * transferred for certain if the write operation occurs before the synchronization is
 * explicitly ended.
 *
 * ## keepRunning = false
 *
 * The Chum synchronizes all currently accessible objects and then exits.
 *
 * Note that there is no locking. If the application continues to write to storage, the
 * behavior depends on what it writes and on timing, for example on whether it modifies access
 * settings that are relevant to the currently connected remote ONE instance. We do not wish to
 * disable (or queue) all write requests while a synchronization is running, since this may take
 * quite a long time. We also don't want to incur the cost of checking if any writes are
 * relevant. We think it is easy (easier, cheaper) for the application to be programmed in such
 * a way that continued writing to ONE storage while an exchange is running has no undesirable
 * consequences.
 *
 * ## keepRunning = true
 *
 * The Chum performs a full synchronization and then keeps the connection open and keeps
 * synchronizing any objects as they become available _and accessible_. Only when the
 * synchronization is explicitly ended are the final Chum log objects written by both sides. It
 * will not lose any relevant writes, because immediately after the initial calculation of
 * accessible Access objects (which point to the accessible objects) the code starts watching
 * which ONE objects are written and, if relevant, adds them to the collection of accessible
 * hashes, alongside those found through the originally started full synchronization.
 *
 * While there is no sharp distinction between objects (hashes) found through the request for a
 * full synchronization and those found while watching new objects being written there is a "full
 * synchronization achieved" status. It is achieved when the objects reachable through all
 * {@link Access} objects found initially (not through monitoring ongoing storage writes) have
 * all been processed (hashes collected, sent to the remote instance, processed and then
 * acknowledged by the remote instance). It is significant because there is a timestamp given to
 * the remote instance when it makes its request. "Full synchronization" means that all objects
 * accessible at that point have been processed. The timestamp can then be used by the remote
 * instance to only ask for objects created after that timestamp.
 *
 * @module
 */

/**
 * Options for {@link createChum}.
 */
export interface ChumSyncOptions {
    /**
     * Promisified websocket connection to the remote ONE instance. It is used for the protocol
     * version check, handed to both importer and exporter, and closed when the Chum ends.
     */
    connection: WebsocketPromisifierAPI;

    /**
     * The ID hash of the owner of the local instance the Chum is exchanging data with. This
     * does not have to be the main instance but may be some anonymous ID, so we cannot simply
     * query the instance for the main instance's Person ID.
     */
    localPersonId: SHA256IdHash<Person>;

    /**
     * The ID hash of the owner of the remote instance the Chum is exchanging data with.
     */
    remotePersonId: SHA256IdHash<Person>;

    /** Name of the Chum */
    chumName: string;

    /** Name of the local instance */
    localInstanceName: string;

    /** Name of the remote instance */
    remoteInstanceName: string;

    /**
     * When `true` the connection remains open until it is stopped explicitly.
     * @default false
     */
    keepRunning?: boolean;

    /**
     * When `keepRunning` is true and the full synchronization is over, what should be the
     * maximum delay between new accessible objects being created and notification events sent
     * to us? The remote instance is going to collect all new accessible hashes and only sends
     * them after waiting for this many milliseconds. If there are a lot of new hashes,
     * exceeding a hard-coded threshold, it sends them right away though. If you require fast
     * notifications about rare new-accessible-object events you can set the delay low
     * (*1 ms is the minimum*); if it is not urgent you can set the delay higher and let the
     * remote exporter accumulate more hashes. The delay does not have to be higher than a few
     * seconds, because it will make less and less of a difference very quickly with increasing
     * values.
     */
    maxNotificationDelay?: number;
}

/**
 * The object returned by {@link createChum}: the promise tracking the exchange plus a set of
 * event sources.
 */
export interface ChumApi {
    /**
     * Settles when the Chum exchange is over. It resolves with the result of storing the new
     * version of the Chum object that logs the exchange.
     */
    promise: Promise<VersionedObjectResult<Chum>>;

    // Event sources. NOTE(review): judging by their names they are meant to fire for newly
    // accessible hashes, saved BLOBs/CLOBs/ID objects/objects, general activity and a completed
    // full synchronization. However, init() does not currently pass them on to importer or
    // exporter. Verify that they are ever dispatched.
    onAccessibleHash: OneEventSource<AccessibleObjectsTuple>;
    onActivity: OneEventSource<string>;
    onBlobSaved: OneEventSource<FileCreation<BLOB>>;
    onClobSaved: OneEventSource<FileCreation<CLOB>>;
    onFullSync: OneEventSource<void>;
    onIdObjectSaved: OneEventSource<IdFileCreation>;
    onObjectSaved: OneEventSource<AnyObjectCreation>;

    // Not implemented (yet):
    // close: () => Promise<Chum>;
}

import type {AccessibleObjectsTuple} from './chum-base';
import {MESSAGE_TYPES} from './chum-base';
import {createChumExporter} from './chum-exporter';
import {createChumImporter} from './chum-importer';
import {createError} from './errors';
import {createMessageBus} from './message-bus';
import type {BLOB, Chum, CLOB, HashTypes, Person} from './recipes';
import type {AnyObjectCreation, FileCreation} from './storage-base-common';
import type {IdFileCreation, VersionedObjectResult} from './storage-versioned-objects';
import {getObjectByIdObj, storeVersionedObject} from './storage-versioned-objects';
import type {AnyObject} from './util/object';
import type {OneEventSource} from './util/one-event-source';
import {createEventSource} from './util/one-event-source';
import {retry} from './util/promise';
import type {SHA256Hash, SHA256IdHash} from './util/type-checks';
import type {WebsocketPromisifierAPI} from './websocket-promisifier';
import {UNKNOWN_SERVICE_MSG_START} from './websocket-request-handler';

// Message bus used for all 'log', 'alert', 'error' and 'debug' messages sent by this module.
const MessageBus = createMessageBus('chum-sync');

/**
 * Exchanged at the beginning of the Chum protocol. If the protocol versions of importer and
 * exporter of both nodes don't match, the Chum exchange is aborted with an error
 * (checkProtocolVersions throws `CS-MISMATCH`).
 */
export const PROTOCOL_VERSION = 4;

// FOR TESTS ONLY
// export function setProtocolVersion(v: number): void {
//     PROTOCOL_VERSION = v;
// }

/**
 * Creates a logging callback for importer and exporter. Every hash passed to the callback is
 * appended to `hashes`. init() passes in the transfer-log arrays of the Chum object it is
 * preparing, so importer and exporter record their transfers directly in that object.
 *
 * Per the original design notes, the transfers are stored as hash links so that reverse maps
 * from the linked objects back to the Chum object get written. That happens in storage and is
 * not visible in this module.
 * @private
 * @param {(SHA256Hash[]|SHA256IdHash[])} hashes - Array receiving the logged hashes; it is
 * mutated in place
 * @returns {function((SHA256Hash|SHA256IdHash)):void} Callback that appends one hash per call
 */
function createLogSentHashFn<T extends SHA256Hash<HashTypes> | SHA256IdHash>(
    hashes: T[]
): (hash: T) => void {
    function appendHash(hash: T): void {
        hashes.push(hash);
    }

    return appendHash;
}

/**
 * Counts all transfers recorded in the eight transfer-log arrays of a Chum object, i.e.
 * objects, ID objects, BLOBs and CLOBs in both directions (`AtoB*` and `BtoA*`).
 * @private
 * @param {Chum} chumObj
 * @returns {number} Total number of logged transfers
 */
function nrOfTotalLoggedTransfers(chumObj: Readonly<Chum>): number {
    const transferLogs = chumObj as AnyObject;
    let total = 0;

    for (const direction of ['AtoB', 'BtoA']) {
        for (const kind of ['Objects', 'IdObjects', 'Blob', 'Clob']) {
            total += transferLogs[direction + kind].length;
        }
    }

    return total;
}

/**
 * Makes sure both ends of the connection use the same Chum protocol version.
 *
 * Registers a VERSION_CHECK service that answers with the local PROTOCOL_VERSION, then asks the
 * remote side for its version. A request that fails because the remote service is unknown is
 * retried via `retry()` (15 retries, 300 ms delay).
 * @private
 * @param {WebsocketPromisifierAPI} connection
 * @returns {Promise<void>} Resolves when local and remote versions are equal
 * @throws {Error} `CS-MISMATCH` (after sending an alert) when the versions differ
 */
async function checkProtocolVersions(connection: WebsocketPromisifierAPI): Promise<void> {
    const answerVersionCheck = (): number => PROTOCOL_VERSION;
    connection.addService(MESSAGE_TYPES.VERSION_CHECK, answerVersionCheck);

    const isRemoteServiceMissing = (err: Error): boolean =>
        err.message.startsWith(UNKNOWN_SERVICE_MSG_START);

    // A kind of polling: the other side's chum-sync has to be started at around the same time,
    // so its VERSION_CHECK service may not be registered yet when we first ask.
    const remoteVersion = await retry(() => connection.send(MESSAGE_TYPES.VERSION_CHECK), {
        delay: 300,
        retries: 15,
        shouldRetry: isRemoteServiceMissing
    });

    if (remoteVersion === PROTOCOL_VERSION) {
        return;
    }

    MessageBus.send(
        'alert',
        `[${connection.connId}] CHUM PROTOCOL MISMATCH, local: ${PROTOCOL_VERSION}, remote: ${remoteVersion}`
    );

    throw createError('CS-MISMATCH', {local: PROTOCOL_VERSION, remote: remoteVersion});
}

/**
 * Runs one Chum exchange over `options.connection` and stores a new version of the Chum object
 * that logs the exchange.
 *
 * Sequence:
 * 1. Validate the person IDs and check that both sides use the same protocol version.
 * 2. Prepare the Chum object. If a previous version with the same ID exists, its
 *    `highestRemoteTimestamp` is carried over in case this run produces no new one.
 * 3. Start exporter and importer. Both log transferred hashes and errors directly into the
 *    Chum object.
 * 4. Wait for both to end, close the connection, and record the connection statistics (or the
 *    connection error) in the Chum object.
 * 5. Store the Chum object and return the storage result.
 *
 * @private
 * @param {ChumSyncOptions} options - An object with options
 * @param {ChumApi} _events - The event sources of the Chum API (everything except `promise`).
 * NOTE(review): they are currently not passed on to importer or exporter, so no events are
 * dispatched from here. Verify whether that is intended.
 * @returns {Promise<VersionedObjectResult<Chum>>} Result of storing the new Chum object
 * @throws {Error} `CS-INIT1` if a person ID is undefined, `CS-MISMATCH` if the protocol
 * versions differ, or the importer/exporter error if nothing at all was transferred (no Chum
 * object is written in that case)
 */
async function init(
    {
        connection,
        localPersonId,
        remotePersonId,
        chumName,
        localInstanceName,
        remoteInstanceName,
        keepRunning = false,
        maxNotificationDelay
    }: ChumSyncOptions,
    _events: Omit<ChumApi, 'promise'>
): Promise<VersionedObjectResult<Chum>> {
    MessageBus.send(
        'log',
        `[${connection.connId}] Chum, local: ${localInstanceName}, remote: ${remoteInstanceName}`
    );

    if (localPersonId === undefined || remotePersonId === undefined) {
        throw createError('CS-INIT1', {localPersonId, remotePersonId});
    }

    // Only logs when the connection ends; the handlers are attached so that a rejection of the
    // tracking promise is never unhandled.
    connection.promise
        .then(() => MessageBus.send('log', `[${connection.connId}] CONECTION CLOSED`))
        .catch(err => MessageBus.send('alert', `[${connection.connId}] CONECTION CLOSED ERR`, err));

    await checkProtocolVersions(connection);

    const chumObj: Chum = {
        $type$: 'Chum',
        name: chumName,
        instance: [localInstanceName, remoteInstanceName],
        person: [remotePersonId, localPersonId],
        highestRemoteTimestamp: 0,
        // The importer and exporter are configured below to add their transfers directly to
        // these arrays
        AtoBObjects: [],
        AtoBIdObjects: [],
        AtoBBlob: [],
        AtoBClob: [],
        BtoAObjects: [],
        BtoAIdObjects: [],
        BtoABlob: [],
        BtoAClob: [],
        BtoAExists: 0,
        unacknowledged: undefined,
        statistics: undefined,
        errors: []
    };

    try {
        const {obj: prevChumObj} = await getObjectByIdObj(chumObj);

        // In case we don't get a synchronization and therefore no new highest timestamp
        chumObj.highestRemoteTimestamp = prevChumObj.highestRemoteTimestamp;
    } catch (err) {
        // No previous Chum object with this ID is fine - this is the first exchange
        if (err.name !== 'FileNotFoundError') {
            throw err;
        }
    }

    // FOR FINDING CODE ISSUES: When the logError function is called and the Chum was already
    // written we lose the error.
    let chumEnded = false;

    // If any stored data was already exchanged letting the entire Chum object creation fail is
    // counterproductive since we would lose the log of the exchange entirely. That is why
    // errors are logged and the Chum object creation is allowed to go ahead in that case.
    function logError(err: Error): void {
        if (chumEnded) {
            // _Somebody_ should notice this error: it arrives from still-running asynchronous
            // code after the Chum was finalized, which can only mean a code logic error.
            return MessageBus.send(
                'error',
                `[${connection.connId}] Received Error after Chum ended`,
                err
            );
        }

        chumObj.errors.push(err);
    }

    // NO WAITING, just promise creation. This makes the authentication service available.
    // THIS NEEDS TO HAPPEN BEFORE WE CONNECT to guarantee that the authentication service is
    // available already without even the slightest delay.
    const exporterPromise = createChumExporter({
        WebsocketObj: connection,
        remotePersonId,
        logSentObject: createLogSentHashFn(chumObj.AtoBObjects),
        logSentIdObject: createLogSentHashFn(chumObj.AtoBIdObjects),
        logSentBlob: createLogSentHashFn(chumObj.AtoBBlob),
        logSentClob: createLogSentHashFn(chumObj.AtoBClob),
        logError,
        localKeepRunningSetting: keepRunning
    });

    const importerPromise = createChumImporter({
        WebsocketObj: connection,
        logSavedObject: createLogSentHashFn(chumObj.BtoAObjects),
        logSavedIdObject: createLogSentHashFn(chumObj.BtoAIdObjects),
        logSavedBlob: createLogSentHashFn(chumObj.BtoABlob),
        logSavedClob: createLogSentHashFn(chumObj.BtoAClob),
        logExists: (count: number) => {
            chumObj.BtoAExists += count;
        },
        logError,
        highestRemoteTimestamp: chumObj.highestRemoteTimestamp,
        keepRunning,
        maxNotificationDelay
    });

    // LONG WAIT: This is where we wait for the exchange of files to be over; end conditions are:
    // 1. Loss of connection ends both importer and exporter (error state does not matter)
    // 2. A full synchronization of both importer and exporter means there is nothing left to do
    //    if keepRunning === false
    // 3. Any error independently ends (only) importer or exporter (incl. errors in detached
    //    functions)
    try {
        MessageBus.send('log', `[${connection.connId}] AWAIT IMPORTER AND EXPORTER PROMISE`);

        const [exporterStats, highestRemoteTimestamp] = await Promise.all([
            exporterPromise,
            importerPromise
        ]);

        chumObj.highestRemoteTimestamp = highestRemoteTimestamp;
        chumObj.unacknowledged = exporterStats.sentWaitingForImporterAck;
    } catch (err) {
        // POLICY DECISION
        // If nothing was transferred yet, we don't bother writing a Chum object. As soon as
        // something was transferred we have to log it though, i.e. we write a Chum object and
        // log the error inside instead of throwing the error. After all, _some_ exchange took
        // place, that is not a total error.
        if (nrOfTotalLoggedTransfers(chumObj) === 0) {
            throw err;
        }
        // No logging of errors in the Chum object - importer and exporter already did that.
        // Logging here would only log the first error coming from either the importer or the
        // exporter but not both, if they both fail.
    } finally {
        MessageBus.send('log', `[${connection.connId}] AWAIT IMPORTER AND EXPORTER PROMISE - DONE`);

        // If it is not closed yet, close it. We expect to need this for the case when
        // keepRunning is false, and we complete a full synchronization on both sides, i.e. both
        // exporter and importer are done (their promises resolved but the websocket connection
        // still is there). This function is synchronous, its possibly asynchronous effect is
        // reflected by the websocket connection's tracking promise.
        connection.close('Chum ended');

        try {
            chumObj.statistics = await connection.promise;
        } catch (err) {
            // Websocket errors are websocket close codes other than CLOSE_CODES.NORMAL and
            // CLOSE_CODES.PARTNER_DISCONNECT (defined in websocket-promisifier.js). Both importer
            // and exporter ignore websocket error state, they only care about whether the
            // connection was closed. That means we have to treat errors here (by logging them in
            // the Chum object). There is no action, we don't really care much why a connection
            // was closed, the synchronization attempts to do what it can and when it's over,
            // it's over. We assume the connection _always_ is unreliable to begin with.
            chumObj.errors.push(err);
        }

        // For callbacks called after this point by still running asynchronous code
        chumEnded = true;
    }

    // The transfer-log arrays of the Chum object. In the debug output below each of them is
    // replaced by its hash count instead of dumping potentially very long hash lists.
    const transferLogKeys = new Set([
        'AtoBObjects',
        'AtoBIdObjects',
        'AtoBBlob',
        'AtoBClob',
        'BtoAObjects',
        'BtoAIdObjects',
        'BtoABlob',
        'BtoAClob'
    ]);

    MessageBus.send(
        'debug',
        `[${connection.connId}] FINAL new Chum object: ${JSON.stringify(
            chumObj,
            (key, value) => (transferLogKeys.has(key) ? `[${value.length} hashes]` : value),
            4
        )}`
    );

    const storedChumResult = await storeVersionedObject(chumObj);

    if (chumObj.errors.length > 0) {
        MessageBus.send(
            'error',
            `[${connection.connId}] END of chum-sync, ERRORS: ` +
                `Chum object HASH ${storedChumResult.hash} ID-HASH ${storedChumResult.idHash}`
        );
        chumObj.errors.forEach(err =>
            MessageBus.send('error', `[${connection.connId}] Chum Error:`, err)
        );
    }

    MessageBus.send(
        'log',
        `[${connection.connId}] END of chum-sync, ` +
            `Chum object HASH ${storedChumResult.hash} ID-HASH ${storedChumResult.idHash}`
    );

    return storedChumResult;
}

/**
 * Starts a Chum synchronization between two ONE instances. When the exchange is over, a new
 * version of the Chum object with a log of what was exchanged is stored.
 *
 * This function itself is synchronous. It returns the Chum API object immediately while the
 * exchange runs in the background, and the outcome is available through `promise`.
 * @static
 * @param {ChumSyncOptions} options
 * @returns {ChumApi} The event sources plus `promise`, which resolves with the result of
 * storing the new Chum object. It rejects on errors the exchange does not absorb, e.g. a
 * protocol mismatch or a failure before anything was transferred.
 */
export function createChum(options: ChumSyncOptions): ChumApi {
    // NOTE(review): `options` includes the connection object; if it ever holds circular
    // references, JSON.stringify() throws here - verify.
    MessageBus.send('log', `CHUM OPTIONS: ${JSON.stringify(options)}`);

    const onAccessibleHash = createEventSource<AccessibleObjectsTuple>();
    const onBlobSaved = createEventSource<FileCreation<BLOB>>();
    const onClobSaved = createEventSource<FileCreation<CLOB>>();
    const onFullSync = createEventSource<void>();
    const onIdObjectSaved = createEventSource<IdFileCreation>();
    const onActivity = createEventSource<string>();
    const onObjectSaved = createEventSource<AnyObjectCreation>();

    const eventSources = {
        onAccessibleHash,
        onBlobSaved,
        onClobSaved,
        onFullSync,
        onIdObjectSaved,
        onActivity,
        onObjectSaved
    };

    const syncPromise = init(options, eventSources);

    return {
        promise: syncPromise,
        ...eventSources
    };
}