Source: chum-exporter-storage-monitor.ts

/**
 * @author Michael Hasenstein <hasenstein@yahoo.com>
 * @copyright REFINIO GmbH 2018
 * @license CC-BY-NC-SA-2.5; portions MIT License
 * @version 0.0.1
 */

/**
 * @private
 * @module
 */

/**
 * Options accepted by {@link initStorageMonitor}.
 *
 * See module {@link chum-exporter-storage-monitor.module:ts|chum-exporter-storage-monitor}.
 * @global
 * @typedef {object} StorageMonitorOptions
 * @property {boolean} keepRunning - If `false` the storage monitor does not subscribe to
 * anything and the stop function it returns does nothing
 * @property {function(Error):void} onError - Handed to `createNeverFailAsyncErrorWrapper` to
 * build the wrapper around the new-object callback (presumably it receives the errors thrown
 * by that callback - confirm against the wrapper's implementation)
 * @property {SHA256IdHash} remotePersonIdHash - Person ID hash that new Group objects and new
 * Access/IdAccess objects are checked against
 * @property {function(SHA256IdHash):boolean} isMonitoredIdHash - Called with the ID hash of a
 * new versioned object that is neither a Group nor an Access/IdAccess object; the object graph
 * of the new version is only iterated if this returns `true`
 * @property {ProcessAccessObjsDataFn} processAccessObjData - Called with the hash data of the
 * Access (and IdAccess) objects found to be relevant for the remote person
 * @property {HashesCollectorFn} onIteratorObject - Passed as collector callback to the
 * bottom-up object graph iterator
 * @property {number} connId - Used as prefix of this module's log messages
 */
export interface StorageMonitorOptions {
    keepRunning: boolean;
    onError: (err: Error) => void;
    remotePersonIdHash: SHA256IdHash<Person>;
    isMonitoredIdHash: (hash: SHA256IdHash) => boolean;
    processAccessObjData: ProcessAccessObjsDataFn;
    onIteratorObject: HashesCollectorFn;
    connId: number;
}

/**
 * Signature of the handler for the result object of a versioned object of any type.
 * `createOnNewVersionedObjectFn` returns a function of this type.
 * @private
 * @typedef {Function} ProcessNewObjFn
 * @param {VersionedObjectResult} newObjResult
 * @returns {Promise<undefined>} Returns a promise that resolves with `undefined`
 */
export type ProcessNewObjFn = (newObjResult: Readonly<VersionedObjectResult>) => Promise<void>;

/**
 * Signature of the handler for the result object of a Group object.
 * `createOnNewGroupObjectFn` returns a function of this type.
 * @private
 * @typedef {Function} ProcessNewGroupObjFn
 * @param {VersionedObjectResult<Group>} newObjResult
 * @returns {Promise<undefined>} Returns a promise that resolves with `undefined`
 */
export type ProcessNewGroupObjFn = (
    newObjResult: Readonly<VersionedObjectResult<Group>>
) => Promise<void>;

/**
 * Signature of the handler for the result object of an Access or IdAccess object.
 * `createOnNewAccessObjectFn` returns a function of this type.
 * @private
 * @typedef {Function} ProcessNewAccessObjFn
 * @param {VersionedObjectResult<Access | IdAccess>} newObjResult
 * @returns {Promise<undefined>} Returns a promise that resolves with `undefined`
 */
export type ProcessNewAccessObjFn = (
    newObjResult: Readonly<VersionedObjectResult<Access | IdAccess>>
) => Promise<void>;

import type {
    AccessCacheIteratorFn,
    HashesCollectorFn,
    ProcessAccessObjsDataFn
} from './chum-exporter-find-objects';
import {createMessageBus} from './message-bus';
import {iterateGraphFromObjectBottomUp} from './object-graph-bottom-up-iterator';
import type {Access, Group, IdAccess, Person} from './recipes';
import {getOnlyLatestReferencingObjsHashAndId} from './reverse-map-query';
import type {VersionedObjectResult} from './storage-versioned-objects';
import {onVersionedObj} from './storage-versioned-objects';
import {createNeverFailAsyncErrorWrapper} from './util/function';
import type {SHA256IdHash} from './util/type-checks';

// Message bus handle created with this module's name. The functions in this module only use
// it to send 'log' messages.
const MessageBus = createMessageBus('chum-exporter-storage-monitor');

/**
 * Checks whether an Access (or IdAccess) object grants access to a given Person ID object
 * - directly, if the Person ID hash is listed in the object's `person` array, or
 * - indirectly, if the ID hash of a Group that references the Person is listed in the object's
 *   `group` array.
 *
 * The reverse-map lookup for the Person's Groups is only performed when it can influence the
 * result, i.e. when there is no direct match and the object lists at least one Group.
 * @private
 * @async
 * @param {SHA256IdHash} personIdHash - ID hash of the Person to check
 * @param {(Access|IdAccess)} accessObj - The access-granting object to inspect
 * @returns {Promise<boolean>} Resolves with `true` if the given Access object grants access to
 * the given Person ID, `false` otherwise
 */
async function isAccessObjectForPerson(
    personIdHash: SHA256IdHash<Person>,
    accessObj: Readonly<Access | IdAccess>
): Promise<boolean> {
    // Direct grant: no storage access needed
    if (accessObj.person.some(idRef => idRef === personIdHash)) {
        return true;
    }

    // Without any Group entries an indirect grant is impossible, so the (asynchronous)
    // reverse-map query below would be wasted work.
    if (accessObj.group.length === 0) {
        return false;
    }

    const groupsOfPerson = await getOnlyLatestReferencingObjsHashAndId(personIdHash, 'Group');

    // Indirect grant: one of the Groups referencing the Person is listed in the object
    for (const {idHash: groupIdHash} of groupsOfPerson) {
        if (accessObj.group.some(idRef => idRef === groupIdHash)) {
            return true;
        }
    }

    return false;
}

/**
 * Creates the handler for new Group objects. The handler logs the received data and, if the
 * Group's `person` array contains the remote Person ID hash, looks up the latest Access and
 * IdAccess objects referencing the Group and hands both results to `processAccessObjData`.
 * Groups that do not contain the remote Person are ignored.
 * @private
 * @param {SHA256IdHash} remotePersonIdHash - Person ID hash to look for in the Group
 * @param {ProcessAccessObjsDataFn} processAccessObjData - Receives the Access data as first
 * and the IdAccess data as second argument
 * @param {number} connId - Used as prefix of the log message
 * @returns {ProcessNewGroupObjFn} Returns a function used to subscribe to version map updates
 */
function createOnNewGroupObjectFn(
    remotePersonIdHash: SHA256IdHash<Person>,
    processAccessObjData: ProcessAccessObjsDataFn,
    connId: number
): ProcessNewGroupObjFn {
    return async function ProcessNewGroupObj(newGroupObjectData) {
        MessageBus.send(
            'log',
            `[${connId}] onNewGroupObjectFn: ${JSON.stringify(newGroupObjectData)}`
        );

        if (!newGroupObjectData.obj.person.some(ref => ref === remotePersonIdHash)) {
            return;
        }

        // The two reverse-map lookups do not depend on each other, so they are started
        // together instead of one after the other. Promise.all() preserves the order of the
        // results, i.e. Access data first, IdAccess data second.
        const [accessObjsData, idAccessObjsData] = await Promise.all([
            getOnlyLatestReferencingObjsHashAndId(newGroupObjectData.idHash, 'Access'),
            getOnlyLatestReferencingObjsHashAndId(newGroupObjectData.idHash, 'IdAccess')
        ]);

        await processAccessObjData(accessObjsData, idAccessObjsData);
    };
}

/**
 * Creates the handler for new Access and IdAccess objects. The handler first checks whether
 * the object grants access to the remote Person (see `isAccessObjectForPerson`). If it does
 * not, nothing happens. Otherwise the object is logged and its hash, ID hash and timestamp
 * are handed to `processAccessObjData` as a one-element array.
 * @private
 * @param {SHA256IdHash} remotePersonIdHash - Person ID hash the access check is made for
 * @param {ProcessAccessObjsDataFn} processAccessObjData - Receives the data of the object
 * @param {number} connId - Used as prefix of the log message
 * @returns {ProcessNewAccessObjFn} Returns a function used to subscribe to version map updates
 */
function createOnNewAccessObjectFn(
    remotePersonIdHash: SHA256IdHash<Person>,
    processAccessObjData: ProcessAccessObjsDataFn,
    connId: number
): ProcessNewAccessObjFn {
    return async function ProcessNewAccessObj(newAccessObjectData) {
        if (!(await isAccessObjectForPerson(remotePersonIdHash, newAccessObjectData.obj))) {
            // The object is irrelevant for the remote person
            return;
        }

        const serializedData = JSON.stringify(newAccessObjectData);

        MessageBus.send(
            'log',
            `[${connId}] onNewAccessObjectFn (for logged-in person ID): ${serializedData}`
        );

        const {hash, idHash, timestamp} = newAccessObjectData;

        // Type cast: the timestamp is never undefined for new *versioned* objects
        await processAccessObjData([{hash, idHash, timestamp: timestamp as number}]);
    };
}

/**
 * This function is similar to `createIterateOverAccessibleGraphFn()` in the parent module
 * `chum-exporter-find-objects`.
 *
 * A remote importer that has access to an ID object has access to all of its future versions.
 * The initial synchronization sends all versions available at that time. The function created
 * here is called whenever a new version of an accessible ID object is created while the ONE
 * instances remain connected. The new version may link to other objects, which by definition
 * are accessible to the remote instance as well, so the object graph iterator has to be run.
 * Unlike the initial synchronization, where the root object always is an Access (or IdAccess)
 * object, any kind of object may show up here. Another difference is that the start always is
 * a concrete object, never an ID object (IdAccess object).
 * @private
 * @param {HashesCollectorFn} collector - TODO The type is wrong!! The given type is only for the
 * iterator function created in chum-exporter-find-objects
 * @returns {AccessCacheIteratorFn} Returns a function that takes a data object and returns
 * a promise that resolves when the iteration is over. It resolves with `true` when the
 * iteration completed, i.e. all objects in the graph were visited. It resolves with `false` if
 * the iterator was stopped by the callback using a `StopIteratorError`.
 */
function createIterateOverNewObjectVersionFn(collector: HashesCollectorFn): AccessCacheIteratorFn {
    const GraphIteratorFunc: AccessCacheIteratorFn = async accessObjData => {
        // IMPORTANT - leaf nodes first. The parent comes _after_ all its direct and indirect
        // children in the flat array of hashes of accessible objects, so a remote importer
        // asking for a ONE object can be sure that it already received at least the hashes
        // of all dependencies. It can therefore rely on either having a dependency (just
        // retrieved, or already present from before), or on there being an active transfer
        // with an unfulfilled promise to wait for. This ensures that the importer does not
        // write an object until all of its dependencies have been saved: while we create a
        // flat list and perform all transfers in parallel, the partial sequentialization is
        // implemented per received object on the side of the importer.
        const iterationResult = await iterateGraphFromObjectBottomUp(
            accessObjData.hash,
            collector,
            {includeFileSize: false}
        );

        return iterationResult;
    };

    return GraphIteratorFunc;
}

/**
 * Creates the callback that dispatches a versioned-object result to the matching handler:
 * Group objects go to `onNewGroupObjFn`, Access and IdAccess objects go to `onNewAccessObjFn`,
 * and for every other type the object graph is iterated if `isMonitoredIdHash` accepts the
 * object's ID hash. Results without a timestamp, or with a timestamp from before this function
 * was called, are ignored.
 * @private
 * @param {ProcessNewGroupObjFn} onNewGroupObjFn - Handler for Group objects
 * @param {ProcessNewAccessObjFn} onNewAccessObjFn - Handler for Access and IdAccess objects
 * @param {AccessCacheIteratorFn} iterateOverAccessibleGraph - Graph iterator for all other types
 * @param {function(SHA256IdHash):boolean} isMonitoredIdHash - Decides whether the graph of an
 * object of any other type is iterated
 * @param {number} connId - Used as prefix of the log message
 * @returns {ProcessNewObjFn} Returns a function used to subscribe to version map updates
 */
function createOnNewVersionedObjectFn(
    onNewGroupObjFn: ProcessNewGroupObjFn,
    onNewAccessObjFn: ProcessNewAccessObjFn,
    iterateOverAccessibleGraph: AccessCacheIteratorFn,
    isMonitoredIdHash: (hash: SHA256IdHash) => boolean,
    connId: number
): ProcessNewObjFn {
    // Cut-off time: only objects created after the storage monitor was initialized are
    // processed. One millisecond is subtracted to be on the safe side. Catching an object that
    // has already been transmitted does no harm (the other side just acknowledges without
    // retrieving anything), but missing an object would be bad.
    const createdAfter = Date.now() - 1;

    return async function ProcessNewObj(newObjectData) {
        MessageBus.send('log', `[${connId}] onNewObjectFn: ${JSON.stringify(newObjectData)}`);

        const {timestamp} = newObjectData;

        // This applies to Group objects, Access objects and new versions of ID-hash linked
        // objects alike: if the object already existed before the storage monitor was started
        // *and* the version map did not change (a change would be, e.g., making an existing
        // object the current version again), there is nothing to do.
        // NOTE: An undefined timestamp implies an existing object, because without a version
        // map update there is no timestamp. For new objects the version map is *always*
        // updated and the timestamp of that action is returned.
        if (timestamp === undefined || timestamp < createdAfter) {
            return;
        }

        const objType = newObjectData.obj.$type$;

        if (objType === 'Group') {
            await onNewGroupObjFn(newObjectData as VersionedObjectResult<Group>);
            return;
        }

        if (objType === 'Access' || objType === 'IdAccess') {
            await onNewAccessObjFn(newObjectData as VersionedObjectResult<Access | IdAccess>);
            return;
        }

        // Any other type: only new versions of monitored ID objects are of interest
        if (!isMonitoredIdHash(newObjectData.idHash)) {
            return;
        }

        await iterateOverAccessibleGraph({
            hash: newObjectData.hash,
            idHash: newObjectData.idHash,
            timestamp
        });
    };
}

/**
 * Starts monitoring the storage for new versioned objects that are relevant for the remote
 * instance. If `options.keepRunning` is `false` nothing is set up and the returned function
 * does nothing.
 * @static
 * @function
 * @param {StorageMonitorOptions} options
 * @param {boolean} options.keepRunning
 * @param {function(Error):void} options.onError
 * @param {SHA256IdHash} options.remotePersonIdHash
 * @param {function(SHA256IdHash):boolean} options.isMonitoredIdHash
 * @param {ProcessAccessObjsDataFn} options.processAccessObjData
 * @param {HashesCollectorFn} options.onIteratorObject
 * @param {number} options.connId
 * @returns {function():void} Returns a function that stops the storage monitoring
 */
export function initStorageMonitor(options: StorageMonitorOptions): () => void {
    const {
        keepRunning,
        onError,
        remotePersonIdHash,
        isMonitoredIdHash,
        processAccessObjData,
        onIteratorObject,
        connId
    } = options;

    if (!keepRunning) {
        // Nothing was started, so there is nothing to stop
        return () => undefined;
    }

    const errorWrapper = createNeverFailAsyncErrorWrapper(onError);

    // While we are connected, any new object created by this instance that is accessible to
    // the remote instance triggers a notification to the other instance, which can then decide
    // to download the new object(s).
    // NOTE: Updates may already arrive while the step #2 object graph iterators for the
    // originally found Access objects are still running. They are simply added to the hash
    // collection, mixed with the hashes coming from the iterator(s).
    const onNewVersionedObject = createOnNewVersionedObjectFn(
        createOnNewGroupObjectFn(remotePersonIdHash, processAccessObjData, connId),
        createOnNewAccessObjectFn(remotePersonIdHash, processAccessObjData, connId),
        createIterateOverNewObjectVersionFn(onIteratorObject),
        isMonitoredIdHash,
        connId
    );

    // The callback must neither throw nor reject its promise, because the error would end up
    // in the wrong context instead of this one right here, hence the wrapper.
    // Version map updates of ID hashes of ID object links in any accessible object: each new
    // entry is a new "accessible object" that needs to be sent to the remote instance.
    return onVersionedObj.addListener(errorWrapper(onNewVersionedObject));
}