/**
* @author Michael Hasenstein <hasenstein@yahoo.com>
* @copyright REFINIO GmbH 2018
* @license CC-BY-NC-SA-2.5; portions MIT License
* @version 0.0.1
*/
/**
* @private
* @module
*/
/**
* See module {@link chum-exporter-storage-monitor.module:ts|chum-exporter-storage-monitor}.
* @global
* @typedef {object} StorageMonitorOptions
* @property {boolean} keepRunning
* @property {function(Error):void} onError
* @property {SHA256IdHash} remotePersonIdHash
* @property {function(SHA256IdHash):boolean} isMonitoredIdHash
* @property {ProcessAccessObjsDataFn} processAccessObjData
* @property {HashesCollectorFn} onIteratorObject
* @property {number} connId
*/
export interface StorageMonitorOptions {
keepRunning: boolean;
onError: (err: Error) => void;
remotePersonIdHash: SHA256IdHash<Person>;
isMonitoredIdHash: (hash: SHA256IdHash) => boolean;
processAccessObjData: ProcessAccessObjsDataFn;
onIteratorObject: HashesCollectorFn;
connId: number;
}
/**
* @private
* @typedef {Function} ProcessNewGroupObjFn
* @param {VersionedObjectResult}
* @returns {Promise<undefined>} Returns a promise that resolves with `undefined`
*/
export type ProcessNewObjFn = (newObjResult: Readonly<VersionedObjectResult>) => Promise<void>;
/**
* @private
* @typedef {Function} ProcessNewGroupObjFn
* @param {VersionedObjectResult<Group>}
* @returns {Promise<undefined>} Returns a promise that resolves with `undefined`
*/
export type ProcessNewGroupObjFn = (
newObjResult: Readonly<VersionedObjectResult<Group>>
) => Promise<void>;
/**
* @private
* @typedef {Function} ProcessNewAccessObjFn
* @param {VersionedObjectResult<Access | IdAccess>}
* @returns {Promise<undefined>} Returns a promise that resolves with `undefined`
*/
export type ProcessNewAccessObjFn = (
newObjResult: Readonly<VersionedObjectResult<Access | IdAccess>>
) => Promise<void>;
import type {
AccessCacheIteratorFn,
HashesCollectorFn,
ProcessAccessObjsDataFn
} from './chum-exporter-find-objects';
import {createMessageBus} from './message-bus';
import {iterateGraphFromObjectBottomUp} from './object-graph-bottom-up-iterator';
import type {Access, Group, IdAccess, Person} from './recipes';
import {getOnlyLatestReferencingObjsHashAndId} from './reverse-map-query';
import type {VersionedObjectResult} from './storage-versioned-objects';
import {onVersionedObj} from './storage-versioned-objects';
import {createNeverFailAsyncErrorWrapper} from './util/function';
import type {SHA256IdHash} from './util/type-checks';
const MessageBus = createMessageBus('chum-exporter-storage-monitor');
/**
* This function checks if an Access object grants access to a given Person ID object
* - directly, if the Person ID hash is included (easy)
* - indirectly, if a Person ID hash is included in a group whose ID hash is included in the
* Access object.
* @private
* @async
* @param {SHA256IdHash} personIdHash
* @param {(Access|IdAccess)} accessObj
* @returns {Promise<boolean>} Returns `true` if the given Access object grants access to the
* given Person ID, `false` otherwise
*/
async function isAccessObjectForPerson(
personIdHash: SHA256IdHash<Person>,
accessObj: Readonly<Access | IdAccess>
): Promise<boolean> {
if (accessObj.person.some(idRef => idRef === personIdHash)) {
return true;
}
const groupHashes = await getOnlyLatestReferencingObjsHashAndId(personIdHash, 'Group');
for (const {idHash} of groupHashes) {
if (accessObj.group.some(idRef => idRef === idHash)) {
return true;
}
}
return false;
}
/**
* @private
* @param {SHA256IdHash} remotePersonIdHash
* @param {ProcessAccessObjsDataFn} processAccessObjData
* @param {number} connId
* @returns {ProcessNewGroupObjFn} Returns a function used to subscribe to version map updates
*/
function createOnNewGroupObjectFn(
remotePersonIdHash: SHA256IdHash<Person>,
processAccessObjData: ProcessAccessObjsDataFn,
connId: number
): ProcessNewGroupObjFn {
return async function ProcessNewGroupObj(newGroupObjectData) {
MessageBus.send(
'log',
`[${connId}] onNewGroupObjectFn: ${JSON.stringify(newGroupObjectData)}`
);
if (!newGroupObjectData.obj.person.some(ref => ref === remotePersonIdHash)) {
return;
}
await processAccessObjData(
await getOnlyLatestReferencingObjsHashAndId(newGroupObjectData.idHash, 'Access'),
await getOnlyLatestReferencingObjsHashAndId(newGroupObjectData.idHash, 'IdAccess')
);
};
}
/**
* @private
* @param {SHA256IdHash} remotePersonIdHash
* @param {ProcessAccessObjsDataFn} processAccessObjData
* @param {number} connId
* @returns {ProcessNewAccessObjFn} Returns a function used to subscribe to version map updates
*/
function createOnNewAccessObjectFn(
remotePersonIdHash: SHA256IdHash<Person>,
processAccessObjData: ProcessAccessObjsDataFn,
connId: number
): ProcessNewAccessObjFn {
return async function ProcessNewAccessObj(newAccessObjectData) {
const isForPerson = await isAccessObjectForPerson(
remotePersonIdHash,
newAccessObjectData.obj
);
if (!isForPerson) {
return;
}
MessageBus.send(
'log',
`[${connId}] onNewAccessObjectFn (for logged-in person ID): ${JSON.stringify(
newAccessObjectData
)}`
);
await processAccessObjData([
{
hash: newAccessObjectData.hash,
idHash: newAccessObjectData.idHash,
// Type cast: Never undefined for new *versioned* objects
timestamp: newAccessObjectData.timestamp as number
}
]);
};
}
/**
* This function is similar to `createIterateOverAccessibleGraphFn()` in the parent module of the
* current module `chum.exporter-find-objects()`.
*
* When the remote importer has access to an ID object it has access to all future versions. The
* initial synchronization sends all versions available at the time. This function created here
* is called whenever a new version of the accessible ID object is created while the ONE
* instances remain connected. The new object may have links to other objects that by definition
* also are accessible to the remote instance, so we have to call the object-graph-iterator.
* Unlike the initial synchronization, where the root object always is an Access (or IdAccess)
* object here we may get any kind of object. Another difference is that we always start with a
* concrete object, never with an ID object (IdAccess object).
* @private
* @param {HashesCollectorFn} collector - TODO The type is wrong!! The given type is only for the
* iterator function created in chum-exporter-find-objects
* @returns {AccessCacheIteratorFn} Returns a function that takes a data object and returns
* a promise that resolves when the iteration is over. It resolves with `true` when the
* iteration completed, i.e. all objects in the graph were visited. It resolves with `false` if
* the iterator was stopped by the callback using a `StopIteratorError`.
*/
function createIterateOverNewObjectVersionFn(collector: HashesCollectorFn): AccessCacheIteratorFn {
return async function GraphIteratorFunc({hash}) {
// IMPORTANT - leave nodes first. This means that the remote importer that asks for
// a ONE object can be sure that it already received at least all the hashes of all
// dependencies, because the parent will come _after_all its direct and indirect
// children in the flat array of hashes of accessible objects. This means the remote
// importer can rely on either having the dependency (just retrieved or it already
// had it previously), or that there is an active transfer and an unfulfilled
// promise for the transfer to wait for. This is to ensure that the importer does
// not write an object until all dependencies have been saved: While we create a
// flat list and perform all transfers in parallel the partial sequentialization is
// implemented per received object on the side of the importer.
return await iterateGraphFromObjectBottomUp(hash, collector, {includeFileSize: false});
};
}
/**
* @private
* @param {ProcessNewGroupObjFn} onNewGroupObjFn
* @param {ProcessNewAccessObjFn} onNewAccessObjFn
* @param {AccessCacheIteratorFn} iterateOverAccessibleGraph
* @param {function(SHA256IdHash):boolean} isMonitoredIdHash
* @param {number} connId
* @returns {function(VersionMapUpdateInfo):Promise<boolean>} Returns a function used to
* subscribe to version map updates
*/
function createOnNewVersionedObjectFn(
onNewGroupObjFn: ProcessNewGroupObjFn,
onNewAccessObjFn: ProcessNewAccessObjFn,
iterateOverAccessibleGraph: AccessCacheIteratorFn,
isMonitoredIdHash: (hash: SHA256IdHash) => boolean,
connId: number
): ProcessNewObjFn {
// Only include objects created after this timestamp, which is the time when the storage
// monitor is initialized (subtracting 1 millisecond just to be safe, it does not matter if
// this catches an object that has already been transmitted, the other side will just
// acknowledge without retrieving anything, but missing something would be bad)
const createdAfter = Date.now() - 1;
return async function ProcessNewObj(newObjectData) {
MessageBus.send('log', `[${connId}] onNewObjectFn: ${JSON.stringify(newObjectData)}`);
// Regardless of whether this is a Group, Access or new version of an ID-hash linked
// versioned object of an object graph to which the remote has access, if the object already
// existed before the storage-monitor was started, *and* if the version map did not change
// (e.g. making an existing object the current version again, if it was not), then there is
// no need to do anything.
// NOTE: An undefined timestamp implies an existing object (no version map update
// means there will be no timestamp). The version map will *always* be updated, and a
// timestamp of that action returned, for new objects.
if (newObjectData.timestamp === undefined || newObjectData.timestamp < createdAfter) {
return;
}
switch (newObjectData.obj.$type$) {
case 'Group':
await onNewGroupObjFn(newObjectData as VersionedObjectResult<Group>);
break;
case 'Access':
case 'IdAccess':
await onNewAccessObjFn(newObjectData as VersionedObjectResult<Access | IdAccess>);
break;
default: {
if (isMonitoredIdHash(newObjectData.idHash)) {
await iterateOverAccessibleGraph({
hash: newObjectData.hash,
idHash: newObjectData.idHash,
timestamp: newObjectData.timestamp
});
}
break;
}
}
};
}
/**
* @static
* @function
* @param {StorageMonitorOptions} options
* @param {boolean} options.keepRunning
* @param {function(Error):void} options.onError
* @param {SHA256IdHash} options.remotePersonIdHash
* @param {function(SHA256IdHash):boolean} options.isMonitoredIdHash
* @param {ProcessAccessObjsDataFn} options.processAccessObjData
* @param {HashesCollectorFn} options.onIteratorObject
* @returns {function():void} Returns a function that stops the storage monitoring
*/
export function initStorageMonitor({
keepRunning,
onError,
remotePersonIdHash,
isMonitoredIdHash,
processAccessObjData,
onIteratorObject,
connId
}: StorageMonitorOptions): () => void {
if (!keepRunning) {
// Nothing to do => nothing to undo
return () => undefined;
}
const errorWrapper = createNeverFailAsyncErrorWrapper(onError);
const onNewGroupObject = createOnNewGroupObjectFn(
remotePersonIdHash,
processAccessObjData,
connId
);
const onNewAccessObject = createOnNewAccessObjectFn(
remotePersonIdHash,
processAccessObjData,
connId
);
const iterateOverAccessibleGraph = createIterateOverNewObjectVersionFn(onIteratorObject);
// While we are connected any new objects created by this instance that are accessible to
// the remote instance trigger a notification sent to the other instance, which can then
// decide to download the new object(s).
// NOTE that while the step #2 object graph iterators for the originally found Access
// objects are still running we may already receive updates. They will just be added to the
// hash collection mixed with those coming from the iterator(s).
// These functions must not throw an error or reject their promise since that would end up
// in the wrong context and not in this one right here, hence the wrapper.
const onNewObjectCb = errorWrapper(
createOnNewVersionedObjectFn(
onNewGroupObject,
onNewAccessObject,
iterateOverAccessibleGraph,
isMonitoredIdHash,
connId
)
);
// Version map updates of ID hashes of ID object links in any accessible object, each
// new entry is a new "accessible object" which needs to be sent to the remote instance
return onVersionedObj.addListener(onNewObjectCb);
}