/**
* @author Michael Hasenstein <hasenstein@yahoo.com>
* @copyright REFINIO GmbH 2018
* @license CC-BY-NC-SA-2.5; portions MIT License
* @version 0.0.1
*/
/**
* @private
* @module
*/
/**
* The {@link chum-exporter.module:ts.create|Chum exporter module's create function} accepts a
* single options object parameter of this type
* @private
* @typedef {object} ChumExporterOptions
* @property {WebsocketPromisifierAPI} WebsocketObj - Websocket-promisifier API-object
* @property {function((SHA256Hash)):void} logSentObject - Log ONE object transfers
* @property {function((SHA256IdHash)):void} logSentIdObject - Log ONE ID object transfers
* @property {function(SHA256Hash):void} logSentBlob - Log BLOB transfers
* @property {function(SHA256Hash):void} logSentClob - Log CLOB transfers
* @property {function(Error):void} logError - Log Errors
* @property {boolean} localKeepRunningSetting - Keep updating even after a full sync. is done
* @property {SHA256IdHash} remotePersonId - The ID hash of the owner of the remove instance the
* chum is exchanging data with
*/
export interface ChumExporterOptions {
WebsocketObj: WebsocketPromisifierAPI;
logSentObject: (hash: SHA256Hash) => void;
logSentIdObject: (hash: SHA256IdHash) => void;
logSentBlob: (hash: SHA256Hash<BLOB>) => void;
logSentClob: (hash: SHA256Hash<CLOB>) => void;
logError: (error: Error) => void;
localKeepRunningSetting: boolean;
remotePersonId: SHA256IdHash<Person>;
}
/**
* @private
* @typedef {object} ExporterStats
* @property {Set<SHA256Hash|SHA256IdHash>} sentWaitingForImporterAck - Collect hashes for files we
* sent and when we get the importer's acknowledgement that they were successfully stored on the
* remote instance remove them. This Set only contains hashes of objects actually requested by
* and sent to the remote instance.
* @property {number} nrOfAccessible - The total number of unique and concrete (i.e. non ID)
* object hashes found while gathering the hashes of all objects accessible to the remote instance
* @property {number} nrOfAcknowledged - The total number of hashes acknowledged as "processed" by
* the remote instance. This should be the same as `nrOfAccessible` at the end of the exchange.
*/
export interface ExporterStats {
sentWaitingForImporterAck: Set<SHA256Hash<HashTypes> | SHA256IdHash>;
nrOfAccessible: number;
nrOfAcknowledged: number;
}
import {MESSAGE_TYPES} from './chum-base';
import {createFindAccessibleObjects} from './chum-exporter-find-objects';
import {
createAcknowledgeFn,
createRunFindAccessibleObjectsFn,
createSendBlobFn,
createSendCrdtMetaObjectFn,
createSendIdObjectFn,
createSendObjectFn
} from './chum-exporter-service-functions';
import {createError} from './errors';
import {createMessageBus} from './message-bus';
import type {BLOB, CLOB, HashTypes, Person} from './recipes';
import type {AnyFunction} from './util/function';
import {createNeverFailAsyncErrorWrapper, createRethrowingAsyncErrorWrapper} from './util/function';
import {createTrackingPromise} from './util/promise';
import type {SHA256Hash, SHA256IdHash} from './util/type-checks';
import type {WebsocketPromisifierAPI} from './websocket-promisifier';
const MessageBus = createMessageBus('chum-exporter');
/**
* Function that creates a Chum exporter instance.
*
* ## End conditions
*
* 1. Authentication failure
* 2. Loss of connection (connection error state does not matter)
* 3. A full synchronization means there is nothing left to do - if keepRunning === false
* 4. Any error ends the exporter (incl. errors in detached functions)
*
* The exporter instance also needs to be aware of some decoupled functions:
*
* 1. **Service functions** called by websocket-promisifier. Their caller has only very limited
* error handling: All errors are caught and the remote instance only receives a generic
* error message. However, it is this module (chum-exporter) that ultimately controls the
* services, even if it is not involved in the actual service function invocation. If there
* is an error we probably want to stop the exporter since we are in an undefined state that
* should be fixed.
*
* 2. **Finding accessible hashes** is a service function run in response to a request from the
* remote instance, but it only checks the arguments received over the network and then spawns a
* decoupled function which runs independent of and for much longer than the service request
* function, and therefore has no parent that errors could bubble up to automatically. This
* function consists of two parts:
* - Find the directly accessible root-level objects, i.e. the ones where an
* {@link Access|Access object} exists that points to it granting access for a given
* user and/or group,
* - For each root-level object iterate over the graph created by its hash links.
*
* 3. **Sending accessible hashes events** to the remote instance's importer is another
* decoupled function decoupled from the one described above.
*
* @static
* @async
* @param {ChumExporterOptions} options
* @returns {Promise<Set<SHA256Hash>>} Returns a promise that resolves with a Set object
* containing the hashes of objects we sent but did not receive an acknowledgement for, so we
* don't know if the recipient was able to successfully add them to their storage
*/
export async function createChumExporter(options: ChumExporterOptions): Promise<ExporterStats> {
const {
WebsocketObj,
logSentObject,
logSentIdObject,
logSentBlob,
logSentClob,
logError,
localKeepRunningSetting,
remotePersonId
} = options;
if (remotePersonId === undefined) {
throw createError('CE-CCE1');
}
const exporterStats: ExporterStats = {
sentWaitingForImporterAck: new Set(),
nrOfAccessible: 0,
nrOfAcknowledged: 0
};
// Used to wait for the end condition of the exporter. All stop conditions for the importer
// are realized through this promise.
const exporterTracker = createTrackingPromise<void>();
WebsocketObj.promise
.finally(() => exporterTracker.resolve())
// WebsocketObj promise failure is handled elsewhere, but we need to handle
// rejections of the new Promise created by the finally() method
.catch(ignore => undefined);
// Track the end condition of the decoupled "find accessible objects" object graph iterator
// with a promise. It fits better into our async/await based code than callbacks. The
// function starts another decoupled function for sending "accessible objects" events to the
// remote instance, its errors are reported here too.
const findObjectsTracker = createTrackingPromise<void>();
// End condition #4 - The iterator errors are exporter errors
findObjectsTracker.promise.catch((err: Error): void => {
logError(err);
exporterTracker.reject(err);
});
// TWO IN ONE (multipurpose data structure)
// Declared in this module because it is used by several functions (services). Keys are
// object hashes, values are boolean, i.e. true or false.
// 1. The key alone is for the actual core function, to determine if a given hash is
// accessible (Map's map.has(hash) function is used).
// 2. The boolean value is set to false initially and indicates whether the remote
// instance acknowledged processing this hash.
const accessibleHashes: Map<SHA256Hash<HashTypes> | SHA256IdHash, boolean> = new Map();
// Returns a boolean indicating whether the hash was added
function addAccessible(hash: SHA256Hash<HashTypes> | SHA256IdHash): boolean {
// Duplicates are possible because the object graph iterator only prevents duplicates
// within each graph, but a new iterator is created for each Access object, and the
// object graphs they point to, may overlap
if (accessibleHashes.has(hash)) {
return false; // Hash was not added
}
accessibleHashes.set(hash, false);
exporterStats.nrOfAccessible += 1;
return true; // Hash was added
}
function getAccessibleHashStatus(hash: SHA256Hash<HashTypes> | SHA256IdHash): void | boolean {
return accessibleHashes.get(hash);
}
function addUnacknowledged(hash: SHA256Hash<HashTypes> | SHA256IdHash): void {
exporterStats.sentWaitingForImporterAck.add(hash);
}
let iteratorDone = false;
findObjectsTracker.promise
.finally(() => {
iteratorDone = true;
})
// findObjectsTracker promise failure is handled elsewhere, but we need to handle
// rejections of the new Promise created by the finally() method
.catch(ignore => undefined);
// The chum-exporter-iterator function collecting accessible hashes and sending them to the
// remote importer wants to know about acknowledgements and the resulting sizes of the two
// structures for accessible hashes and for unacknowledged (files were actually sent)
// hashes.
const onAckSubscribers: Set<(a: number, b: number) => void> = new Set();
function subscribeToOnAcknowledgeMsg(fn: (a: number, b: number) => void): void {
onAckSubscribers.add(fn);
}
function onAcknowledgeMsg(hashesAcknowledgedByRemote: Array<SHA256Hash<HashTypes>>): void {
MessageBus.send(
'log',
`[${WebsocketObj.connId}] onAcknowledgeMsg: ACKed ${
hashesAcknowledgedByRemote.length
} hashes - ${hashesAcknowledgedByRemote.join(', ')}`
);
hashesAcknowledgedByRemote.forEach(hash => {
exporterStats.sentWaitingForImporterAck.delete(hash);
accessibleHashes.set(hash, true);
exporterStats.nrOfAcknowledged += 1;
});
// Inform subscribers about how many entries there still are
onAckSubscribers.forEach(fn =>
fn(
exporterStats.nrOfAccessible - exporterStats.nrOfAcknowledged,
exporterStats.sentWaitingForImporterAck.size
)
);
// End condition #3
// 1. We stop when a full synchronization has been achieved (keepRunning is false)
// 2. Storage iteration over all accessible root nodes is finished and all accessible
// hashes have been reported to the remote instance via events (iteratorDone)
// 3. All accessible hashes sent to the remote importer (the hashes, not the files)
// were acknowledged by the remote instance (whether transferred or not needed)
if (
!localKeepRunningSetting &&
iteratorDone &&
exporterStats.nrOfAccessible === exporterStats.nrOfAcknowledged
) {
exporterTracker.resolve();
}
}
// End condition #3
// Callback for all functions not called from here, but which are exporter functions. It is
// like a human organisational structure where you are responsible for employees but the
// work they do is for and controlled by somebody else. We give them this function to also
// report any problems to us. In our context they work for the customer, i.e. the remote
// instance sending requests to us. We want to remain in control of them overall.
function onServiceError(err: Error): void {
// No logError() here because this happens at the end of the create() function which
// "await"s the promise
exporterTracker.reject(err);
}
// Create an exception-intercepting wrapper function for functions we manage but that
// are invoked by the remote instance through the websocket-promisifier connection. The
// actual caller only gets a generic error message, the place where the error must be
// handled is here, however.
const serviceFnErrorWrapper = createRethrowingAsyncErrorWrapper(onServiceError);
function onIteratorError(err: Error): void {
logError(err);
// This also rejects the exporterTracker promise set up in the parent create()
// function. and since the iterator listens on exporterTracker promise it stops itself.
findObjectsTracker.reject(err);
}
// A second error wrapper function that prevents exceptions from being thrown. This is
// for fully detached functions started by service functions, whose exceptions would
// result in "Uncaught error" or an "Unhandled promise rejection" Javascript runtime error.
const iteratorErrorWrapper = createNeverFailAsyncErrorWrapper(onIteratorError);
// The main part of the MESSAGE_TYPES.ACCESSIBLE_OBJECTS_EVENTS_REQUEST event handler
// runs decoupled from the actual event handler through setTimeout. Since that means the
// service function loses control
const detachedFindAccessibleObjFn = iteratorErrorWrapper(
createFindAccessibleObjects(
WebsocketObj,
remotePersonId,
addAccessible,
subscribeToOnAcknowledgeMsg,
exporterTracker,
findObjectsTracker,
logError,
WebsocketObj.connId
)
);
const services: Array<[number, AnyFunction]> = [
[
MESSAGE_TYPES.ACCESSIBLE_OBJECTS_EVENTS_REQUEST,
createRunFindAccessibleObjectsFn(
detachedFindAccessibleObjFn,
localKeepRunningSetting,
options.WebsocketObj.connId
)
],
[
MESSAGE_TYPES.GET_OBJECT,
createSendObjectFn(
logSentObject,
getAccessibleHashStatus,
addUnacknowledged,
options.WebsocketObj.connId
)
],
[
MESSAGE_TYPES.GET_ID_OBJECT,
createSendIdObjectFn(
logSentIdObject,
getAccessibleHashStatus,
addUnacknowledged,
options.WebsocketObj.connId
)
],
[
MESSAGE_TYPES.GET_BLOB,
createSendBlobFn(
logSentBlob,
logSentClob,
getAccessibleHashStatus,
addUnacknowledged,
options.WebsocketObj.connId
)
],
[
MESSAGE_TYPES.GET_CRDT_META_OBJECT,
createSendCrdtMetaObjectFn(
logSentObject,
getAccessibleHashStatus,
addUnacknowledged,
options.WebsocketObj.connId
)
],
[
MESSAGE_TYPES.ACKNOWLEDGE,
createAcknowledgeFn(onAcknowledgeMsg, options.WebsocketObj.connId)
]
];
services.forEach(([id, fn]) => WebsocketObj.addService(id, serviceFnErrorWrapper(fn)));
// STEP #2
// This tracking promise lets us wait for one of the exporter's end conditions.
// End conditions #3 and #4 - Success: full sync. done, Failure: Error
try {
MessageBus.send('log', `[${WebsocketObj.connId}] AWAIT EXPORTER PROMISE`);
await Promise.race([
// End condition #2 - This also is the main end condition when keepRunning
// Ignore websocket errors - the parent logs them
WebsocketObj.promise.catch(ignore => undefined),
exporterTracker.promise
]);
} finally {
MessageBus.send('log', `[${WebsocketObj.connId}] AWAIT EXPORTER PROMISE - DONE`);
[
MESSAGE_TYPES.ACCESSIBLE_OBJECTS_EVENTS_REQUEST,
MESSAGE_TYPES.GET_OBJECT,
MESSAGE_TYPES.GET_BLOB,
MESSAGE_TYPES.GET_CRDT_META_OBJECT,
MESSAGE_TYPES.ACKNOWLEDGE
].forEach((id: number): void => WebsocketObj.removeService(id));
}
MessageBus.send(
'debug',
`[${WebsocketObj.connId}] ` +
'END OF EXPORTER, statistics:\n' +
' nrOfAccessible: ' +
exporterStats.nrOfAccessible +
'\n' +
' nrOfAcknowledged: ' +
exporterStats.nrOfAcknowledged +
'\n' +
' sentWaitingForImporterAck: ' +
JSON.stringify(Array.from(exporterStats.sentWaitingForImporterAck))
);
return exporterStats;
}