/**
* @author Michael Hasenstein <hasenstein@yahoo.com>
* @copyright REFINIO GmbH & Maximilian Wisgickl 2018
* @license CC-BY-NC-SA-2.5; portions MIT License
* @version 0.0.1
*/
/**
* This module connects two ONE instances and synchronizes (transfers) all objects accessible
* by one instance to the other. The exchange is bidirectional. When the synchronization is
* finished a {@link Chum|Chum object} is written by both sides with a log of the exchange.
*
* ## Limitations
*
* Changes to group memberships performed while a Chum is active are ignored. The Chum uses the
* group memberships it finds when the exchange is initialized.
*
* Writes to storage while a synchronization is running may or may not influence it depending on
* mode and on timing. For example, in mode "keepRunning = false" (see below) the list of all
* {@link Access|Access objects} accessible by the given person (by ID hash) is calculated
* at the start. If an Access object is for an ID hash, i.e. for all versions of a versioned
* object, any new versions written while the synchronization is running *may* be
* transferred (or not) depending on whether they were written before or after that particular
* object was transferred. In mode "keepRunning = true" they will be transferred for certain if
* the write-operation occurs before the synchronization is explicitly ended.
*
* ## keepRunning = false
*
* The Chum synchronizes all currently accessible objects and then exits.
*
* Note that there is no locking. If the application continues to write to storage, the behavior
* depends on what it writes and on timing, e.g. on whether it modifies access settings that are
* relevant to the currently connected remote ONE instance. We do not wish to disable (or queue)
* all write requests while a synchronization is running, since this may take quite a long time.
* We also don't want to incur the cost of checking if any writes are relevant. We think it is
* easy (easier, cheaper) for the application to be programmed in such a way that continued
* writing to ONE storage while an exchange is running has no undesirable consequences.
*
* ## keepRunning = true
*
* The Chum performs a full synchronization and then keeps the connection open and keeps
* synchronizing any objects as they become available _and accessible_. Only when the
* synchronization is explicitly ended are the final Chum log objects written by both sides. It
* will not lose any relevant writes because immediately after the initial calculation of
* accessible Access objects (which point to the accessible objects) the code starts watching
* what ONE objects are written and, if relevant, adds them to the collection of accessible
* hashes, alongside those found through the originally started full synchronization.
*
* While there is no sharp distinction between objects (hashes) found through the request for a
* full synchronization and those found while watching new objects being written, there is a "full
* synchronization achieved" status. It is achieved when the objects reachable through all
* {@link Access} objects found initially (not through monitoring ongoing storage writes) have
* all been processed (hashes collected, sent to the remote instance, processed and then
* acknowledged by the remote instance). It is significant because there is a timestamp given to
* the remote instance when it makes its request. "Full synchronization" means that all objects
* accessible at that point have been processed. The timestamp can then be used by the remote
* instance to only ask for objects created after that timestamp.
*
* @module
*/
/**
 * Options for a Chum synchronization between a local and a remote ONE instance.
 */
export interface ChumSyncOptions {
    /**
     * The (promisified) websocket connection to the remote instance over which the exchange
     * takes place.
     */
    connection: WebsocketPromisifierAPI;

    /**
     * The ID hash of the owner of the local instance the chum is exchanging data with. This
     * does not have to be the main instance's owner but can be some anonymous ID, so we cannot
     * simply query the instance for the main instance's Person ID.
     */
    localPersonId: SHA256IdHash<Person>;

    /**
     * The ID hash of the owner of the remote instance the chum is exchanging data with.
     */
    remotePersonId: SHA256IdHash<Person>;

    /** Name of the Chum */
    chumName: string;

    /** Name of the local instance */
    localInstanceName: string;

    /** Name of the remote instance */
    remoteInstanceName: string;

    /**
     * If `true` the connection remains open until stopped explicitly. Defaults to `false`.
     */
    keepRunning?: boolean;

    /**
     * Optional. When `keepRunning` is true and the full synchronization is over, what should be
     * the maximum delay between new accessible objects being created and notification events
     * sent to us? The remote instance is going to collect all new accessible hashes and only
     * sends them after waiting for this many milliseconds. If there are a lot of new hashes,
     * exceeding a hard-coded threshold, it sends them right away though.
     *
     * If you require fast notifications about rare new-accessible-object events you can set the
     * delay low (*1 ms is the minimum*). If it is not urgent you can set the delay higher and
     * let the remote exporter accumulate more hashes. The delay does not have to be higher than
     * a few seconds, because it will make less and less of a difference very quickly with
     * increasing values.
     */
    maxNotificationDelay?: number;
}
/**
 * The object returned by `createChum`: the promise for the final Chum object plus a set of
 * event sources.
 */
export interface ChumApi {
    /**
     * Resolves with the result of storing the new Chum object once the exchange is over.
     */
    promise: Promise<VersionedObjectResult<Chum>>;

    // Event sources.
    // NOTE(review): `init()` in the chum-sync module receives these as `_events` but neither
    // dispatches nor forwards them to importer/exporter. Confirm where, if anywhere, they are
    // emitted.
    onActivity: OneEventSource<string>;
    onAccessibleHash: OneEventSource<AccessibleObjectsTuple>;
    onFullSync: OneEventSource<void>;
    onObjectSaved: OneEventSource<AnyObjectCreation>;
    onIdObjectSaved: OneEventSource<IdFileCreation>;
    onBlobSaved: OneEventSource<FileCreation<BLOB>>;
    onClobSaved: OneEventSource<FileCreation<CLOB>>;

    // Currently disabled:
    // close: () => Promise<Chum>;
}
import type {AccessibleObjectsTuple} from './chum-base';
import {MESSAGE_TYPES} from './chum-base';
import {createChumExporter} from './chum-exporter';
import {createChumImporter} from './chum-importer';
import {createError} from './errors';
import {createMessageBus} from './message-bus';
import type {BLOB, Chum, CLOB, HashTypes, Person} from './recipes';
import type {AnyObjectCreation, FileCreation} from './storage-base-common';
import type {IdFileCreation, VersionedObjectResult} from './storage-versioned-objects';
import {getObjectByIdObj, storeVersionedObject} from './storage-versioned-objects';
import type {AnyObject} from './util/object';
import type {OneEventSource} from './util/one-event-source';
import {createEventSource} from './util/one-event-source';
import {retry} from './util/promise';
import type {SHA256Hash, SHA256IdHash} from './util/type-checks';
import type {WebsocketPromisifierAPI} from './websocket-promisifier';
import {UNKNOWN_SERVICE_MSG_START} from './websocket-request-handler';
/**
 * Message bus of this module. Used to send 'log', 'debug', 'alert' and 'error' messages.
 */
const MessageBus = createMessageBus('chum-sync');

/**
 * Version of the Chum protocol, exchanged at the beginning of the Chum protocol (see
 * `checkProtocolVersions`). If the versions of importer and exporter of both nodes don't match,
 * the chum exchange is aborted with an error.
 */
export const PROTOCOL_VERSION = 4;

// FOR TESTS ONLY
// NOTE(review): as written this cannot compile, because PROTOCOL_VERSION is declared `const`.
// export function setProtocolVersion(v: number): void {
//     PROTOCOL_VERSION = v;
// }
/**
 * Creates a logging callback that appends every hash it receives to the given array.
 *
 * Importer and exporter (and their submodules) get such callbacks so that they log received and
 * sent ONE objects directly into the arrays of the prepared Chum object. The array is mutated in
 * place; no copy is made. There only are regular references since ID references in objects
 * cause the transfer of the actual object versions. The transfers are added as hash links in
 * order to get reverse maps written from the linked objects back to the Chum object.
 * @private
 * @param hashes - The array the returned callback pushes onto
 * @returns A callback taking a single hash and appending it to `hashes`
 */
function createLogSentHashFn<T extends SHA256Hash<HashTypes> | SHA256IdHash>(
    hashes: T[]
): (hash: T) => void {
    function appendHash(hash: T): void {
        hashes.push(hash);
    }

    return appendHash;
}
/**
 * Sums up the lengths of all eight transfer-log arrays of a Chum object (objects, ID objects,
 * BLOBs and CLOBs, in both directions A→B and B→A).
 * @private
 * @param chumObj - The Chum object whose transfer logs are counted
 * @returns Total number of transfers logged in the Chum object
 */
function nrOfTotalLoggedTransfers(chumObj: Readonly<Chum>): number {
    const transferLogs = chumObj as AnyObject;
    const transferLogKeys = [
        'AtoBObjects',
        'AtoBIdObjects',
        'AtoBBlob',
        'AtoBClob',
        'BtoAObjects',
        'BtoAIdObjects',
        'BtoABlob',
        'BtoAClob'
    ];

    let total = 0;

    for (const key of transferLogKeys) {
        total += transferLogs[key].length;
    }

    return total;
}
/**
 * Makes sure that both sides of the connection speak the same Chum protocol version.
 *
 * First registers a VERSION_CHECK service answering with the local `PROTOCOL_VERSION`, so that
 * the remote side can query us. Then it asks the remote side for its version.
 * @private
 * @param connection - The connection to the remote instance
 * @returns Resolves when the remote version equals the local one
 * @throws Throws a 'CS-MISMATCH' error (after sending an 'alert' message) when the versions
 * differ
 */
async function checkProtocolVersions(connection: WebsocketPromisifierAPI): Promise<void> {
    connection.addService(MESSAGE_TYPES.VERSION_CHECK, (): number => PROTOCOL_VERSION);

    // The remote side answers "unknown service" until its own chum-sync has registered the
    // service. This requires both sides to be started at around the same time; retrying on
    // exactly that error works around timing differences (a kind of polling mechanism).
    const isRemoteServiceMissing = (err: Error): boolean =>
        err.message.startsWith(UNKNOWN_SERVICE_MSG_START);

    const peerVersion = await retry(() => connection.send(MESSAGE_TYPES.VERSION_CHECK), {
        delay: 300,
        retries: 15,
        shouldRetry: isRemoteServiceMissing
    });

    if (peerVersion === PROTOCOL_VERSION) {
        return;
    }

    MessageBus.send(
        'alert',
        `[${connection.connId}] CHUM PROTOCOL MISMATCH, local: ${PROTOCOL_VERSION}, remote: ${peerVersion}`
    );

    throw createError('CS-MISMATCH', {local: PROTOCOL_VERSION, remote: peerVersion});
}
/**
 * Runs one Chum exchange over the given connection and stores the resulting Chum object.
 *
 * 1. Checks that both sides use the same Chum protocol version.
 * 2. Prepares a new Chum object. Its `highestRemoteTimestamp` is seeded from the previous
 *    Chum object with the same ID, if one exists.
 * 3. Starts exporter and importer, which log their transfers directly into the Chum object's
 *    arrays, and waits for both of them to finish.
 * 4. Closes the connection and records the connection statistics (or the close error) in the
 *    Chum object.
 * 5. Stores the Chum object as a new version.
 * @private
 * @param options - An object with options
 * @param _events - The Chum API object, passed through for the OneEventSource objects.
 * NOTE(review): currently unused here; it is not forwarded to importer or exporter.
 * @returns The result of storing the new Chum object
 * @throws Throws 'CS-INIT1' if a person ID is undefined.
 * @throws Throws 'CS-MISMATCH' (from `checkProtocolVersions`) on a protocol version mismatch.
 * @throws Rethrows any error from loading the previous Chum object except FileNotFoundError.
 * @throws Rethrows an importer/exporter error if nothing at all was transferred.
 */
async function init(
    {
        connection,
        localPersonId,
        remotePersonId,
        chumName,
        localInstanceName,
        remoteInstanceName,
        keepRunning = false,
        maxNotificationDelay
    }: ChumSyncOptions,
    _events: Omit<ChumApi, 'promise'>
): Promise<VersionedObjectResult<Chum>> {
    MessageBus.send(
        'log',
        `[${connection.connId}] Chum, local: ${localInstanceName}, remote: ${remoteInstanceName}`
    );

    if (localPersonId === undefined || remotePersonId === undefined) {
        throw createError('CS-INIT1', {localPersonId, remotePersonId});
    }

    // Fire-and-forget: only logs how the connection ended. The same promise is awaited again
    // further below to get the connection statistics.
    void connection.promise
        .then(() => MessageBus.send('log', `[${connection.connId}] CONECTION CLOSED`))
        .catch(err => MessageBus.send('alert', `[${connection.connId}] CONECTION CLOSED ERR`, err));

    await checkProtocolVersions(connection);

    const chumObj: Chum = {
        $type$: 'Chum',
        name: chumName,
        instance: [localInstanceName, remoteInstanceName],
        person: [remotePersonId, localPersonId],
        highestRemoteTimestamp: 0,
        // The importer and exporter are configured below to add their transfers directly to
        // these arrays
        AtoBObjects: [],
        AtoBIdObjects: [],
        AtoBBlob: [],
        AtoBClob: [],
        BtoAObjects: [],
        BtoAIdObjects: [],
        BtoABlob: [],
        BtoAClob: [],
        BtoAExists: 0,
        unacknowledged: undefined,
        statistics: undefined,
        errors: []
    };

    try {
        const {obj: prevChumObj} = await getObjectByIdObj(chumObj);
        // In case we don't get a synchronization and therefore no new highest timestamp
        chumObj.highestRemoteTimestamp = prevChumObj.highestRemoteTimestamp;
    } catch (err) {
        // No previous Chum object with this ID is not an error: this is the first exchange
        if (err.name !== 'FileNotFoundError') {
            throw err;
        }
    }

    // FOR FINDING CODE ISSUES: When the logError function is called and the Chum was already
    // written we lose the error.
    let chumEnded = false;

    // If any stored data was already exchanged letting the entire Chum object creation fail is
    // counterproductive since we would lose the log of the exchange entirely. That is why
    // errors are logged and the Chum object creation is allowed to go ahead in that case.
    function logError(err: Error): void {
        if (chumEnded) {
            // _Somebody_ should notice this asynchronously thrown uncaught error. This can only
            // mean there is a code logic error.
            return MessageBus.send(
                'error',
                `[${connection.connId}] Received Error after Chum ended`,
                err
            );
        }

        chumObj.errors.push(err);
    }

    // NO WAITING, just promise creation. This makes the authentication service available.
    // THIS NEEDS TO HAPPEN BEFORE WE CONNECT to guarantee that the authentication service is
    // available already without even the slightest delay.
    const exporterPromise = createChumExporter({
        WebsocketObj: connection,
        remotePersonId,
        logSentObject: createLogSentHashFn(chumObj.AtoBObjects),
        logSentIdObject: createLogSentHashFn(chumObj.AtoBIdObjects),
        logSentBlob: createLogSentHashFn(chumObj.AtoBBlob),
        logSentClob: createLogSentHashFn(chumObj.AtoBClob),
        logError,
        localKeepRunningSetting: keepRunning
    });

    const importerPromise = createChumImporter({
        WebsocketObj: connection,
        logSavedObject: createLogSentHashFn(chumObj.BtoAObjects),
        logSavedIdObject: createLogSentHashFn(chumObj.BtoAIdObjects),
        logSavedBlob: createLogSentHashFn(chumObj.BtoABlob),
        logSavedClob: createLogSentHashFn(chumObj.BtoAClob),
        logExists: (count: number) => {
            chumObj.BtoAExists += count;
        },
        logError,
        highestRemoteTimestamp: chumObj.highestRemoteTimestamp,
        keepRunning,
        maxNotificationDelay
    });

    // LONG WAIT: This is where we wait for the exchange of files to be over; end conditions are:
    // 1. Loss of connection ends both importer and exporter (error state does not matter)
    // 2. A full synchronization of both importer and exporter means there is nothing left to do
    //    if keepRunning === false
    // 3. Any error independently ends (only) importer or exporter (incl. errors in detached
    //    functions)
    try {
        MessageBus.send('log', `[${connection.connId}] AWAIT IMPORTER AND EXPORTER PROMISE`);

        const [exporterStats, highestRemoteTimestamp] = await Promise.all([
            exporterPromise,
            importerPromise
        ]);

        chumObj.highestRemoteTimestamp = highestRemoteTimestamp;
        chumObj.unacknowledged = exporterStats.sentWaitingForImporterAck;
    } catch (err) {
        // POLICY DECISION
        // If nothing was transferred yet, we don't bother writing a Chum object. As soon as
        // something was transferred we have to log it though, i.e. we write a Chum object and
        // log the error inside instead of throwing the error. After all, _some_ exchange took
        // place, that is not a total error.
        if (nrOfTotalLoggedTransfers(chumObj) === 0) {
            throw err;
        }

        // No logging of errors in the Chum object - importer and exporter already did that.
        // Logging here would only log the first error coming from either the importer or the
        // exporter but not both, if they both fail.
    } finally {
        MessageBus.send('log', `[${connection.connId}] AWAIT IMPORTER AND EXPORTER PROMISE - DONE`);

        // If it is not closed yet, close it. We expect to need this for the case when
        // keepRunning is false, and we complete a full synchronization on both sides, i.e. both
        // exporter and importer are done (their promises resolved but the websocket connection
        // still is there). This function is synchronous, its possibly asynchronous effect is
        // reflected by the websocket connection's tracking promise.
        connection.close('Chum ended');

        try {
            chumObj.statistics = await connection.promise;
        } catch (err) {
            // Websocket errors are websocket close codes other than CLOSE_CODES.NORMAL and
            // CLOSE_CODES.PARTNER_DISCONNECT (defined in websocket-promisifier.js). Both importer
            // and exporter ignore websocket error state; they only care that the connection
            // was closed. That means we have to handle errors here (by logging them in the Chum
            // object). There is no further action: we don't really care much why a connection
            // was closed. The synchronization does what it can, and when it's over, it's over.
            // We assume the connection _always_ is unreliable to begin with.
            chumObj.errors.push(err);
        }

        // For callbacks called after this point by still running asynchronous code
        chumEnded = true;
    }

    // All transfer-log arrays (both directions, all kinds) are abbreviated to their length in
    // the debug output; dumping every single hash would bloat the log.
    const abbreviatedTransferKeys = [
        'AtoBObjects',
        'AtoBIdObjects',
        'AtoBBlob',
        'AtoBClob',
        'BtoAObjects',
        'BtoAIdObjects',
        'BtoABlob',
        'BtoAClob'
    ];

    MessageBus.send(
        'debug',
        `[${connection.connId}] FINAL new Chum object: ${JSON.stringify(
            chumObj,
            (key: string, value: unknown) =>
                abbreviatedTransferKeys.includes(key) && Array.isArray(value)
                    ? `[${value.length} hashes]`
                    : value,
            4
        )}`
    );

    const storedChumResult = await storeVersionedObject(chumObj);

    if (chumObj.errors.length > 0) {
        MessageBus.send(
            'error',
            `[${connection.connId}] END of chum-sync, ERRORS: ` +
                `Chum object HASH ${storedChumResult.hash} ID-HASH ${storedChumResult.idHash}`
        );

        chumObj.errors.forEach(err =>
            MessageBus.send('error', `[${connection.connId}] Chum Error:`, err)
        );
    }

    MessageBus.send(
        'log',
        `[${connection.connId}] END of chum-sync, ` +
            `Chum object HASH ${storedChumResult.hash} ID-HASH ${storedChumResult.idHash}`
    );

    return storedChumResult;
}
/**
 * Starts a Chum synchronization between two ONE instances. When the synchronization is done,
 * a new Chum object with a log of what was exchanged is stored.
 *
 * This function itself is synchronous: it creates the event sources, starts the exchange and
 * immediately returns the API object. The exchange's outcome is available through its
 * `promise` property.
 * @static
 * @param options - Connection, participants and behavior of the Chum
 * @returns The Chum API object. Its `promise` resolves with the result of storing the new
 * Chum object.
 */
export function createChum(options: ChumSyncOptions): ChumApi {
    MessageBus.send('log', `CHUM OPTIONS: ${JSON.stringify(options)}`);

    const onAccessibleHash = createEventSource<AccessibleObjectsTuple>();
    const onBlobSaved = createEventSource<FileCreation<BLOB>>();
    const onClobSaved = createEventSource<FileCreation<CLOB>>();
    const onFullSync = createEventSource<void>();
    const onIdObjectSaved = createEventSource<IdFileCreation>();
    const onActivity = createEventSource<string>();
    const onObjectSaved = createEventSource<AnyObjectCreation>();

    const events: Omit<ChumApi, 'promise'> = {
        onAccessibleHash,
        onBlobSaved,
        onClobSaved,
        onFullSync,
        onIdObjectSaved,
        onActivity,
        onObjectSaved
    };

    const promise = init(options, events);

    return {promise, ...events};
}