From 2187bc40a99187eb9e21a347dc2afae7cbf41ff6 Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Mon, 4 Nov 2019 17:19:35 +1300 Subject: [PATCH 1/9] Add Presence functionality This change adds the ability for clients to broadcast information about "Presence" - the notion of a client's position or state in a particular document. This might be represent a cursor in a text document, or a highlighted field in a more complex JSON document, or any other transient, current information about a client that shouldn't necessarily be stored in the document's chain of ops. The main complication that this feature solves is the issue of keeping presence correctly associated with the version of a `Doc` it was created at. For example, in a "naive" implementation of presence, presence information can arrive ahead of or behind ops, which - in a text-based example - can cause the cursor to "jitter" around the change. Using the ShareDB implementation will ensure that the presence is correctly transformed against any ops, and will ensure that presence information is always consistent with the version of the document. We also locally transform existing presence, which should help to keep (static) remote presence correctly positioned, independent of latency. In order to facilitate this, the feature must be used with an OT type that supports presence. The only requirement for supporting presence is the support of a `transformPresence` method: ```javascript type.transformPresence(presence, op, isOwnOperation): presence; ``` * `presence` _Object_: the presence data being transformed. The type will define this shape to be whatever is appropriate for the type. * `op` _Op_: the operation against which to transform the presence * `isOwnOperation`: _boolean_: whether the presence and the op have the same "owner". This information can be useful for some types to break ties when transforming a presence, for example as used in [`rich-text`][1] This work is based on the [work][2] by @gkubisa and @curran, but with the following aims: - avoid modifying the existing `Doc` class as much as possible, and instead use lifecycle hooks - keep presence separate as its own conceptual entity - break the presence subscriptions apart from `Doc` subscriptions (although in practice, the two are obviously tightly coupled) - allow multiple presences on a single `Doc` on the same `Connection` [1]: https://github.com/quilljs/delta#tranformposition [2]: https://github.com/share/sharedb/pull/288 --- README.md | 118 +++ lib/agent.js | 143 +++- lib/backend.js | 18 + lib/client/connection.js | 76 +- lib/client/doc.js | 11 +- lib/client/presence/doc-presence.js | 26 + lib/client/presence/local-doc-presence.js | 109 +++ lib/client/presence/local-presence.js | 75 ++ lib/client/presence/presence.js | 172 ++++ lib/client/presence/remote-doc-presence.js | 150 ++++ lib/client/presence/remote-presence.js | 22 + lib/error.js | 2 + lib/ot.js | 31 + lib/util.js | 42 + test/client/presence/doc-presence.js | 948 +++++++++++++++++++++ test/client/presence/presence-pauser.js | 43 + test/client/presence/presence-test-type.js | 37 + test/client/presence/presence.js | 445 ++++++++++ test/ot.js | 134 ++- test/util.js | 6 + 20 files changed, 2589 insertions(+), 19 deletions(-) create mode 100644 lib/client/presence/doc-presence.js create mode 100644 lib/client/presence/local-doc-presence.js create mode 100644 lib/client/presence/local-presence.js create mode 100644 lib/client/presence/presence.js create mode 100644 lib/client/presence/remote-doc-presence.js create mode 100644 lib/client/presence/remote-presence.js create mode 100644 test/client/presence/doc-presence.js create mode 100644 test/client/presence/presence-pauser.js create mode 100644 test/client/presence/presence-test-type.js create mode 100644 test/client/presence/presence.js diff --git a/README.md b/README.md index 3b029f039..561b69be2 100644 --- a/README.md +++ b/README.md @@ -158,6 +158,7 @@ Register a new middleware. the database. * `'receive'`: Received a message from a client * `'reply'`: About to send a non-error reply to a client message + * `'sendPresence'`: About to send presence information to a client * `fn` _(Function(context, callback))_ Call this function at the time specified by `action`. * `context` will always have the following properties: @@ -307,6 +308,20 @@ Get a read-only snapshot of a document at the requested version. } ``` +`connection.getPresence(channel): Presence;` +Get a [`Presence`](#class-sharedbpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. + +* `channel` _(String)_ + Presence channel to subscribe to + +`connection.getDocPresence(collection, id): DocPresence;` +Get a special [`DocPresence`](#class-sharedbdocpresence) instance that can be used to subscribe to presence information to other clients, and create instances of local presence. This is tied to a `Doc`, and all presence will be automatically transformed against ops to keep presence current. Note that the `Doc` must be of a type that supports presence. + +* `collection` _(String)_ + Document collection +* `id` _(String)_ + Document ID + ### Class: `ShareDB.Doc` `doc.type` _(String_) @@ -640,6 +655,109 @@ const connectionInfo = getUserPermissions(); const connection = backend.connect(null, connectionInfo); ``` +### Class: `ShareDB.Presence` + +Representation of the presence data associated with a given channel. + +#### `subscribe` + +```javascript +presence.subscribe(callback): void; +``` + +Subscribe to presence updates from other clients. Note that presence can be submitted without subscribing, but remote clients will not be able to re-request presence from you if you are not subscribed. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `unsubscribe` + +```javascript +presence.unsubscribe(callback): void; +``` + +Unsubscribe from presence updates from remote clients. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `on` + +```javascript +presence.on('receive', callback): void; +``` + +An update from a remote presence client has been received. + +* `callback` _Function_: callback for handling the received presence: `function (presenceId, presenceValue): void;` + +```javascript +presence.on('error', callback): void; +``` + +A presence-related error has occurred. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `create` + +```javascript +presence.create(presenceId): LocalPresence; +``` + +Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance. + +* `presenceId` _string_: a unique ID representing the local presence. Remember - depending on use-case - the same client might have multiple presences, so this might not necessarily be a user or client ID. + +#### `destroy` + +```javascript +presence.destroy(callback); +``` + +Updates all remote clients with a `null` presence, and removes it from the `Connection` cache, so that it can be garbage-collected. This should be called when you are done with a presence, and no longer need to use it to fire updates. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +### Class: `ShareDB.DocPresence` + +Specialised case of [`Presence`](#class-sharedbpresence), which is tied to a specific [`Doc`](#class-sharedbdoc). When using presence with an associated `Doc`, any ops applied to the `Doc` will automatically be used to transform associated presence. On destroy, the `DocPresence` will unregister its listeners from the `Doc`. + +See [`Presence`](#class-sharedbpresence) for available methods. + +### Class: `ShareDB.LocalPresence` + +`LocalPresence` represents the presence of the local client in a given `Doc`. For example, this might be the position of a caret in a text document; which field has been highlighted in a complex JSON object; etc. Multiple presences may exist per `Doc` even on the same client. + +#### `submit` + +```javascript +localPresence.submit(presence, callback): void; +``` + +Update the local representation of presence, and broadcast that presence to any other document presence subscribers. + +* `presence` _Object_: the presence object to broadcast. The structure of this will depend on the OT type +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `send` + +```javascript +localPresence.send(callback): void; +``` + +Send presence like `submit`, but without updating the value. Can be useful if local presences expire periodically. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + +#### `destroy` + +```javascript +localPresence.destroy(callback): void; +``` + +Informs all remote clients that this presence is now `null`, and deletes itself for garbage collection. + +* `callback` _Function_: a callback with the signature `function (error: Error): void;` + ### Logging By default, ShareDB logs to `console`. This can be overridden if you wish to silence logs, or to log to your own logging driver or alert service. diff --git a/lib/agent.js b/lib/agent.js index 466d2516e..9dab4280f 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -35,6 +35,19 @@ function Agent(backend, stream) { // Map from queryId -> emitter this.subscribedQueries = {}; + // Track which documents are subscribed to presence by the client. This is a + // map of channel -> stream + this.subscribedPresences = {}; + // Highest seq received for a subscription request. Any seq lower than this + // value is stale, and should be ignored. Used for keeping the subscription + // state in sync with the client's desired state + this.presenceSubscriptionSeq = 0; + // Keep track of the last request that has been sent by each local presence + // belonging to this agent. This is used to generate a new disconnection + // request if the client disconnects ungracefully. This is a + // map of channel -> id -> request + this.presenceRequests = {}; + // We need to track this manually to make sure we don't reply to messages // after the stream was closed. this.closed = false; @@ -78,6 +91,11 @@ Agent.prototype._cleanup = function() { } this.subscribedDocs = {}; + for (var channel in this.subscribedPresences) { + this.subscribedPresences[channel].destroy(); + } + this.subscribedPresences = {}; + // Clean up query subscription streams for (var id in this.subscribedQueries) { var emitter = this.subscribedQueries[id]; @@ -121,6 +139,31 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { }); }; +Agent.prototype._subscribeToPresenceStream = function(channel, stream) { + if (this.closed) return stream.destroy(); + + stream.on('data', function(data) { + if (data.error) { + logger.error('Presence subscription stream error', channel, data.error); + } + this._handlePresenceData(data); + }.bind(this)); + + stream.on('end', function() { + var requests = this.presenceRequests[channel] || {}; + for (var id in requests) { + var request = this.presenceRequests[channel][id]; + request.seq++; + request.p = null; + this._broadcastPresence(request, function(error) { + if (error) logger.error('Error broadcasting disconnect presence', channel, error); + }); + } + delete this.subscribedPresences[channel]; + delete this.presenceRequests[channel]; + }.bind(this)); +}; + Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) { var previous = this.subscribedQueries[queryId]; if (previous) previous.destroy(); @@ -311,14 +354,18 @@ Agent.prototype._checkRequest = function(request) { if (request.a === 'qf' || request.a === 'qs' || request.a === 'qu') { // Query messages need an ID property. if (typeof request.id !== 'number') return 'Missing query ID'; - } else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u') { + } else if (request.a === 'op' || request.a === 'f' || request.a === 's' || request.a === 'u' || request.a === 'p') { // Doc-based request. if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (request.d != null && typeof request.d !== 'string') return 'Invalid id'; - if (request.a === 'op') { + if (request.a === 'op' || request.a === 'p') { if (request.v != null && (typeof request.v !== 'number' || request.v < 0)) return 'Invalid version'; } + + if (request.a === 'p') { + if (typeof request.id !== 'string') return 'Missing presence ID'; + } } else if (request.a === 'bf' || request.a === 'bs' || request.a === 'bu') { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; @@ -369,6 +416,19 @@ Agent.prototype._handleMessage = function(request, callback) { return this._fetchSnapshot(request.c, request.d, request.v, callback); case 'nt': return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); + case 'p': + var presence = this._createPresence(request); + if (presence.t && !util.supportsPresence(types.map[presence.t])) { + return callback({ + code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE, + message: 'Type does not support presence: ' + presence.t + }); + } + return this._broadcastPresence(presence, callback); + case 'ps': + return this._subscribePresence(request.ch, request.seq, callback); + case 'pu': + return this._unsubscribePresence(request.ch, request.seq, callback); default: callback(new ShareDBError(ERROR_CODE.ERR_MESSAGE_BADLY_FORMED, 'Invalid or unknown message')); } @@ -669,6 +729,85 @@ Agent.prototype._src = function() { return this.src || this.clientId; }; +Agent.prototype._broadcastPresence = function(presence, callback) { + var requests = this.presenceRequests[presence.ch] || (this.presenceRequests[presence.ch] = {}); + var previousRequest = requests[presence.id]; + if (!previousRequest || previousRequest.seq < presence.seq) { + this.presenceRequests[presence.ch][presence.id] = presence; + } + this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) { + if (error) return callback(error); + var channel = this._getPresenceChannel(presence.ch); + this.backend.pubsub.publish([channel], presence, function(error) { + if (error) return callback(error); + callback(null, presence); + }); + }.bind(this)); +}; + +Agent.prototype._createPresence = function(request) { + return { + a: 'p', + ch: request.ch, + src: this.clientId, + seq: request.seq, + id: request.id, + p: request.p, + c: request.c, + d: request.d, + v: request.v, + t: request.t + }; +}; + +Agent.prototype._subscribePresence = function(channel, seq, callback) { + var presenceChannel = this._getPresenceChannel(channel); + this.backend.pubsub.subscribe(presenceChannel, function(error, stream) { + if (error) return callback(error); + if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq}); + this.presenceSubscriptionSeq = seq; + this.subscribedPresences[channel] = stream; + this._subscribeToPresenceStream(channel, stream); + this._requestPresence(channel, function(error) { + callback(error, {ch: channel, seq: seq}); + }); + }.bind(this)); +}; + +Agent.prototype._unsubscribePresence = function(channel, seq, callback) { + if (seq < this.presenceSubscriptionSeq) return; + this.presenceSubscriptionSeq = seq; + var stream = this.subscribedPresences[channel]; + if (stream) stream.destroy(); + callback(null, {ch: channel, seq: seq}); +}; + +Agent.prototype._getPresenceChannel = function(channel) { + return '$presence.' + channel; +}; + +Agent.prototype._requestPresence = function(channel, callback) { + var presenceChannel = this._getPresenceChannel(channel); + this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback); +}; + +Agent.prototype._handlePresenceData = function(presence) { + if (presence.src === this.clientId) return; + + if (presence.r) return this.send({a: 'pr', ch: presence.ch}); + + var backend = this.backend; + var context = { + collection: presence.c, + presence: presence + }; + backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { + if (error) { + return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + } + this.send(presence); + }.bind(this)); +}; function createClientOp(request, clientId) { // src can be provided if it is not the same as the current agent, diff --git a/lib/backend.js b/lib/backend.js index f1178d730..9c370fbba 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -66,6 +66,8 @@ Backend.prototype.MIDDLEWARE_ACTIONS = { // by design, changing existing reply properties can cause weird bugs, since // the rest of ShareDB would be unaware of those changes. reply: 'reply', + // About to send presence information to a client + sendPresence: 'sendPresence', // An operation is about to be submitted to the database submit: 'submit' }; @@ -822,6 +824,22 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca callback(error, snapshot); }; +Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) { + if (!presence.c || !presence.d) return callback(null, presence); + this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) { + if (error) return callback(error); + for (var i = 0; i < ops.length; i++) { + var op = ops[i]; + var isOwnOp = op.src === presence.src; + var transformError = ot.transformPresence(presence, op, isOwnOp); + if (transformError) { + return callback(transformError); + } + } + callback(null, presence); + }); +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index 6fe67b7c9..5099ed92b 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -1,5 +1,7 @@ var Doc = require('./doc'); var Query = require('./query'); +var Presence = require('./presence/presence'); +var DocPresence = require('./presence/doc-presence'); var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-request'); var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request'); var emitter = require('../emitter'); @@ -46,6 +48,9 @@ function Connection(socket) { // Map from query ID -> query object. this.queries = {}; + // Maps from channel -> presence objects + this._presences = {}; + // Map from snapshot request ID -> snapshot request this._snapshotRequests = {}; @@ -236,6 +241,14 @@ Connection.prototype.handleMessage = function(message) { var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; + case 'p': + return this._handlePresence(err, message); + case 'ps': + return this._handlePresenceSubscribe(err, message); + case 'pu': + return this._handlePresenceUnsubscribe(err, message); + case 'pr': + return this._handlePresenceRequest(err, message); default: logger.warn('Ignoring unrecognized message', message); @@ -339,6 +352,10 @@ Connection.prototype._setState = function(newState, reason) { docs[id]._onConnectionStateChanged(); } } + // Emit the event to all Presences + for (var channel in this._presences) { + this._presences[channel]._onConnectionStateChanged(); + } // Emit the event to all snapshots for (var id in this._snapshotRequests) { var snapshotRequest = this._snapshotRequests[id]; @@ -493,17 +510,7 @@ Connection.prototype.get = function(collection, id) { * @private */ Connection.prototype._destroyDoc = function(doc) { - var docs = this.collections[doc.collection]; - if (!docs) return; - - delete docs[doc.id]; - - // Delete the collection container if its empty. This could be a source of - // memory leaks if you slowly make a billion collections, which you probably - // won't do anyway, but whatever. - if (!util.hasKeys(docs)) { - delete this.collections[doc.collection]; - } + util.digAndRemove(this.collections, doc.collection, doc.id); }; Connection.prototype._addDoc = function(doc) { @@ -733,3 +740,50 @@ Connection.prototype._initialize = function(message) { this._setState('connected'); }; + +Connection.prototype.getPresence = function(channel) { + return util.digOrCreate(this._presences, channel, function() { + return new Presence(this, channel); + }.bind(this)); +}; + +Connection.prototype.getDocPresence = function(collection, id) { + var channel = DocPresence.channel(collection, id); + return util.digOrCreate(this._presences, channel, function() { + return new DocPresence(this, collection, id); + }.bind(this)); +}; + +Connection.prototype._sendPresenceAction = function(action, seq, presence) { + // Ensure the presence is registered so that it receives the reply message + this._addPresence(presence); + var message = {a: action, ch: presence.channel, seq: seq}; + this.send(message); + return message.seq; +}; + +Connection.prototype._addPresence = function(presence) { + util.digOrCreate(this._presences, presence.channel, function() { + return presence; + }); +}; + +Connection.prototype._handlePresenceSubscribe = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._handleSubscribe(error, message.seq); +}; + +Connection.prototype._handlePresenceUnsubscribe = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._handleUnsubscribe(error, message.seq); +}; + +Connection.prototype._handlePresence = function(error, message) { + var presence = util.dig(this._presences, message.ch); + if (presence) presence._receiveUpdate(error, message); +}; + +Connection.prototype._handlePresenceRequest = function(error, message) { + var presence = util.dig(this._presences, message.ch); + presence._broadcastAllLocalPresence(error, message); +}; diff --git a/lib/client/doc.js b/lib/client/doc.js index da7a7a050..a49b717d0 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -115,10 +115,12 @@ Doc.prototype.destroy = function(callback) { return doc.emit('error', err); } doc.connection._destroyDoc(doc); + doc.emit('destroy'); if (callback) callback(); }); } else { doc.connection._destroyDoc(doc); + doc.emit('destroy'); if (callback) callback(); } }); @@ -569,9 +571,9 @@ Doc.prototype._otApply = function(op, source) { if (transformErr) return this._hardRollback(transformErr); } // Apply the individual op component - this.emit('before op', componentOp.op, source); + this.emit('before op', componentOp.op, source, op.src); this.data = this.type.apply(this.data, componentOp.op); - this.emit('op', componentOp.op, source); + this.emit('op', componentOp.op, source, op.src); } // Pop whatever was submitted since we started applying this op this._popApplyStack(stackLength); @@ -580,7 +582,7 @@ Doc.prototype._otApply = function(op, source) { // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed - this.emit('before op', op.op, source); + this.emit('before op', op.op, source, op.src); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); // Emit an 'op' event once the local data includes the changes from the @@ -588,7 +590,7 @@ Doc.prototype._otApply = function(op, source) { // submission and before the server or other clients have received the op. // For ops from other clients, this will be after the op has been // committed to the database and published - this.emit('op', op.op, source); + this.emit('op', op.op, source, op.src); return; } @@ -861,7 +863,6 @@ Doc.prototype.resume = function() { this.flush(); }; - // *** Receiving operations // This is called when the server acknowledges an operation from the client. diff --git a/lib/client/presence/doc-presence.js b/lib/client/presence/doc-presence.js new file mode 100644 index 000000000..612893c59 --- /dev/null +++ b/lib/client/presence/doc-presence.js @@ -0,0 +1,26 @@ +var Presence = require('./presence'); +var LocalDocPresence = require('./local-doc-presence'); +var RemoteDocPresence = require('./remote-doc-presence'); + +function DocPresence(connection, collection, id) { + var channel = DocPresence.channel(collection, id); + Presence.call(this, connection, channel); + + this.collection = collection; + this.id = id; +} +module.exports = DocPresence; + +DocPresence.prototype = Object.create(Presence.prototype); + +DocPresence.channel = function(collection, id) { + return collection + '.' + id; +}; + +DocPresence.prototype._createLocalPresence = function(id) { + return new LocalDocPresence(this, id); +}; + +DocPresence.prototype._createRemotePresence = function(id) { + return new RemoteDocPresence(this, id); +}; diff --git a/lib/client/presence/local-doc-presence.js b/lib/client/presence/local-doc-presence.js new file mode 100644 index 000000000..0563f016d --- /dev/null +++ b/lib/client/presence/local-doc-presence.js @@ -0,0 +1,109 @@ +var LocalPresence = require('./local-presence'); +var ShareDBError = require('../../error'); +var ERROR_CODE = ShareDBError.CODES; + +module.exports = LocalDocPresence; +function LocalDocPresence(presence, presenceId) { + LocalPresence.call(this, presence, presenceId); + + this.collection = this.presence.collection; + this.id = this.presence.id; + + this._doc = this.connection.get(this.collection, this.id); + this._seq = null; + this._isSending = false; + + this._opHandler = this._transformAgainstOp.bind(this); + this._createOrDelHandler = this._handleCreateOrDel.bind(this); + this._loadHandler = this._handleLoad.bind(this); + this._destroyHandler = this.destroy.bind(this); + this._registerWithDoc(); +} + +LocalDocPresence.prototype = Object.create(LocalPresence.prototype); + +LocalDocPresence.prototype.submit = function(value, callback) { + if (!this._doc.type) { + var error = { + code: ERROR_CODE.ERR_DOC_DOES_NOT_EXIST, + message: 'Cannot submit presence. Document has not been created' + }; + return this._callbackOrEmit(error, callback); + }; + + LocalPresence.prototype.submit.call(this, value, callback); +}; + +LocalDocPresence.prototype.destroy = function(callback) { + this._doc.removeListener('op', this._opHandler); + this._doc.removeListener('create', this._createOrDelHandler); + this._doc.removeListener('del', this._createOrDelHandler); + this._doc.removeListener('load', this._loadHandler); + this._doc.removeListener('destroy', this._destroyHandler); + + LocalPresence.prototype.destroy.call(this, callback); +}; + +LocalDocPresence.prototype._sendPending = function() { + if (this._isSending) return; + this._isSending = true; + this._doc.whenNothingPending(function() { + this._isSending = false; + if (!this.connection.canSend) return; + + this._pendingMessages.forEach(function(message) { + message.t = this._doc.type.uri; + message.v = this._doc.version; + this.connection.send(message); + }.bind(this)); + + this._pendingMessages = []; + }.bind(this)); +}; + +LocalPresence.prototype._registerWithDoc = function() { + this._doc.on('op', this._opHandler); + this._doc.on('create', this._createOrDelHandler); + this._doc.on('del', this._createOrDelHandler); + this._doc.on('load', this._loadHandler); + this._doc.on('destroy', this._destroyHandler); +}; + +LocalDocPresence.prototype._transformAgainstOp = function(op, source) { + this._pendingMessages.forEach(function(message) { + try { + message.p = this._doc.type.transformPresence(message.p, op, source); + } catch (error) { + var callback = this._getCallback(message.seq); + this._callbackOrEmit(error, callback); + } + }.bind(this)); + + try { + this.value = this._doc.type.transformPresence(this.value, op, source); + } catch (error) { + this.emit('error', error); + } +}; + +LocalPresence.prototype._handleCreateOrDel = function() { + this._pendingMessages.forEach(function(message) { + message.p = null; + }); + + this.value = null; +}; + +LocalPresence.prototype._handleLoad = function() { + this.value = null; + this._pendingMessages = []; +}; + +LocalDocPresence.prototype._message = function() { + var message = LocalPresence.prototype._message.call(this); + message.c = this.collection, + message.d = this.id, + message.v = null; + message.t = null; + return message; +}; diff --git a/lib/client/presence/local-presence.js b/lib/client/presence/local-presence.js new file mode 100644 index 000000000..c49ef4a5c --- /dev/null +++ b/lib/client/presence/local-presence.js @@ -0,0 +1,75 @@ +var emitter = require('../../emitter'); + +module.exports = LocalPresence; +function LocalPresence(presence, presenceId) { + emitter.EventEmitter.call(this); + + if (!presenceId || typeof presenceId !== 'string') { + throw new Error('LocalPresence presenceId must be a string'); + } + + this.presence = presence; + this.presenceId = presenceId; + this.connection = presence.connection; + + this.value = null; + + this._pendingMessages = []; + this._callbacksBySeq = {}; +} +emitter.mixin(LocalPresence); + +LocalPresence.prototype.submit = function(value, callback) { + this.value = value; + this.send(callback); +}; + +LocalPresence.prototype.send = function(callback) { + var message = this._message(); + this._pendingMessages.push(message); + this._callbacksBySeq[message.seq] = callback; + this._sendPending(); +}; + +LocalPresence.prototype.destroy = function(callback) { + this.submit(null, function(error) { + if (error) return this._callbackOrEmit(error, callback); + delete this.presence.localPresences[this.presenceId]; + if (callback) callback(); + }.bind(this)); +}; + +LocalPresence.prototype._sendPending = function() { + if (!this.connection.canSend) return; + this._pendingMessages.forEach(function(message) { + this.connection.send(message); + }.bind(this)); + + this._pendingMessages = []; +}; + +LocalPresence.prototype._ack = function(error, seq) { + var callback = this._getCallback(seq); + this._callbackOrEmit(error, callback); +}; + +LocalPresence.prototype._message = function() { + return { + a: 'p', + ch: this.presence.channel, + id: this.presenceId, + p: this.value, + seq: this.connection.seq++ + }; +}; + +LocalPresence.prototype._getCallback = function(seq) { + var callback = this._callbacksBySeq[seq]; + delete this._callbacksBySeq[seq]; + return callback; +}; + +LocalPresence.prototype._callbackOrEmit = function(error, callback) { + if (callback) return process.nextTick(callback, error); + if (error) this.emit('error', error); +}; diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js new file mode 100644 index 000000000..bc0991261 --- /dev/null +++ b/lib/client/presence/presence.js @@ -0,0 +1,172 @@ +var emitter = require('../../emitter'); +var LocalPresence = require('./local-presence'); +var RemotePresence = require('./remote-presence'); +var util = require('../../util'); +var async = require('async'); + +module.exports = Presence; +function Presence(connection, channel) { + emitter.EventEmitter.call(this); + + if (!channel || typeof channel !== 'string') { + throw new Error('Presence channel must be provided'); + } + + this.connection = connection; + this.channel = channel; + + this.wantSubscribe = false; + this.subscribed = false; + this.remotePresences = {}; + this.localPresences = {}; + this.seq = 1; + + this._remotePresenceInstances = {}; + this._subscriptionCallbacksBySeq = {}; +} +emitter.mixin(Presence); + +Presence.prototype.subscribe = function(callback) { + this._sendSubscriptionAction(true, callback); +}; + +Presence.prototype.unsubscribe = function(callback) { + this._sendSubscriptionAction(false, callback); +}; + +Presence.prototype.create = function(id) { + var localPresence = this._createLocalPresence(id); + this.localPresences[id] = localPresence; + return localPresence; +}; + +Presence.prototype.destroy = function(callback) { + this.unsubscribe(function(error) { + if (error) return this._callbackOrEmit(error, callback); + var localIds = Object.keys(this.localPresences); + var remoteIds = Object.keys(this._remotePresenceInstances); + async.parallel( + [ + function(next) { + async.each(localIds, function(presenceId, next) { + this.localPresences[presenceId].destroy(next); + }.bind(this), next); + }.bind(this), + function(next) { + async.each(remoteIds, function(presenceId, next) { + this._remotePresenceInstances[presenceId].destroy(next); + }.bind(this), next); + }.bind(this) + ], + function(error) { + delete this.connection._presences[this.channel]; + this._callbackOrEmit(error, callback); + }.bind(this) + ); + }.bind(this)); +}; + +Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) { + this.wantSubscribe = !!wantSubscribe; + var action = this.wantSubscribe ? 'ps' : 'pu'; + var seq = this.seq++; + this._subscriptionCallbacksBySeq[seq] = callback; + if (this.connection.canSend) { + this.connection._sendPresenceAction(action, seq, this); + } +}; + +Presence.prototype._handleSubscribe = function(error, seq) { + if (this.wantSubscribe) this.subscribed = true; + var callback = this._subscriptionCallback(seq); + this._callbackOrEmit(error, callback); +}; + +Presence.prototype._handleUnsubscribe = function(error, seq) { + this.subscribed = false; + var callback = this._subscriptionCallback(seq); + this._callbackOrEmit(error, callback); +}; + +Presence.prototype._receiveUpdate = function(error, message) { + var localPresence = util.dig(this.localPresences, message.id); + if (localPresence) return localPresence._ack(error, message.seq); + + if (error) return this.emit('error', error); + var remotePresence = util.digOrCreate(this._remotePresenceInstances, message.id, function() { + return this._createRemotePresence(message.id); + }.bind(this)); + + remotePresence.receiveUpdate(message); +}; + +Presence.prototype._updateRemotePresence = function(remotePresence) { + this.remotePresences[remotePresence.presenceId] = remotePresence.value; + if (remotePresence.value === null) this._removeRemotePresence(remotePresence.presenceId); + this.emit('receive', remotePresence.presenceId, remotePresence.value); +}; + +Presence.prototype._broadcastAllLocalPresence = function(error) { + if (error) return this.emit('error', error); + for (var id in this.localPresences) { + var localPresence = this.localPresences[id]; + if (localPresence.value !== null) localPresence.send(); + } +}; + +Presence.prototype._removeRemotePresence = function(id) { + this._remotePresenceInstances[id].destroy(); + delete this._remotePresenceInstances[id]; + delete this.remotePresences[id]; +}; + +Presence.prototype._onConnectionStateChanged = function() { + if (!this.connection.canSend) return; + this._resubscribe(); + for (var id in this.localPresences) { + this.localPresences[id]._sendPending(); + } +}; + +Presence.prototype._resubscribe = function() { + var callbacks = []; + for (var seq in this._subscriptionCallbacksBySeq) { + var callback = this._subscriptionCallback(seq); + callbacks.push(callback); + } + + if (!this.wantSubscribe) return this._callEachOrEmit(null, callbacks); + + this.subscribe(function(error) { + this._callEachOrEmit(error, callbacks); + }.bind(this)); +}; + +Presence.prototype._subscriptionCallback = function(seq) { + var callback = this._subscriptionCallbacksBySeq[seq]; + delete this._subscriptionCallbacksBySeq[seq]; + return callback; +}; + +Presence.prototype._callbackOrEmit = function(error, callback) { + if (callback) return process.nextTick(callback, error); + if (error) this.emit('error', error); +}; + +Presence.prototype._createLocalPresence = function(id) { + return new LocalPresence(this, id); +}; + +Presence.prototype._createRemotePresence = function(id) { + return new RemotePresence(this, id); +}; + +Presence.prototype._callEachOrEmit = function(error, callbacks) { + if (callbacks && callbacks.length) { + return callbacks.forEach(function(callback) { + process.nextTick(callback, error); + }); + } + + if (error) this.emit('error', error); +}; diff --git a/lib/client/presence/remote-doc-presence.js b/lib/client/presence/remote-doc-presence.js new file mode 100644 index 000000000..07b5cc515 --- /dev/null +++ b/lib/client/presence/remote-doc-presence.js @@ -0,0 +1,150 @@ +var RemotePresence = require('./remote-presence'); +var ot = require('../../ot'); + +module.exports = RemoteDocPresence; +function RemoteDocPresence(presence, presenceId) { + RemotePresence.call(this, presence, presenceId); + + this.collection = this.presence.collection; + this.id = this.presence.id; + this.src = null; + + this._doc = this.connection.get(this.collection, this.id); + this._pending = null; + this._opCache = null; + this._pendingSetPending = false; + + this._opHandler = this._handleOp.bind(this); + this._createDelHandler = this._handleCreateDel.bind(this); + this._loadHandler = this._handleLoad.bind(this); + this._registerWithDoc(); +} + +RemoteDocPresence.prototype = Object.create(RemotePresence.prototype); + +RemoteDocPresence.prototype.receiveUpdate = function(message) { + if (this._pending && message.seq < this._pending.seq) return; + this.src = message.src; + this._pending = message; + this._setPendingPresence(); +}; + +RemoteDocPresence.prototype.destroy = function(callback) { + this._doc.removeListener('op', this._opHandler); + this._doc.removeListener('create', this._createDelHandler); + this._doc.removeListener('del', this._createDelHandler); + this._doc.removeListener('load', this._loadHandler); + + RemotePresence.prototype.destroy.call(this, callback); +}; + +RemoteDocPresence.prototype._registerWithDoc = function() { + this._doc.on('op', this._opHandler); + this._doc.on('create', this._createDelHandler); + this._doc.on('del', this._createDelHandler); + this._doc.on('load', this._loadHandler); +}; + +RemoteDocPresence.prototype._setPendingPresence = function() { + if (this._pendingSetPending) return; + this._pendingSetPending = true; + this._doc.whenNothingPending(function() { + this._pendingSetPending = false; + if (!this._pending) return; + if (this._pending.seq < this.seq) return this._pending = null; + + if (this._pending.v > this._doc.version) { + return this._doc.fetch(); + } + + if (!this._catchUpStalePresence()) return; + + this.value = this._pending.p; + this.seq = this._pending.seq; + this._pending = null; + this.presence._updateRemotePresence(this); + }.bind(this)); +}; + +RemoteDocPresence.prototype._handleOp = function(op, source, connectionId) { + var isOwnOp = connectionId === this.src; + this._transformAgainstOp(op, isOwnOp); + this._cacheOp(op, isOwnOp); + this._setPendingPresence(); +}; + +RemotePresence.prototype._handleCreateDel = function() { + this._cacheOp(null); + this._setPendingPresence(); +}; + +RemotePresence.prototype._handleLoad = function() { + this.value = null; + this._callbacksBySeq = {}; + this._pending = null; + this._opCache = null; + this.presence._updateRemotePresence(this); +}; + +RemoteDocPresence.prototype._transformAgainstOp = function(op, isOwnOp) { + if (!this.value) return; + + try { + this.value = this._doc.type.transformPresence(this.value, op, isOwnOp); + } catch (error) { + return this.presence.emit('error', error); + } + this.presence._updateRemotePresence(this); +}; + +RemoteDocPresence.prototype._catchUpStalePresence = function() { + if (this._pending.v >= this._doc.version) return true; + + if (!this._opCache) { + this._startCachingOps(); + this._doc.fetch(); + // We're already subscribed, but we send another subscribe message + // to force presence updates from other clients + this.presence.subscribe(); + return false; + } + + while (this._opCache[this._pending.v]) { + var item = this._opCache[this._pending.v]; + var op = item.op; + var isOwnOp = item.isOwnOp; + // We use a null op to signify a create or a delete operation. In both + // cases we just want to reset the presence (which doesn't make sense + // in a new document), so just set the presence to null. + if (op === null) { + this._pending.p = null; + this._pending.v++; + } else { + ot.transformPresence(this._pending, op, isOwnOp); + } + } + + var hasCaughtUp = this._pending.v >= this._doc.version; + if (hasCaughtUp) { + this._stopCachingOps(); + } + + return hasCaughtUp; +}; + +RemoteDocPresence.prototype._startCachingOps = function() { + this._opCache = []; +}; + +RemoteDocPresence.prototype._stopCachingOps = function() { + this._opCache = null; +}; + +RemoteDocPresence.prototype._cacheOp = function(op, isOwnOp) { + if (this._opCache) { + op = op ? {op: op} : null; + // Subtract 1 from the current doc version, because an op with v3 + // should be read as the op that takes a doc from v3 -> v4 + this._opCache[this._doc.version - 1] = {op: op, isOwnOp: isOwnOp}; + } +}; diff --git a/lib/client/presence/remote-presence.js b/lib/client/presence/remote-presence.js new file mode 100644 index 000000000..280392974 --- /dev/null +++ b/lib/client/presence/remote-presence.js @@ -0,0 +1,22 @@ +module.exports = RemotePresence; +function RemotePresence(presence, presenceId) { + this.presence = presence; + this.presenceId = presenceId; + this.connection = this.presence.connection; + + this.value = null; + this.seq = 0; +} + +RemotePresence.prototype.receiveUpdate = function(message) { + if (message.seq < this.seq) return; + this.value = message.p; + this.seq = message.seq; + this.presence._updateRemotePresence(this); +}; + +RemotePresence.prototype.destroy = function(callback) { + delete this.presence._remotePresenceInstances[this.presenceId]; + delete this.presence.remotePresences[this.presenceId]; + if (callback) process.nextTick(callback); +}; diff --git a/lib/error.js b/lib/error.js index 4cd4f770f..437a1d446 100644 --- a/lib/error.js +++ b/lib/error.js @@ -40,6 +40,7 @@ ShareDBError.CODES = { ERR_OP_VERSION_NEWER_THAN_CURRENT_SNAPSHOT: 'ERR_OP_VERSION_NEWER_THAN_CURRENT_SNAPSHOT', ERR_OT_OP_BADLY_FORMED: 'ERR_OT_OP_BADLY_FORMED', ERR_OT_OP_NOT_PROVIDED: 'ERR_OT_OP_NOT_PROVIDED', + ERR_PRESENCE_TRANSFORM_FAILED: 'ERR_PRESENCE_TRANSFORM_FAILED', ERR_PROTOCOL_VERSION_NOT_SUPPORTED: 'ERR_PROTOCOL_VERSION_NOT_SUPPORTED', ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED: 'ERR_QUERY_EMITTER_LISTENER_NOT_ASSIGNED', /** @@ -63,6 +64,7 @@ ShareDBError.CODES = { ERR_SNAPSHOT_READS_REJECTED: 'ERR_SNAPSHOT_READS_REJECTED', ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND: 'ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND', ERR_TYPE_CANNOT_BE_PROJECTED: 'ERR_TYPE_CANNOT_BE_PROJECTED', + ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE: 'ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE', ERR_UNKNOWN_ERROR: 'ERR_UNKNOWN_ERROR' }; diff --git a/lib/ot.js b/lib/ot.js index 1de04ef64..e6e96f755 100644 --- a/lib/ot.js +++ b/lib/ot.js @@ -5,6 +5,7 @@ var types = require('./types').map; var ShareDBError = require('./error'); +var util = require('./util'); var ERROR_CODE = ShareDBError.CODES; @@ -185,3 +186,33 @@ exports.applyOps = function(snapshot, ops) { } } }; + +exports.transformPresence = function(presence, op, isOwnOp) { + var opError = this.checkOp(op); + if (opError) return opError; + + var type = presence.t; + if (typeof type === 'string') { + type = types[type]; + } + if (!type) return {code: ERROR_CODE.ERR_DOC_TYPE_NOT_RECOGNIZED, message: 'Unknown type'}; + if (!util.supportsPresence(type)) { + return {code: ERROR_CODE.ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE, message: 'Type does not support presence'}; + } + + if (op.create || op.del) { + presence.p = null; + presence.v++; + return; + } + + try { + presence.p = presence.p === null ? + null : + type.transformPresence(presence.p, op.op, isOwnOp); + } catch (error) { + return {code: ERROR_CODE.ERR_PRESENCE_TRANSFORM_FAILED, message: error.message || error}; + } + + presence.v++; +}; diff --git a/lib/util.js b/lib/util.js index 601e98cb8..a036aab02 100644 --- a/lib/util.js +++ b/lib/util.js @@ -24,3 +24,45 @@ exports.isValidTimestamp = function(timestamp) { }; exports.MAX_SAFE_INTEGER = 9007199254740991; + +exports.dig = function() { + var obj = arguments[0]; + for (var i = 1; i < arguments.length; i++) { + var key = arguments[i]; + obj = obj[key] || (i === arguments.length - 1 ? undefined : {}); + } + return obj; +}; + +exports.digOrCreate = function() { + var obj = arguments[0]; + var createCallback = arguments[arguments.length - 1]; + for (var i = 1; i < arguments.length - 1; i++) { + var key = arguments[i]; + obj = obj[key] || + (obj[key] = i === arguments.length - 2 ? createCallback() : {}); + } + return obj; +}; + +exports.digAndRemove = function() { + var obj = arguments[0]; + var objects = [obj]; + for (var i = 1; i < arguments.length - 1; i++) { + var key = arguments[i]; + if (!obj.hasOwnProperty(key)) break; + obj = obj[key]; + objects.push(obj); + }; + + for (var i = objects.length - 1; i >= 0; i--) { + var parent = objects[i]; + var key = arguments[i + 1]; + var child = parent[key]; + if (i === objects.length - 1 || !exports.hasKeys(child)) delete parent[key]; + } +}; + +exports.supportsPresence = function(type) { + return type && typeof type.transformPresence === 'function'; +}; diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js new file mode 100644 index 000000000..4afc9a8e5 --- /dev/null +++ b/test/client/presence/doc-presence.js @@ -0,0 +1,948 @@ +var Backend = require('../../../lib/backend'); +var expect = require('chai').expect; +var async = require('async'); +var types = require('../../../lib/types'); +var presenceTestType = require('./presence-test-type'); +var errorHandler = require('../../util').errorHandler; +var PresencePauser = require('./presence-pauser'); +types.register(presenceTestType.type); + +describe('DocPresence', function() { + var backend; + var connection1; + var connection2; + var doc1; + var doc2; + var presence1; + var presence2; + var presencePauser; + + beforeEach(function(done) { + backend = new Backend(); + connection1 = backend.connect(); + connection2 = backend.connect(); + + presencePauser = new PresencePauser(); + + backend.use(backend.MIDDLEWARE_ACTIONS.sendPresence, function(request, callback) { + presencePauser.sendPresence(request, callback); + }); + + doc1 = connection1.get('books', 'northern-lights'); + doc2 = connection2.get('books', 'northern-lights'); + + async.series([ + doc1.create.bind(doc1, 'North Lights', presenceTestType.type.name), + doc1.subscribe.bind(doc1), + doc2.subscribe.bind(doc2), + function(next) { + presence1 = connection1.getDocPresence('books', 'northern-lights'); + presence2 = connection2.getDocPresence('books', 'northern-lights'); + next(); + } + ], done); + }); + + afterEach(function(done) { + delete presenceTestType.type.invert; + connection1.close(); + connection2.close(); + backend.close(done); + }); + + it('subscribes to presence from another client', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 1}); + next(); + }); + } + ], done); + }); + + it('transforms existing remote presence when a new local op is applied', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 7}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 7}); + next(); + }); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(doc2.data).to.eql('Northern Lights'); + expect(presence).to.eql({index: 10}); + expect(presence2.remotePresences).to.eql({ + 'presence-1': {index: 10} + }); + next(); + }); + + doc2.submitOp({index: 5, value: 'ern'}); + } + ], done); + }); + + it('transforms existing local presence when a new local op is applied', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + localPresence1.submit.bind(localPresence1, {index: 7}), + doc1.submitOp.bind(doc1, {index: 5, value: 'ern'}), + function(next) { + expect(localPresence1.value).to.eql({index: 10}); + next(); + } + ], done); + }); + + it('progresses another client\'s presence when they send an op at their index', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({index: 5}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + doc2.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 8}); + next(); + }); + } + ], done); + }); + + it('does not progress another client\'s index when inserting a local op at their index', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({index: 5}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 5}); + next(); + }); + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + } + ], done); + }); + + it('waits for pending ops before submitting presence', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + doc1.submitOp({index: 12, value: ': His Dark Materials'}, errorHandler(done)); + localPresence1.submit({index: 20}, errorHandler(done)); + + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 20}); + expect(doc2.version).to.eql(2); + next(); + }); + } + ], done); + }); + + it('queues two updates immediately after one another', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 4}, errorHandler(done)); + localPresence1.submit({index: 5}, errorHandler(done)); + + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 4}); + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 5}); + next(); + }); + }); + } + ], done); + }); + + it('transforms pending presence by another op submitted before a flush', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + doc1.submitOp({index: 12, value: ': His Dark Materials'}, errorHandler(done)); + localPresence1.submit({index: 20}, errorHandler(done)); + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(3); + expect(doc2.data).to.eql('Northern Lights: His Dark Materials'); + expect(presence).to.eql({index: 23}); + next(); + }); + } + ], done); + }); + + it('updates the document when the presence version is ahead', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + localPresence2.submit({index: 12}, errorHandler(done)); + + presence1.once('receive', function(id, presence) { + expect(doc1.version).to.eql(2); + expect(presence).to.eql({index: 12}); + next(); + }); + } + ], done); + }); + + it('transforms old presence when its version is behind the latest doc', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + localPresence1.submit({index: 12}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(2); + expect(presence).to.eql({index: 15}); + next(); + }); + } + ], done); + }); + + it('returns errors when failing to transform old presence to the latest doc', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + expect(doc1.version).to.eql(1); + expect(doc2.version).to.eql(2); + + localPresence1.submit({badProp: 'foo'}, function(error) { + expect(error.code).to.equal('ERR_PRESENCE_TRANSFORM_FAILED'); + next(); + }); + } + ], done); + }); + + it('transforms old presence when it arrives later than a new op', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + presencePauser.pause(); + presencePauser.onPause = function() { + next(); + }; + localPresence1.submit({index: 12}, errorHandler(done)); + }, + function(next) { + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + + doc2.once('op', function() { + presencePauser.resume(); + }); + + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(2); + expect(presence).to.eql({index: 15}); + next(); + }); + } + ], done); + }); + + // This test case attempts to force us into a tight race condition corner case: + // 1. doc1 sends presence, as well as submits an op + // 2. doc2 receives the op first, followed by the presence, which is now out-of-date + // 3. doc2 re-requests doc1's presence again + // 4. doc1 sends *another* op, which *again* beats the presence update (this could + // in theory happen many times in a row) + it('transforms old presence when new ops keep beating the presence responses', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + // Pause presence just before sending it back to the clients. It's already been + // transformed by the server to what the server knows as the latest version + presencePauser.pause(); + presencePauser.onPause = function() { + next(); + }; + + localPresence1.submit({index: 12}, errorHandler(done)); + }, + function(next) { + // Now we submit another op, while the presence is still paused. We wait until + // doc2 has received this op, so we know that when we finally receive our + // presence, it will be stale + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + doc2.once('op', function() { + next(); + }); + }, + function(next) { + // At this point in the test, both docs are up-to-date on v2, but doc2 still + // hasn't received doc1's v1 presence + expect(doc1.version).to.eql(2); + expect(doc2.version).to.eql(2); + + // Resume presence broadcasts so that doc2 receives v1's stale presence + presencePauser.resume(); + // However, now immediately pause again. Set a conditional pause, which + // will allow doc2 to request presence from doc1, but will pause doc1's + // presence response, making it stale again + presencePauser.pause(function(request) { + return request.presence.id === 'presence-1'; + }); + presencePauser.onPause = function() { + presencePauser.onPause = null; + + // When we capture doc1's response, doc1 also submits some ops, which + // will make its response stale again. + doc1.submitOp({index: 0, value: 'The'}, function(error) { + if (error) return done(error); + doc1.submitOp({index: 3, value: ' '}, errorHandler(done)); + doc2.on('op', function() { + // This will get fired for v3 and then v4, so check for the later one + if (doc1.version === 4 && doc2.version === 4) { + // Only once doc2 has received the ops, should we resume our + // broadcasts, ensuring that the update is stale again. + presencePauser.resume(); + // Despite the second reply being stale, we expect to have transformed it + // up to the current version. + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(4); + expect(presence).to.eql({index: 19}); + next(); + }); + } + }); + }); + }; + } + ], done); + }); + + // This test is for a similar case to the above test case, but ensures that our + // op cache correctly handles deletion and creation ops + it('transforms old presence when a doc is deleted and then created', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 3}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + localPresence1.submit({index: 12}, errorHandler(done)); + presencePauser.pause(); + presencePauser.onPause = function() { + presencePauser.onPause = null; + next(); + }; + }, + function(next) { + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + doc2.once('op', function() { + next(); + }); + }, + function(next) { + expect(doc1.version).to.eql(2); + expect(doc2.version).to.eql(2); + + presencePauser.resume(); + presencePauser.pause(function(request) { + return request.presence.id === 'presence-1'; + }); + presencePauser.onPause = function() { + presencePauser.onPause = null; + + async.series([ + doc1.del.bind(doc1), + doc1.create.bind(doc1, 'Subtle Knife', presenceTestType.type.name), + doc1.submitOp.bind(doc1, {index: 0, value: 'The '}) + ], errorHandler(done)); + }; + + doc2.on('op', function() { + if (doc2.version !== 5) return; + presencePauser.resume(); + presence2.once('receive', function(id, presence) { + expect(doc2.version).to.eql(5); + expect(presence).to.be.null; + next(); + }); + }); + } + ], done); + }); + + it('transforms local presence when a doc is deleted and created locally', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + localPresence1.submit.bind(localPresence1, {index: 3}), + doc1.del.bind(doc1), + doc1.create.bind(doc1, 'Subtle Knife', presenceTestType.type.uri), + function(next) { + expect(localPresence1.value).to.be.null; + next(); + } + ], done); + }); + + it('transforms pending presence by a re-creation submitted before a flush', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 2}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + doc1.submitOp({index: 12, value: ': His Dark Materials'}, errorHandler(done)); + localPresence1.submit({index: 20}, errorHandler(done)); + doc1.del(errorHandler(done)); + doc1.create('Subtle Knife', presenceTestType.type.uri, errorHandler(done)); + + presence2.on('receive', function(id, presence) { + if (doc2.version !== 4) return; + expect(doc2.data).to.eql('Subtle Knife'); + expect(presence).to.be.null; + next(); + }); + } + ], done); + }); + + it('ignores presence that arrives out of order', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + var hasPaused = false; + // Catch the first presence update, but then allow later ones + presencePauser.pause(function() { + if (hasPaused) return false; + hasPaused = true; + return true; + }); + + localPresence1.submit({index: 2}, next); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 3}); + + presence2.once('receive', function() { + done(new Error('should not get another presence event')); + }); + + presencePauser.resume(); + next(); + }); + + localPresence1.submit({index: 3}, errorHandler(done)); + } + ], done); + }); + + it('ignores pending presence that arrives out of order', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + doc1.unsubscribe.bind(doc1), + doc2.submitOp.bind(doc2, {index: 5, value: 'ern'}), + function(next) { + var pauseCount = 0; + presencePauser.pause(); + presencePauser.onPause = function() { + pauseCount++; + if (pauseCount === 2) { + expect(this._pendingBroadcasts[0][0].presence.p).to.eql({index: 2}); + expect(this._pendingBroadcasts[1][0].presence.p).to.eql({index: 4}); + expect(this._pendingBroadcasts[0][0].presence.seq) + .to.be.lessThan(this._pendingBroadcasts[1][0].presence.seq); + + // Fire the broadcasts in the reverse order + this._pendingBroadcasts[1][1](); + this._pendingBroadcasts[0][1](); + } + }; + + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 4}); + next(); + }); + + localPresence2.submit({index: 2}, errorHandler(done)); + localPresence2.submit({index: 4}, errorHandler(done)); + } + ], done); + }); + + it('rejects a presence message with a numeric collection', function(done) { + var localPresence1 = presence1.create('presence-1'); + localPresence1.on('error', function(error) { + expect(error.code).to.eql('ERR_MESSAGE_BADLY_FORMED'); + done(); + }); + + var message = localPresence1._message(); + message.c = 1; + message.v = 1; + message.t = presenceTestType.type.uri; + connection1.send(message); + }); + + it('rejects a presence message with an invalid version', function(done) { + var localPresence1 = presence1.create('presence-1'); + localPresence1.on('error', function(error) { + expect(error.code).to.eql('ERR_MESSAGE_BADLY_FORMED'); + done(); + }); + + var message = localPresence1._message(); + message.v = -1; + message.t = presenceTestType.type.uri; + connection1.send(message); + }); + + it('rejects a presence message without an ID', function(done) { + var localPresence1 = presence1.create('presence-1'); + // Have to catch the error on the Presence instance, because obviously + // we won't find the LocalPresence without the ID + presence1.on('error', function(error) { + expect(error.code).to.eql('ERR_MESSAGE_BADLY_FORMED'); + done(); + }); + + var message = localPresence1._message(); + message.id = null; + message.v = 1; + message.t = presenceTestType.type.uri; + connection1.send(message); + }); + + it('only sends presence responses for the associated doc', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + var otherDoc1 = connection1.get('books', 'subtle-knife'); + var otherDoc2 = connection2.get('books', 'subtle-knife'); + var otherPresence1 = connection1.getDocPresence('books', 'subtle-knife'); + var otherPresence2 = connection2.getDocPresence('books', 'subtle-knife'); + var localOtherPresence1 = otherPresence1.create('other-presence-1'); + + async.series([ + otherDoc1.create.bind(otherDoc1, 'Subtle Knife', presenceTestType.type.uri), + otherDoc2.subscribe.bind(otherDoc2), + otherPresence2.subscribe.bind(otherPresence2), + function(next) { + localOtherPresence1.submit({index: 0}, errorHandler(done)); + otherPresence2.once('receive', function() { + next(); + }); + }, + localPresence1.submit.bind(localPresence1, {index: 3}), + function(next) { + localPresence2.submit({index: 5}, next); + otherPresence1.on('receive', function() { + done(new Error('Other document should not have had presence sent')); + }); + otherPresence2.on('receive', function() { + done(new Error('Other document should not have had presence sent')); + }); + } + ], done); + }); + + it('sends the presence data once the connection can send', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + connection2._setState('disconnected'); + localPresence2.submit({index: 1}, errorHandler(done)); + + doc2.whenNothingPending(function() { + // The connection tests whether we can send just before sending on + // nothing pending, so let's also wait to reset the connection. + connection2._setState('connecting'); + connection2._setState('connected'); + }); + + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 1}); + next(); + }); + } + ], done); + }); + + it('re-requests presence when reconnecting', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + connection1._setState('disconnected'); + next(); + }, + function(next) { + localPresence2.submit({index: 0}, errorHandler(done)); + // We've not _actually_ disconnected the connection, so this + // event will still fire. + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + connection1._setState('connecting'); + connection1._setState('connected'); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 0}); + next(); + }); + } + ], done); + }); + + it('un-transforms presence after a soft rollback', function(done) { + // Mock invert so that we can trigger a soft rollback instead of a hard rollback + presenceTestType.type.invert = function() { + return {index: 5, del: 3}; + }; + + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, {index: 7}), + function(next) { + localPresence2.submit({index: 8}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + backend.use(backend.MIDDLEWARE_ACTIONS.apply, function(request, callback) { + callback({code: 'ERR_OP_SUBMIT_REJECTED'}); + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.eql({index: 10}); + expect(presence1.remotePresences).to.eql({ + 'presence-2': {index: 11} + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.eql({index: 7}); + expect(presence1.remotePresences).to.eql({ + 'presence-2': {index: 8} + }); + next(); + }); + }); + + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + } + ], done); + }); + + it('performs a hard reset on presence when the doc is hard rolled back', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, {index: 7}), + function(next) { + localPresence2.submit({index: 8}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + function(next) { + backend.use(backend.MIDDLEWARE_ACTIONS.apply, function(request, callback) { + callback({code: 'ERR_OP_SUBMIT_REJECTED'}); + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.eql({index: 10}); + expect(presence1.remotePresences).to.eql({ + 'presence-2': {index: 11} + }); + + presence1.once('receive', function() { + expect(localPresence1.value).to.be.null; + expect(presence1.remotePresences).to.eql({}); + next(); + }); + }); + + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + } + ], done); + }); + + it('can receive presence before performing the first fetch on a document', function(done) { + var connection3 = backend.connect(); + var doc3 = connection3.get('books', 'northern-lights'); + var presence3 = connection3.getDocPresence('books', 'northern-lights'); + var localPresence3 = presence3.create('presence-3'); + + async.series([ + presence1.subscribe.bind(presence1), + doc3.fetch.bind(doc3), + function(next) { + localPresence3.submit({index: 1}, errorHandler(done)); + presence1.once('receive', function(id, presence) { + expect(id).to.eql('presence-3'); + expect(presence).to.eql({index: 1}); + next(); + }); + } + ], done); + }); + + it('errors when submitting presence on a document that has not been created', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + doc1.del.bind(doc1), + function(next) { + localPresence1.submit({index: 2}, function(error) { + expect(error.code).to.eql('ERR_DOC_DOES_NOT_EXIST'); + next(); + }); + } + ], done); + }); + + it('errors when trying to submit presence on a type that does not support it', function(done) { + var jsonDoc = connection1.get('books', 'snuff'); + var jsonPresence = connection1.getDocPresence('books', 'snuff'); + var localJsonPresence = jsonPresence.create('json-presence'); + + async.series([ + jsonDoc.create.bind(jsonDoc, {title: 'Snuff'}, 'json0'), + function(next) { + localJsonPresence.submit({index: 1}, function(error) { + expect(error.code).to.eql('ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE'); + next(); + }); + } + ], done); + }); + + it('returns errors sent from the middleware', function(done) { + backend.use(backend.MIDDLEWARE_ACTIONS.sendPresence, function(request, callback) { + callback('some error'); + }); + + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({index: 0}, errorHandler(done)); + presence1.once('error', function(error) { + expect(error.message).to.equal('some error'); + next(); + }); + } + ], done); + }); + + it('removes doc event listeners when destroying presence', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence2.subscribe.bind(presence2), + localPresence2.submit.bind(localPresence2, {index: 2}), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + presence2.destroy.bind(presence2), + function(next) { + expect(doc2._eventsCount).to.equal(0); + next(); + } + ], done); + }); + + it('destroys remote presence when it is updated with null', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)), + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + localPresence1.submit(null, errorHandler(done)), + presence2.once('receive', function(id, presence) { + expect(presence).to.be.null; + expect(doc2._eventsCount).to.equal(0); + next(); + }); + } + ], done); + }); + + it('waits for local pending ops before accepting remote presence', function(done) { + var localPresence2 = presence2.create('presence-2'); + + var triggerApply; + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + backend.use(backend.MIDDLEWARE_ACTIONS.apply, function(request, callback) { + triggerApply = callback; + expect(doc1.inflightOp).to.be.ok; + expect(doc1.pendingOps).to.be.empty; + next(); + }); + + doc1.submitOp({index: 5, value: 'ern'}, errorHandler(done)); + }, + localPresence2.submit.bind(localPresence2, {index: 10}), + function(next) { + triggerApply(); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({index: 13}); + next(); + }); + } + ], done); + }); + + it('emits an error when trying to transform bad local presence against an op', function(done) { + var localPresence1 = presence1.create('presence-1'); + + localPresence1.submit({badProp: 'foo'}, function(error) { + expect(error).to.be.ok; + }); + + localPresence1.once('error', function() { + done(); + }); + + doc1.submitOp({index: 5, value: 'ern'}); + }); + + it('emits an error when trying to transform bad remote presence against an op', function(done) { + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + localPresence2.submit({badProp: 'foo'}, errorHandler(done)); + presence1.once('receive', function(id, presence) { + expect(presence).to.eql({badProp: 'foo'}); + next(); + }); + }, + function(next) { + localPresence2.once('error', function() { + // Ignore the local error + }); + presence1.once('error', function() { + next(); + }); + doc1.submitOp({index: 5, value: 'ern'}); + } + ], done); + }); + + it('sends null presence when the doc is destroyed', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 2}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + doc1.destroy(errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.be.null; + next(); + }); + } + ], done); + }); +}); diff --git a/test/client/presence/presence-pauser.js b/test/client/presence/presence-pauser.js new file mode 100644 index 000000000..09d8f5a52 --- /dev/null +++ b/test/client/presence/presence-pauser.js @@ -0,0 +1,43 @@ +// Helper middleware for precise control over when clients receive +// presence updates +module.exports = PresencePauser; +function PresencePauser() { + // Handler that can be set to be called when a message + // is paused + this.onPause = null; + this._shouldPause = false; + this._pendingBroadcasts = []; + + // Main middleware method + this.sendPresence = function(request, callback) { + if (!this._isPaused(request)) return callback(); + this._pendingBroadcasts.push([request, callback]); + if (typeof this.onPause === 'function') { + this.onPause(request); + } + }; + + // If called without an argument, will pause all broadcasts. + // If called with a function, the returned result will determine + // whether the request is paused + this.pause = function(predicate) { + this._shouldPause = typeof predicate === 'function' ? predicate : true; + }; + + // Send all paused broadcasts, and unpause. Also unsets the onPause + // handler + this.resume = function() { + this._shouldPause = false; + this._pendingBroadcasts.forEach(function(broadcast) { + var callback = broadcast[1]; + callback(); + }); + this._pendingBroadcasts = []; + this.onPause = null; + }; + + this._isPaused = function(request) { + return this._shouldPause === true || + typeof this._shouldPause === 'function' && this._shouldPause(request); + }; +} diff --git a/test/client/presence/presence-test-type.js b/test/client/presence/presence-test-type.js new file mode 100644 index 000000000..d1c1d42e3 --- /dev/null +++ b/test/client/presence/presence-test-type.js @@ -0,0 +1,37 @@ +exports.type = { + name: 'presence-test-type', + uri: 'http://sharejs.org/types/presence-test-type', + create: create, + apply: apply, + transformPresence: transformPresence +}; + +function create(data) { + return typeof data === 'string' ? data : ''; +} + +function apply(snapshot, op) { + if (op.value) { + return snapshot.substring(0, op.index) + op.value + snapshot.substring(op.index); + } else if (op.del) { + return snapshot.substring(0, op.index) + snapshot.substring(op.index + op.del); + } + + throw new Error('Invalid op'); +} + +function transformPresence(presence, op, isOwnOperation) { + if (!presence || presence.index < op.index || (presence.index === op.index && !isOwnOperation)) { + return presence; + } + + if (typeof presence.index !== 'number') throw new Error('Presence index is not a number'); + + if (op.value) { + return {index: presence.index + op.value.length}; + } else if (op.del) { + return {index: presence.index - op.del}; + } + + throw new Error('Invalid op'); +} diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js new file mode 100644 index 000000000..5d21be5a0 --- /dev/null +++ b/test/client/presence/presence.js @@ -0,0 +1,445 @@ +var Backend = require('../../../lib/backend'); +var PresencePauser = require('./presence-pauser'); +var expect = require('chai').expect; +var async = require('async'); +var errorHandler = require('../../util').errorHandler; +var sinon = require('sinon'); + +describe('Presence', function() { + var backend; + var connection1; + var connection2; + var presence1; + var presence2; + var presencePauser; + + beforeEach(function(done) { + backend = new Backend(); + var connectedCount = 0; + connection1 = backend.connect(); + connection2 = backend.connect(); + + var checkConnections = function() { + connectedCount++; + if (connectedCount === 2) done(); + }; + + connection1.on('connected', checkConnections); + connection2.on('connected', checkConnections); + + presencePauser = new PresencePauser(); + + backend.use(backend.MIDDLEWARE_ACTIONS.sendPresence, function(request, callback) { + presencePauser.sendPresence(request, callback); + }); + + presence1 = connection1.getPresence('test-channel'); + presence2 = connection2.getPresence('test-channel'); + }); + + afterEach(function(done) { + sinon.restore(); + connection1.close(); + connection2.close(); + backend.close(done); + }); + + it('can subscribe to updates from other clients', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 5}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 5}); + next(); + }); + } + ], done); + }); + + it('can unsubscribe from updates to other clients', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + presence2.unsubscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 5}, errorHandler(done)); + presence2.once('receive', function() { + done(new Error('Should not have received presence update')); + }); + next(); + } + ], done); + }); + + it('requests existing presence from other subscribed clients when subscribing', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, {index: 2}), + function(next) { + presence2.subscribe(errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 2}); + next(); + }); + } + ], done); + }); + + it('removes remote presence when it is set to null', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 3}, errorHandler(done)); + presence2.once('receive', function() { + expect(presence2.remotePresences).to.eql({ + 'presence-1': {index: 3} + }); + next(); + }); + }, + function(next) { + localPresence1.submit(null, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(presence).to.be.null; + expect(presence2.remotePresences).to.eql({}); + next(); + }); + } + ], done); + }); + + it('does not broadcast null local presence when requested', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence1.subscribe.bind(presence1), + localPresence1.submit.bind(localPresence1, null), + function(next) { + presence2.subscribe(errorHandler(done)); + presence2.once('receive', function() { + done(new Error('should not have received presence')); + }); + next(); + } + ], done); + }); + + it('destroys its connection reference, unsubscribes and nulls its local presences', function(done) { + var localPresence1 = presence1.create('presence-1'); + var localPresence2 = presence2.create('presence-2'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + localPresence2.submit({index: 2}, errorHandler(done)); + presence1.once('receive', function() { + next(); + }); + }, + presence1.destroy.bind(presence1), + function(next) { + expect(presence1.localPresences).to.eql({}); + expect(presence2.remotePresences).to.eql({}); + expect(connection1._presences).to.eql({}); + next(); + } + ], done); + }); + + it('supports multiple local presences on a single connection', function(done) { + var localPresence1a = presence1.create('presence-1a'); + var localPresence1b = presence1.create('presence-1b'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1a.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1a'); + expect(presence).to.eql({index: 1}); + next(); + }); + }, + function(next) { + localPresence1b.submit({index: 2}, errorHandler(done)); + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1b'); + expect(presence).to.eql({index: 2}); + expect(Object.keys(presence1.localPresences)).to.eql(['presence-1a', 'presence-1b']); + expect(presence2.remotePresences).to.eql({ + 'presence-1a': {index: 1}, + 'presence-1b': {index: 2} + }); + next(); + }); + } + ], done); + }); + + it('subscribes once the connection can send', function(done) { + var localPresence1 = presence1.create('presence-1'); + + connection2._setState('disconnected'); + expect(connection2.canSend).to.be.false; + async.series([ + function(next) { + presence2.subscribe(next); + connection2._setState('connecting'); + connection2._setState('connected'); + }, + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + } + ], done); + }); + + it('sends local presence once the connection can send', function(done) { + var localPresence1 = presence1.create('presence-1'); + + connection1._setState('disconnected'); + expect(connection1.canSend).to.be.false; + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 1}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + connection1._setState('connecting'); + connection1._setState('connected'); + } + ], done); + }); + + it('re-requests remote presence when reconnecting', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + connection2._setState('disconnected'); + expect(connection2.canSend).to.be.false; + next(); + }, + localPresence1.submit.bind(localPresence1, {index: 1}), + function(next) { + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.eql({index: 1}); + next(); + }); + connection2._setState('connecting'); + connection2._setState('connected'); + } + ], done); + }); + + it('calls multiple callbacks if subscribing multiple times in series', function(done) { + var callbacksCalled = 0; + + var callback = function(error) { + if (error) return done(error); + callbacksCalled++; + if (callbacksCalled === 3) done(); + }; + + presence1.subscribe(callback); + presence1.subscribe(callback); + presence1.subscribe(callback); + }); + + it('finishes unsubscribed if calling immediately after subscribe', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + function(next) { + var callbackCount = 0; + var callback = function(error) { + if (error) return done(error); + callbackCount++; + if (callbackCount === 2) next(); + }; + + presence2.subscribe(callback); + presence2.unsubscribe(callback); + }, + function(next) { + expect(presence2.wantSubscribe).to.be.false; + expect(presence2.subscribed).to.be.false; + localPresence1.submit({index: 1}, next); + presence2.on('receive', function() { + done(new Error('Should not have received presence')); + }); + } + ], done); + }); + + it('throws an error when trying to create a presence with a non-string ID', function() { + expect(function() { + presence1.create(123); + }).to.throw(); + }); + + it('throws an error when trying to create a presence with an empty string ID', function() { + expect(function() { + presence1.create(''); + }).to.throw(); + }); + + it('returns the error if a local presence cannot be destroyed because of a bad submit', function(done) { + var localPresence1 = presence1.create('presence-1'); + sinon.stub(localPresence1, 'submit').callsFake(function(value, callback) { + callback(new Error('bad')); + }); + + localPresence1.destroy(function(error) { + expect(error).to.be.ok; + done(); + }); + }); + + it('throws an error if a presence is created with a non-string channel', function() { + expect(function() { + connection1.getPresence(123); + }).to.throw(); + }); + + it('throws an error if a presence is created with an empty string channel', function() { + expect(function() { + connection1.getPresence(''); + }).to.throw(); + }); + + it('returns unsubscribe errors when trying to destroy presence', function(done) { + sinon.stub(presence1, 'unsubscribe').callsFake(function(callback) { + callback(new Error('bad')); + }); + + presence1.destroy(function(error) { + expect(error).to.be.ok; + done(); + }); + }); + + it('emits unsubscribe errors when trying to destroy presence', function(done) { + sinon.stub(presence1, 'unsubscribe').callsFake(function(callback) { + callback(new Error('bad')); + }); + + presence1.once('error', function() { + done(); + }); + presence1.destroy(); + }); + + it('emits an error when trying to broadcast all presence with an error', function(done) { + presence1.once('error', function() { + done(); + }); + + presence1._broadcastAllLocalPresence(new Error('bad')); + }); + + it('emits a subscribe error on reconnection', function(done) { + async.series([ + presence1.subscribe.bind(presence1), + function(next) { + sinon.stub(presence1, 'subscribe').callsFake(function(callback) { + callback(new Error('bad')); + }); + + presence1.once('error', function() { + next(); + }); + + connection1._setState('disconnected'); + connection1._setState('connecting'); + connection1._setState('connected'); + } + ], done); + }); + + it('ignores presence that arrives out of order', function(done) { + var localPresence1 = presence1.create('presence-1'); + + async.series([ + presence2.subscribe.bind(presence2), + function(next) { + var hasPaused = false; + // Catch the first presence update, but then allow later ones + presencePauser.pause(function() { + if (hasPaused) return false; + hasPaused = true; + return true; + }); + + localPresence1.submit({index: 2}, next); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(presence).to.eql({index: 3}); + + presence2.once('receive', function() { + done(new Error('should not get another presence event')); + }); + + presencePauser.resume(); + next(); + }); + + localPresence1.submit({index: 3}, errorHandler(done)); + } + ], done); + }); + + it('adds itself back onto the connection after a destroy and a resubscribe', function(done) { + async.series([ + presence1.destroy.bind(presence1), + presence1.subscribe.bind(presence1), + function(next) { + expect(connection1._presences[presence1.channel]).to.equal(presence1); + next(); + } + ], done); + }); + + it('broadcasts a null presence when the connection is disconnected', function(done) { + var localPresence1 = presence1.create('presence-1'); + async.series([ + presence1.subscribe.bind(presence1), + presence2.subscribe.bind(presence2), + function(next) { + localPresence1.submit({index: 3}, errorHandler(done)); + presence2.once('receive', function() { + next(); + }); + }, + function(next) { + presence2.once('receive', function(id, presence) { + expect(id).to.equal('presence-1'); + expect(presence).to.be.null; + next(); + }); + connection1.close(); + } + ], done); + }); +}); diff --git a/test/ot.js b/test/ot.js index 379d017da..ed952d8ce 100644 --- a/test/ot.js +++ b/test/ot.js @@ -1,6 +1,10 @@ var expect = require('chai').expect; var ot = require('../lib/ot'); -var type = require('../lib/types').defaultType; +var types = require('../lib/types'); +var type = types.defaultType; +var presenceType = require('./client/presence/presence-test-type').type; + +types.register(presenceType); describe('ot', function() { describe('checkOp', function() { @@ -237,4 +241,132 @@ describe('ot', function() { expect(op).eql({}); }); }); + + describe('transformPresence', function() { + it('transforms a presence by an op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = { + op: {index: 2, value: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: {index: 8}, + t: presenceType.uri, + v: 2 + }); + }); + + it('nulls presence for a create op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = { + create: {type: presenceType.uri, data: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: null, + t: presenceType.uri, + v: 2 + }); + }); + + it('nulls presence for a delete op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = {del: true}; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: null, + t: presenceType.uri, + v: 2 + }); + }); + + it('returns an error for an invalid op', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = {}; + var error = ot.transformPresence(presence, op); + + expect(error.code).to.eql('ERR_OT_OP_BADLY_FORMED'); + }); + + it('considers isOwnOp', function() { + var presence = { + p: {index: 5}, + t: presenceType.uri, + v: 1 + }; + + var op = { + op: {index: 5, value: 'foo'} + }; + var error = ot.transformPresence(presence, op, true); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: {index: 8}, + t: presenceType.uri, + v: 2 + }); + }); + + it('checks that the type supports presence', function() { + var presence = { + p: {index: 5}, + t: type.uri, + v: 1 + }; + + var op = { + op: {index: 5, value: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error.code).to.eql('ERR_TYPE_DOES_NOT_SUPPORT_PRESENCE'); + }); + + it('leaves a null presence untransformed', function() { + var presence = { + p: null, + t: presenceType.uri, + v: 2 + }; + + var op = { + op: {index: 5, value: 'foo'} + }; + var error = ot.transformPresence(presence, op); + + expect(error).to.be.undefined; + expect(presence).to.eql({ + p: null, + t: presenceType.uri, + v: 3 + }); + }); + }); }); diff --git a/test/util.js b/test/util.js index 578c92a11..17fa6500f 100644 --- a/test/util.js +++ b/test/util.js @@ -45,3 +45,9 @@ exports.errorHandler = function(callback) { if (error) callback(error); }; }; + +exports.errorHandler = function(callback) { + return function(error) { + if (error) callback(error); + }; +}; From e2221657d469c97955deea887664fc55064dea2f Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Fri, 10 Jan 2020 10:42:26 +0000 Subject: [PATCH 2/9] Add presence example app --- examples/rich-text-presence/.gitignore | 1 + examples/rich-text-presence/README.md | 20 +++ examples/rich-text-presence/client.js | 142 ++++++++++++++++++ examples/rich-text-presence/package.json | 31 ++++ examples/rich-text-presence/rich-text.js | 20 +++ examples/rich-text-presence/server.js | 42 ++++++ examples/rich-text-presence/static/index.html | 15 ++ examples/rich-text-presence/static/style.css | 37 +++++ 8 files changed, 308 insertions(+) create mode 100644 examples/rich-text-presence/.gitignore create mode 100644 examples/rich-text-presence/README.md create mode 100644 examples/rich-text-presence/client.js create mode 100644 examples/rich-text-presence/package.json create mode 100644 examples/rich-text-presence/rich-text.js create mode 100644 examples/rich-text-presence/server.js create mode 100644 examples/rich-text-presence/static/index.html create mode 100644 examples/rich-text-presence/static/style.css diff --git a/examples/rich-text-presence/.gitignore b/examples/rich-text-presence/.gitignore new file mode 100644 index 000000000..eab5b2c0f --- /dev/null +++ b/examples/rich-text-presence/.gitignore @@ -0,0 +1 @@ +static/dist/ diff --git a/examples/rich-text-presence/README.md b/examples/rich-text-presence/README.md new file mode 100644 index 000000000..25fcf2f08 --- /dev/null +++ b/examples/rich-text-presence/README.md @@ -0,0 +1,20 @@ +# Collaborative Rich Text Editor with ShareDB + +This is a collaborative rich text editor using [Quill](https://github.com/quilljs/quill) and the [rich-text OT type](https://github.com/ottypes/rich-text). + +In this demo, data is not persisted. To persist data, run a Mongo +server and initialize ShareDB with the +[ShareDBMongo](https://github.com/share/sharedb-mongo) database adapter. + +## Install dependencies +``` +npm install +``` + +## Build JavaScript bundle and run server +``` +npm run build && npm start +``` + +## Run app in browser +Load [http://localhost:8080](http://localhost:8080) diff --git a/examples/rich-text-presence/client.js b/examples/rich-text-presence/client.js new file mode 100644 index 000000000..92abcd2f8 --- /dev/null +++ b/examples/rich-text-presence/client.js @@ -0,0 +1,142 @@ +var ReconnectingWebSocket = require('reconnecting-websocket'); +var sharedb = require('sharedb/lib/client'); +var richText = require('./rich-text'); +var Quill = require('quill'); +var QuillCursors = require('quill-cursors'); +var tinycolor = require('tinycolor2'); + +sharedb.types.register(richText.type); +Quill.register('modules/cursors', QuillCursors); + +var clients = []; +var colors = {}; + +var collection = 'examples'; +var id = 'richtext'; + +var editorContainer = document.querySelector('.editor-container'); +document.querySelector('#add-client').addEventListener('click', function() { + addClient(); +}); + +addClient(); +addClient(); + +function addClient() { + var socket = new ReconnectingWebSocket('ws://' + window.location.host); + var connection = new sharedb.Connection(socket); + var doc = connection.get(collection, id); + doc.subscribe(function(err) { + if (err) throw err; + var quill = initialiseQuill(doc); + var color = '#' + tinycolor.random().toHex(); + var id = 'client-' + (clients.length + 1); + colors[id] = color; + + clients.push({ + quill: quill, + doc: doc, + color: color + }); + + document.querySelector('#' + id + ' h1').style.color = color; + }); +} + +function initialiseQuill(doc) { + var quill = new Quill(quillContainer(), { + theme: 'bubble', + modules: { + cursors: true + } + }); + var cursors = quill.getModule('cursors'); + var index = clients.length; + + quill.setContents(doc.data); + + quill.on('text-change', function(delta, oldDelta, source) { + if (source !== 'user') return; + doc.submitOp(delta); + }); + + doc.on('op', function(op, source) { + if (source) return; + quill.updateContents(op); + }); + + var presence = doc.connection.getDocPresence(collection, id); + presence.subscribe(function(error) { + if (error) throw error; + }); + var localPresence = presence.create('client-' + (index + 1)); + + quill.on('selection-change', function(range) { + // Ignore blurring, so that we can see lots of users in the + // same window + if (!range) return; + localPresence.submit(range, function(error) { + if (error) throw error; + }); + }); + + presence.on('receive', function(id, range) { + cursors.createCursor(id, id, colors[id]); + cursors.moveCursor(id, range); + }); + + return quill; +} + +function quillContainer() { + var wrapper = document.createElement('div'); + wrapper.classList.add('editor'); + var index = clients.length; + wrapper.id = 'client-' + (index + 1); + + wrapper.innerHTML = + '

Client' + (index + 1) + '

' + + ' ' + + ' ' + + '
'; + + wrapper.querySelector('.remove-client').addEventListener('click', function() { + removeClient(clients[index]); + }); + + var connectionButton = wrapper.querySelector('.client-connection'); + connectionButton.addEventListener('click', function() { + toggleConnection(connectionButton, clients[index]); + }); + + editorContainer.appendChild(wrapper); + return wrapper.querySelector('.quill'); +} + +function toggleConnection(button, client) { + if (button.classList.contains('connected')) { + button.classList.remove('connected'); + button.textContent = 'Connect'; + disconnectClient(client); + } else { + button.classList.add('connected'); + button.textContent = 'Disconnect'; + connectClient(client); + } +} + +function disconnectClient(client) { + client.doc.connection.close(); +} + +function connectClient(client) { + var socket = new ReconnectingWebSocket('ws://' + window.location.host); + client.doc.connection.bindToSocket(socket); +} + +function removeClient(client) { + client.quill.root.parentElement.parentElement.remove(); + client.doc.destroy(function(error) { + if (error) throw error; + }); +} diff --git a/examples/rich-text-presence/package.json b/examples/rich-text-presence/package.json new file mode 100644 index 000000000..ee8636284 --- /dev/null +++ b/examples/rich-text-presence/package.json @@ -0,0 +1,31 @@ +{ + "name": "sharedb-example-rich-text-presence", + "version": "1.0.0", + "description": "An example of presence using ShareDB and Quill", + "main": "server.js", + "scripts": { + "build": "mkdir -p static/dist/ && ./node_modules/.bin/browserify client.js -o static/dist/bundle.js", + "test": "echo \"Error: no test specified\" && exit 1", + "start": "node server.js" + }, + "author": "Nate Smith", + "contributors": [ + "Avital Oliver (https://aoliver.org/)", + "Alec Gibson " + ], + "license": "MIT", + "dependencies": { + "@teamwork/websocket-json-stream": "^2.0.0", + "express": "^4.17.1", + "quill": "^1.3.7", + "quill-cursors": "^2.2.1", + "reconnecting-websocket": "^4.2.0", + "rich-text": "^4.0.0", + "sharedb": "file:../../", + "tinycolor2": "^1.4.1", + "ws": "^7.2.0" + }, + "devDependencies": { + "browserify": "^16.5.0" + } +} diff --git a/examples/rich-text-presence/rich-text.js b/examples/rich-text-presence/rich-text.js new file mode 100644 index 000000000..653ae0592 --- /dev/null +++ b/examples/rich-text-presence/rich-text.js @@ -0,0 +1,20 @@ +var richText = require('rich-text'); + +richText.type.transformPresence = function(presence, op, isOwnOp) { + if (!presence) { + return null; + } + + var start = presence.index; + var end = presence.index + presence.length; + var delta = new richText.Delta(op); + start = delta.transformPosition(start, !isOwnOp); + end = delta.transformPosition(end, !isOwnOp); + + return Object.assign({}, presence, { + index: start, + length: end - start + }); +}; + +module.exports = richText; diff --git a/examples/rich-text-presence/server.js b/examples/rich-text-presence/server.js new file mode 100644 index 000000000..4ecd37540 --- /dev/null +++ b/examples/rich-text-presence/server.js @@ -0,0 +1,42 @@ +var http = require('http'); +var express = require('express'); +var ShareDB = require('sharedb'); +var richText = require('./rich-text'); +var WebSocket = require('ws'); +var WebSocketJSONStream = require('@teamwork/websocket-json-stream'); + +ShareDB.types.register(richText.type); +var backend = new ShareDB(); +createDoc(startServer); + +// Create initial document then fire callback +function createDoc(callback) { + var connection = backend.connect(); + var doc = connection.get('examples', 'richtext'); + doc.fetch(function(err) { + if (err) throw err; + if (doc.type === null) { + doc.create([{insert: 'Hi!'}], 'rich-text', callback); + return; + } + callback(); + }); +} + +function startServer() { + // Create a web server to serve files and listen to WebSocket connections + var app = express(); + app.use(express.static('static')); + app.use(express.static('node_modules/quill/dist')); + var server = http.createServer(app); + + // Connect any incoming WebSocket connection to ShareDB + var wss = new WebSocket.Server({server: server}); + wss.on('connection', function(ws) { + var stream = new WebSocketJSONStream(ws); + backend.listen(stream); + }); + + server.listen(8080); + console.log('Listening on http://localhost:8080'); +} diff --git a/examples/rich-text-presence/static/index.html b/examples/rich-text-presence/static/index.html new file mode 100644 index 000000000..4f9a7a43c --- /dev/null +++ b/examples/rich-text-presence/static/index.html @@ -0,0 +1,15 @@ + + +ShareDB Rich Text + + + + +
+ +
+
+ + diff --git a/examples/rich-text-presence/static/style.css b/examples/rich-text-presence/static/style.css new file mode 100644 index 000000000..e4261c67c --- /dev/null +++ b/examples/rich-text-presence/static/style.css @@ -0,0 +1,37 @@ +h1 { + margin: 0 10px; + display: inline-block; +} + +button { + font-size: 20px; +} + +.controls { + width: 100%; + text-align: center; + margin: 20px; +} + +.editor-container { + width: 100%; +} + +.editor { + display: inline-block; + width: 50%; + margin-bottom: 20px; +} + +.ql-container { + padding: 10px; +} + +.ql-editor { + /* TODO: Colour code with cursor? */ + border: 1px solid grey; +} + +.ql-tooltip { + display: none; +} From 61f96b23d55a974bee2796e5283c92a4ba40c482 Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Wed, 15 Jan 2020 11:11:35 +0000 Subject: [PATCH 3/9] Review markups - remove `this` bindings in favour for local variables - track `seq` high-water marks on a per-channel basis - destroy unused streams when a subscription request is ignored - fix some prototype definitions --- lib/agent.js | 60 +++++++++++++--------- lib/client/connection.js | 10 ++-- lib/client/presence/local-doc-presence.js | 34 ++++++------ lib/client/presence/local-presence.js | 12 +++-- lib/client/presence/presence.js | 37 +++++++------ lib/client/presence/remote-doc-presence.js | 23 +++++---- 6 files changed, 98 insertions(+), 78 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 9dab4280f..b50cc2cc2 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -40,8 +40,8 @@ function Agent(backend, stream) { this.subscribedPresences = {}; // Highest seq received for a subscription request. Any seq lower than this // value is stale, and should be ignored. Used for keeping the subscription - // state in sync with the client's desired state - this.presenceSubscriptionSeq = 0; + // state in sync with the client's desired state. Map of channel -> seq + this.presenceSubscriptionSeq = {}; // Keep track of the last request that has been sent by each local presence // belonging to this agent. This is used to generate a new disconnection // request if the client disconnects ungracefully. This is a @@ -141,27 +141,30 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { Agent.prototype._subscribeToPresenceStream = function(channel, stream) { if (this.closed) return stream.destroy(); + var agent = this; stream.on('data', function(data) { if (data.error) { logger.error('Presence subscription stream error', channel, data.error); } - this._handlePresenceData(data); - }.bind(this)); + agent._handlePresenceData(data); + }); stream.on('end', function() { - var requests = this.presenceRequests[channel] || {}; + var requests = agent.presenceRequests[channel] || {}; for (var id in requests) { - var request = this.presenceRequests[channel][id]; + var request = agent.presenceRequests[channel][id]; request.seq++; request.p = null; - this._broadcastPresence(request, function(error) { + agent._broadcastPresence(request, function(error) { if (error) logger.error('Error broadcasting disconnect presence', channel, error); }); } - delete this.subscribedPresences[channel]; - delete this.presenceRequests[channel]; - }.bind(this)); + if (agent.subscribedPresences[channel] === stream) { + delete agent.subscribedPresences[channel]; + } + delete agent.presenceRequests[channel]; + }); }; Agent.prototype._subscribeToQuery = function(emitter, queryId, collection, query) { @@ -730,6 +733,7 @@ Agent.prototype._src = function() { }; Agent.prototype._broadcastPresence = function(presence, callback) { + var agent = this; var requests = this.presenceRequests[presence.ch] || (this.presenceRequests[presence.ch] = {}); var previousRequest = requests[presence.id]; if (!previousRequest || previousRequest.seq < presence.seq) { @@ -737,12 +741,12 @@ Agent.prototype._broadcastPresence = function(presence, callback) { } this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) { if (error) return callback(error); - var channel = this._getPresenceChannel(presence.ch); - this.backend.pubsub.publish([channel], presence, function(error) { + var channel = agent._getPresenceChannel(presence.ch); + agent.backend.pubsub.publish([channel], presence, function(error) { if (error) return callback(error); callback(null, presence); }); - }.bind(this)); + }); }; Agent.prototype._createPresence = function(request) { @@ -751,8 +755,9 @@ Agent.prototype._createPresence = function(request) { ch: request.ch, src: this.clientId, seq: request.seq, - id: request.id, + id: request.id, // Presence ID, not Doc ID (which is 'd') p: request.p, + // The c,d,v,t fields are only set for DocPresence c: request.c, d: request.d, v: request.v, @@ -761,22 +766,26 @@ Agent.prototype._createPresence = function(request) { }; Agent.prototype._subscribePresence = function(channel, seq, callback) { + var agent = this; var presenceChannel = this._getPresenceChannel(channel); this.backend.pubsub.subscribe(presenceChannel, function(error, stream) { if (error) return callback(error); - if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq}); - this.presenceSubscriptionSeq = seq; - this.subscribedPresences[channel] = stream; - this._subscribeToPresenceStream(channel, stream); - this._requestPresence(channel, function(error) { + if (seq < agent.presenceSubscriptionSeq[channel]) { + stream.destroy(); + return callback(null, {ch: channel, seq: seq}); + } + agent.presenceSubscriptionSeq[channel] = seq; + agent.subscribedPresences[channel] = stream; + agent._subscribeToPresenceStream(channel, stream); + agent._requestPresence(channel, function(error) { callback(error, {ch: channel, seq: seq}); }); - }.bind(this)); + }); }; Agent.prototype._unsubscribePresence = function(channel, seq, callback) { - if (seq < this.presenceSubscriptionSeq) return; - this.presenceSubscriptionSeq = seq; + if (seq < this.presenceSubscriptionSeq[channel]) return; + this.presenceSubscriptionSeq[channel] = seq; var stream = this.subscribedPresences[channel]; if (stream) stream.destroy(); callback(null, {ch: channel, seq: seq}); @@ -801,12 +810,13 @@ Agent.prototype._handlePresenceData = function(presence) { collection: presence.c, presence: presence }; + var agent = this; backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) { if (error) { - return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); + return agent.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)}); } - this.send(presence); - }.bind(this)); + agent.send(presence); + }); }; function createClientOp(request, clientId) { diff --git a/lib/client/connection.js b/lib/client/connection.js index 5099ed92b..c05419d2f 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -742,16 +742,18 @@ Connection.prototype._initialize = function(message) { }; Connection.prototype.getPresence = function(channel) { + var connection = this; return util.digOrCreate(this._presences, channel, function() { - return new Presence(this, channel); - }.bind(this)); + return new Presence(connection, channel); + }); }; Connection.prototype.getDocPresence = function(collection, id) { var channel = DocPresence.channel(collection, id); + var connection = this; return util.digOrCreate(this._presences, channel, function() { - return new DocPresence(this, collection, id); - }.bind(this)); + return new DocPresence(connection, collection, id); + }); }; Connection.prototype._sendPresenceAction = function(action, seq, presence) { diff --git a/lib/client/presence/local-doc-presence.js b/lib/client/presence/local-doc-presence.js index 0563f016d..10ac9a32f 100644 --- a/lib/client/presence/local-doc-presence.js +++ b/lib/client/presence/local-doc-presence.js @@ -47,21 +47,22 @@ LocalDocPresence.prototype.destroy = function(callback) { LocalDocPresence.prototype._sendPending = function() { if (this._isSending) return; this._isSending = true; + var presence = this; this._doc.whenNothingPending(function() { - this._isSending = false; - if (!this.connection.canSend) return; + presence._isSending = false; + if (!presence.connection.canSend) return; - this._pendingMessages.forEach(function(message) { - message.t = this._doc.type.uri; - message.v = this._doc.version; - this.connection.send(message); - }.bind(this)); + presence._pendingMessages.forEach(function(message) { + message.t = presence._doc.type.uri; + message.v = presence._doc.version; + presence.connection.send(message); + }); - this._pendingMessages = []; - }.bind(this)); + presence._pendingMessages = []; + }); }; -LocalPresence.prototype._registerWithDoc = function() { +LocalDocPresence.prototype._registerWithDoc = function() { this._doc.on('op', this._opHandler); this._doc.on('create', this._createOrDelHandler); this._doc.on('del', this._createOrDelHandler); @@ -70,14 +71,15 @@ LocalPresence.prototype._registerWithDoc = function() { }; LocalDocPresence.prototype._transformAgainstOp = function(op, source) { + var presence = this; this._pendingMessages.forEach(function(message) { try { - message.p = this._doc.type.transformPresence(message.p, op, source); + message.p = presence._doc.type.transformPresence(message.p, op, source); } catch (error) { - var callback = this._getCallback(message.seq); - this._callbackOrEmit(error, callback); + var callback = presence._getCallback(message.seq); + presence._callbackOrEmit(error, callback); } - }.bind(this)); + }); try { this.value = this._doc.type.transformPresence(this.value, op, source); @@ -86,7 +88,7 @@ LocalDocPresence.prototype._transformAgainstOp = function(op, source) { } }; -LocalPresence.prototype._handleCreateOrDel = function() { +LocalDocPresence.prototype._handleCreateOrDel = function() { this._pendingMessages.forEach(function(message) { message.p = null; }); @@ -94,7 +96,7 @@ LocalPresence.prototype._handleCreateOrDel = function() { this.value = null; }; -LocalPresence.prototype._handleLoad = function() { +LocalDocPresence.prototype._handleLoad = function() { this.value = null; this._pendingMessages = []; }; diff --git a/lib/client/presence/local-presence.js b/lib/client/presence/local-presence.js index c49ef4a5c..ba2763909 100644 --- a/lib/client/presence/local-presence.js +++ b/lib/client/presence/local-presence.js @@ -32,18 +32,20 @@ LocalPresence.prototype.send = function(callback) { }; LocalPresence.prototype.destroy = function(callback) { + var presence = this; this.submit(null, function(error) { - if (error) return this._callbackOrEmit(error, callback); - delete this.presence.localPresences[this.presenceId]; + if (error) return presence._callbackOrEmit(error, callback); + delete presence.presence.localPresences[presence.presenceId]; if (callback) callback(); - }.bind(this)); + }); }; LocalPresence.prototype._sendPending = function() { if (!this.connection.canSend) return; + var presence = this; this._pendingMessages.forEach(function(message) { - this.connection.send(message); - }.bind(this)); + presence.connection.send(message); + }); this._pendingMessages = []; }; diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js index bc0991261..dfa0bf93b 100644 --- a/lib/client/presence/presence.js +++ b/lib/client/presence/presence.js @@ -41,29 +41,30 @@ Presence.prototype.create = function(id) { }; Presence.prototype.destroy = function(callback) { + var presence = this; this.unsubscribe(function(error) { - if (error) return this._callbackOrEmit(error, callback); - var localIds = Object.keys(this.localPresences); - var remoteIds = Object.keys(this._remotePresenceInstances); + if (error) return presence._callbackOrEmit(error, callback); + var localIds = Object.keys(presence.localPresences); + var remoteIds = Object.keys(presence._remotePresenceInstances); async.parallel( [ function(next) { async.each(localIds, function(presenceId, next) { - this.localPresences[presenceId].destroy(next); - }.bind(this), next); - }.bind(this), + presence.localPresences[presenceId].destroy(next); + }, next); + }, function(next) { async.each(remoteIds, function(presenceId, next) { - this._remotePresenceInstances[presenceId].destroy(next); - }.bind(this), next); - }.bind(this) + presence._remotePresenceInstances[presenceId].destroy(next); + }, next); + } ], function(error) { - delete this.connection._presences[this.channel]; - this._callbackOrEmit(error, callback); - }.bind(this) + delete presence.connection._presences[presence.channel]; + presence._callbackOrEmit(error, callback); + } ); - }.bind(this)); + }); }; Presence.prototype._sendSubscriptionAction = function(wantSubscribe, callback) { @@ -93,9 +94,10 @@ Presence.prototype._receiveUpdate = function(error, message) { if (localPresence) return localPresence._ack(error, message.seq); if (error) return this.emit('error', error); + var presence = this; var remotePresence = util.digOrCreate(this._remotePresenceInstances, message.id, function() { - return this._createRemotePresence(message.id); - }.bind(this)); + return presence._createRemotePresence(message.id); + }); remotePresence.receiveUpdate(message); }; @@ -137,9 +139,10 @@ Presence.prototype._resubscribe = function() { if (!this.wantSubscribe) return this._callEachOrEmit(null, callbacks); + var presence = this; this.subscribe(function(error) { - this._callEachOrEmit(error, callbacks); - }.bind(this)); + presence._callEachOrEmit(error, callbacks); + }); }; Presence.prototype._subscriptionCallback = function(seq) { diff --git a/lib/client/presence/remote-doc-presence.js b/lib/client/presence/remote-doc-presence.js index 07b5cc515..b0bd8b9e4 100644 --- a/lib/client/presence/remote-doc-presence.js +++ b/lib/client/presence/remote-doc-presence.js @@ -48,22 +48,23 @@ RemoteDocPresence.prototype._registerWithDoc = function() { RemoteDocPresence.prototype._setPendingPresence = function() { if (this._pendingSetPending) return; this._pendingSetPending = true; + var presence = this; this._doc.whenNothingPending(function() { - this._pendingSetPending = false; - if (!this._pending) return; - if (this._pending.seq < this.seq) return this._pending = null; + presence._pendingSetPending = false; + if (!presence._pending) return; + if (presence._pending.seq < presence.seq) return presence._pending = null; - if (this._pending.v > this._doc.version) { - return this._doc.fetch(); + if (presence._pending.v > presence._doc.version) { + return presence._doc.fetch(); } - if (!this._catchUpStalePresence()) return; + if (!presence._catchUpStalePresence()) return; - this.value = this._pending.p; - this.seq = this._pending.seq; - this._pending = null; - this.presence._updateRemotePresence(this); - }.bind(this)); + presence.value = presence._pending.p; + presence.seq = presence._pending.seq; + presence._pending = null; + presence.presence._updateRemotePresence(presence); + }); }; RemoteDocPresence.prototype._handleOp = function(op, source, connectionId) { From b3fee395eab54cd177c385386d30db7c47a2edeb Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Wed, 15 Jan 2020 11:45:13 +0000 Subject: [PATCH 4/9] Simplify presence example At the moment, the presence example is a little bit complicated, because it handles adding multiple clients to the same window. This also breaks its behaviour, because presence IDs are based on an array index, which isn't unique when opening a second window. This change just has a single client in the window, which has the advantage of simplifying the code. In order to test multiple users, simply open multiple windows. --- examples/rich-text-presence/client.js | 109 ++++++------------ examples/rich-text-presence/package.json | 1 + examples/rich-text-presence/static/index.html | 12 +- examples/rich-text-presence/static/style.css | 22 +--- 4 files changed, 49 insertions(+), 95 deletions(-) diff --git a/examples/rich-text-presence/client.js b/examples/rich-text-presence/client.js index 92abcd2f8..ee4a44427 100644 --- a/examples/rich-text-presence/client.js +++ b/examples/rich-text-presence/client.js @@ -4,54 +4,39 @@ var richText = require('./rich-text'); var Quill = require('quill'); var QuillCursors = require('quill-cursors'); var tinycolor = require('tinycolor2'); +var ObjectID = require('bson-objectid'); sharedb.types.register(richText.type); Quill.register('modules/cursors', QuillCursors); -var clients = []; +var connectionButton = document.getElementById('client-connection'); +connectionButton.addEventListener('click', function() { + toggleConnection(connectionButton); +}); + +var nameInput = document.getElementById('name'); + var colors = {}; var collection = 'examples'; var id = 'richtext'; +var presenceId = new ObjectID().toString(); -var editorContainer = document.querySelector('.editor-container'); -document.querySelector('#add-client').addEventListener('click', function() { - addClient(); -}); - -addClient(); -addClient(); - -function addClient() { - var socket = new ReconnectingWebSocket('ws://' + window.location.host); - var connection = new sharedb.Connection(socket); - var doc = connection.get(collection, id); - doc.subscribe(function(err) { - if (err) throw err; - var quill = initialiseQuill(doc); - var color = '#' + tinycolor.random().toHex(); - var id = 'client-' + (clients.length + 1); - colors[id] = color; - - clients.push({ - quill: quill, - doc: doc, - color: color - }); +var socket = new ReconnectingWebSocket('ws://' + window.location.host); +var connection = new sharedb.Connection(socket); +var doc = connection.get(collection, id); - document.querySelector('#' + id + ' h1').style.color = color; - }); -} +doc.subscribe(function(err) { + if (err) throw err; + initialiseQuill(doc); +}); function initialiseQuill(doc) { - var quill = new Quill(quillContainer(), { + var quill = new Quill('#editor', { theme: 'bubble', - modules: { - cursors: true - } + modules: {cursors: true} }); var cursors = quill.getModule('cursors'); - var index = clients.length; quill.setContents(doc.data); @@ -69,74 +54,48 @@ function initialiseQuill(doc) { presence.subscribe(function(error) { if (error) throw error; }); - var localPresence = presence.create('client-' + (index + 1)); + var localPresence = presence.create(presenceId); quill.on('selection-change', function(range) { // Ignore blurring, so that we can see lots of users in the - // same window + // same window. In real use, you may want to clear the cursor. if (!range) return; + // In this particular instance, we can send extra information + // on the presence object. This ability will vary depending on + // type. + range.name = nameInput.value; localPresence.submit(range, function(error) { if (error) throw error; }); }); presence.on('receive', function(id, range) { - cursors.createCursor(id, id, colors[id]); + colors[id] = colors[id] || tinycolor.random().toHexString(); + var name = (range && range.name) || 'Anonymous'; + cursors.createCursor(id, name, colors[id]); cursors.moveCursor(id, range); }); return quill; } -function quillContainer() { - var wrapper = document.createElement('div'); - wrapper.classList.add('editor'); - var index = clients.length; - wrapper.id = 'client-' + (index + 1); - - wrapper.innerHTML = - '

Client' + (index + 1) + '

' + - ' ' + - ' ' + - '
'; - - wrapper.querySelector('.remove-client').addEventListener('click', function() { - removeClient(clients[index]); - }); - - var connectionButton = wrapper.querySelector('.client-connection'); - connectionButton.addEventListener('click', function() { - toggleConnection(connectionButton, clients[index]); - }); - - editorContainer.appendChild(wrapper); - return wrapper.querySelector('.quill'); -} - -function toggleConnection(button, client) { +function toggleConnection(button) { if (button.classList.contains('connected')) { button.classList.remove('connected'); button.textContent = 'Connect'; - disconnectClient(client); + disconnect(); } else { button.classList.add('connected'); button.textContent = 'Disconnect'; - connectClient(client); + connect(); } } -function disconnectClient(client) { - client.doc.connection.close(); +function disconnect() { + doc.connection.close(); } -function connectClient(client) { +function connect() { var socket = new ReconnectingWebSocket('ws://' + window.location.host); - client.doc.connection.bindToSocket(socket); -} - -function removeClient(client) { - client.quill.root.parentElement.parentElement.remove(); - client.doc.destroy(function(error) { - if (error) throw error; - }); + doc.connection.bindToSocket(socket); } diff --git a/examples/rich-text-presence/package.json b/examples/rich-text-presence/package.json index ee8636284..6747a9a32 100644 --- a/examples/rich-text-presence/package.json +++ b/examples/rich-text-presence/package.json @@ -16,6 +16,7 @@ "license": "MIT", "dependencies": { "@teamwork/websocket-json-stream": "^2.0.0", + "bson-objectid": "^1.3.0", "express": "^4.17.1", "quill": "^1.3.7", "quill-cursors": "^2.2.1", diff --git a/examples/rich-text-presence/static/index.html b/examples/rich-text-presence/static/index.html index 4f9a7a43c..01f27be0f 100644 --- a/examples/rich-text-presence/static/index.html +++ b/examples/rich-text-presence/static/index.html @@ -6,10 +6,14 @@
- + +
-
+ +
+ Open a new window to see another client! +
+ +
diff --git a/examples/rich-text-presence/static/style.css b/examples/rich-text-presence/static/style.css index e4261c67c..fb9858eb2 100644 --- a/examples/rich-text-presence/static/style.css +++ b/examples/rich-text-presence/static/style.css @@ -1,10 +1,10 @@ -h1 { - margin: 0 10px; - display: inline-block; +body { + font-family: sans-serif; } -button { - font-size: 20px; +input, button { + font-size: 16px; + margin-right: 10px; } .controls { @@ -13,25 +13,15 @@ button { margin: 20px; } -.editor-container { - width: 100%; -} - -.editor { - display: inline-block; - width: 50%; - margin-bottom: 20px; -} - .ql-container { padding: 10px; } .ql-editor { - /* TODO: Colour code with cursor? */ border: 1px solid grey; } +/* Keep the example simple by hiding the toolbar */ .ql-tooltip { display: none; } From 6216ad12d9ef43436323f784457f42267c7171a1 Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Thu, 30 Jan 2020 14:25:57 +0000 Subject: [PATCH 5/9] Use a presence version instead of `seq` At the moment, we use `seq` as a proxy for "presence version". However, this is problematic, because the `seq` is currently reset every time a client reconnects, and we shouldn't really be checking against it. This change does away with this usage and simply introduces a notion `pv` presenve version field onto the presence message, which can be used to reliably check the version of a presence with a given ID (assuming a client's ID is truly unique to that session). --- lib/agent.js | 4 ++-- lib/client/presence/local-doc-presence.js | 3 +-- lib/client/presence/local-presence.js | 17 +++++++++-------- lib/client/presence/presence.js | 2 +- lib/client/presence/remote-doc-presence.js | 8 ++++---- lib/client/presence/remote-presence.js | 6 +++--- test/client/presence/doc-presence.js | 4 ++-- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index b50cc2cc2..12b44c663 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -736,7 +736,7 @@ Agent.prototype._broadcastPresence = function(presence, callback) { var agent = this; var requests = this.presenceRequests[presence.ch] || (this.presenceRequests[presence.ch] = {}); var previousRequest = requests[presence.id]; - if (!previousRequest || previousRequest.seq < presence.seq) { + if (!previousRequest || previousRequest.pv < presence.pv) { this.presenceRequests[presence.ch][presence.id] = presence; } this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) { @@ -754,9 +754,9 @@ Agent.prototype._createPresence = function(request) { a: 'p', ch: request.ch, src: this.clientId, - seq: request.seq, id: request.id, // Presence ID, not Doc ID (which is 'd') p: request.p, + pv: request.pv, // The c,d,v,t fields are only set for DocPresence c: request.c, d: request.d, diff --git a/lib/client/presence/local-doc-presence.js b/lib/client/presence/local-doc-presence.js index 10ac9a32f..fc525b0d6 100644 --- a/lib/client/presence/local-doc-presence.js +++ b/lib/client/presence/local-doc-presence.js @@ -10,7 +10,6 @@ function LocalDocPresence(presence, presenceId) { this.id = this.presence.id; this._doc = this.connection.get(this.collection, this.id); - this._seq = null; this._isSending = false; this._opHandler = this._transformAgainstOp.bind(this); @@ -76,7 +75,7 @@ LocalDocPresence.prototype._transformAgainstOp = function(op, source) { try { message.p = presence._doc.type.transformPresence(message.p, op, source); } catch (error) { - var callback = presence._getCallback(message.seq); + var callback = presence._getCallback(message.pv); presence._callbackOrEmit(error, callback); } }); diff --git a/lib/client/presence/local-presence.js b/lib/client/presence/local-presence.js index ba2763909..9ea764b42 100644 --- a/lib/client/presence/local-presence.js +++ b/lib/client/presence/local-presence.js @@ -11,11 +11,12 @@ function LocalPresence(presence, presenceId) { this.presence = presence; this.presenceId = presenceId; this.connection = presence.connection; + this.presenceVersion = 0; this.value = null; this._pendingMessages = []; - this._callbacksBySeq = {}; + this._callbacksByPresenceVersion = {}; } emitter.mixin(LocalPresence); @@ -27,7 +28,7 @@ LocalPresence.prototype.submit = function(value, callback) { LocalPresence.prototype.send = function(callback) { var message = this._message(); this._pendingMessages.push(message); - this._callbacksBySeq[message.seq] = callback; + this._callbacksByPresenceVersion[message.pv] = callback; this._sendPending(); }; @@ -50,8 +51,8 @@ LocalPresence.prototype._sendPending = function() { this._pendingMessages = []; }; -LocalPresence.prototype._ack = function(error, seq) { - var callback = this._getCallback(seq); +LocalPresence.prototype._ack = function(error, presenceVersion) { + var callback = this._getCallback(presenceVersion); this._callbackOrEmit(error, callback); }; @@ -61,13 +62,13 @@ LocalPresence.prototype._message = function() { ch: this.presence.channel, id: this.presenceId, p: this.value, - seq: this.connection.seq++ + pv: this.presenceVersion++ }; }; -LocalPresence.prototype._getCallback = function(seq) { - var callback = this._callbacksBySeq[seq]; - delete this._callbacksBySeq[seq]; +LocalPresence.prototype._getCallback = function(presenceVersion) { + var callback = this._callbacksByPresenceVersion[presenceVersion]; + delete this._callbacksByPresenceVersion[presenceVersion]; return callback; }; diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js index dfa0bf93b..f446fadad 100644 --- a/lib/client/presence/presence.js +++ b/lib/client/presence/presence.js @@ -91,7 +91,7 @@ Presence.prototype._handleUnsubscribe = function(error, seq) { Presence.prototype._receiveUpdate = function(error, message) { var localPresence = util.dig(this.localPresences, message.id); - if (localPresence) return localPresence._ack(error, message.seq); + if (localPresence) return localPresence._ack(error, message.pv); if (error) return this.emit('error', error); var presence = this; diff --git a/lib/client/presence/remote-doc-presence.js b/lib/client/presence/remote-doc-presence.js index b0bd8b9e4..2fef91ffa 100644 --- a/lib/client/presence/remote-doc-presence.js +++ b/lib/client/presence/remote-doc-presence.js @@ -8,6 +8,7 @@ function RemoteDocPresence(presence, presenceId) { this.collection = this.presence.collection; this.id = this.presence.id; this.src = null; + this.presenceVersion = null; this._doc = this.connection.get(this.collection, this.id); this._pending = null; @@ -23,7 +24,7 @@ function RemoteDocPresence(presence, presenceId) { RemoteDocPresence.prototype = Object.create(RemotePresence.prototype); RemoteDocPresence.prototype.receiveUpdate = function(message) { - if (this._pending && message.seq < this._pending.seq) return; + if (this._pending && message.pv < this._pending.pv) return; this.src = message.src; this._pending = message; this._setPendingPresence(); @@ -52,7 +53,7 @@ RemoteDocPresence.prototype._setPendingPresence = function() { this._doc.whenNothingPending(function() { presence._pendingSetPending = false; if (!presence._pending) return; - if (presence._pending.seq < presence.seq) return presence._pending = null; + if (presence._pending.pv < presence.presenceVersion) return presence._pending = null; if (presence._pending.v > presence._doc.version) { return presence._doc.fetch(); @@ -61,7 +62,7 @@ RemoteDocPresence.prototype._setPendingPresence = function() { if (!presence._catchUpStalePresence()) return; presence.value = presence._pending.p; - presence.seq = presence._pending.seq; + presence.presenceVersion = presence._pending.pv; presence._pending = null; presence.presence._updateRemotePresence(presence); }); @@ -81,7 +82,6 @@ RemotePresence.prototype._handleCreateDel = function() { RemotePresence.prototype._handleLoad = function() { this.value = null; - this._callbacksBySeq = {}; this._pending = null; this._opCache = null; this.presence._updateRemotePresence(this); diff --git a/lib/client/presence/remote-presence.js b/lib/client/presence/remote-presence.js index 280392974..6a2188782 100644 --- a/lib/client/presence/remote-presence.js +++ b/lib/client/presence/remote-presence.js @@ -5,13 +5,13 @@ function RemotePresence(presence, presenceId) { this.connection = this.presence.connection; this.value = null; - this.seq = 0; + this.presenceVersion = 0; } RemotePresence.prototype.receiveUpdate = function(message) { - if (message.seq < this.seq) return; + if (message.pv < this.presenceVersion) return; this.value = message.p; - this.seq = message.seq; + this.presenceVersion = message.pv; this.presence._updateRemotePresence(this); }; diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js index 4afc9a8e5..7f611ab2f 100644 --- a/test/client/presence/doc-presence.js +++ b/test/client/presence/doc-presence.js @@ -519,8 +519,8 @@ describe('DocPresence', function() { if (pauseCount === 2) { expect(this._pendingBroadcasts[0][0].presence.p).to.eql({index: 2}); expect(this._pendingBroadcasts[1][0].presence.p).to.eql({index: 4}); - expect(this._pendingBroadcasts[0][0].presence.seq) - .to.be.lessThan(this._pendingBroadcasts[1][0].presence.seq); + expect(this._pendingBroadcasts[0][0].presence.pv) + .to.be.lessThan(this._pendingBroadcasts[1][0].presence.pv); // Fire the broadcasts in the reverse order this._pendingBroadcasts[1][1](); From 8635b072825a5fb17580735e27562b29e1c4965c Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Thu, 30 Jan 2020 14:27:57 +0000 Subject: [PATCH 6/9] Add truthiness check for handling presence request --- lib/client/connection.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/client/connection.js b/lib/client/connection.js index c05419d2f..81cb28149 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -787,5 +787,5 @@ Connection.prototype._handlePresence = function(error, message) { Connection.prototype._handlePresenceRequest = function(error, message) { var presence = util.dig(this._presences, message.ch); - presence._broadcastAllLocalPresence(error, message); + if (presence) presence._broadcastAllLocalPresence(error, message); }; From 55dcbe46962a6e921f454f22227233e42e086508 Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Wed, 19 Feb 2020 17:50:12 +0000 Subject: [PATCH 7/9] Use `Agent._src` getter for presence `src` This change updates the Presence `src` field to use the Agent's `_src` getter, which will try to use a client-proposed ID, and then fall back to the server-generated `clientId`. This means that the `src` should be consistent across client reconnections, which helps `RemotePresence` to determine which ops have come from which presence providers. --- lib/agent.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 12b44c663..c28cd0641 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -753,7 +753,7 @@ Agent.prototype._createPresence = function(request) { return { a: 'p', ch: request.ch, - src: this.clientId, + src: this._src(), id: request.id, // Presence ID, not Doc ID (which is 'd') p: request.p, pv: request.pv, @@ -801,7 +801,7 @@ Agent.prototype._requestPresence = function(channel, callback) { }; Agent.prototype._handlePresenceData = function(presence) { - if (presence.src === this.clientId) return; + if (presence.src === this._src()) return; if (presence.r) return this.send({a: 'pr', ch: presence.ch}); From 9e19efbfe5897fc135df8c0da20cef23e468a8c8 Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Mon, 2 Mar 2020 14:05:20 +0000 Subject: [PATCH 8/9] Add a presence feature flag This change adds a presence feature flag, so that consumers have to actively opt in to use presence. This means that servers that don't want to worry about potential fan-out issues don't need to worry about it. Note that the feature flag will disable presence subscription and also disable presence broadcasting, but notably _does not_ disable unsubscribing from presence, so that clients may unsubscribe if the flag is toggled while the server is running. --- README.md | 2 ++ examples/rich-text-presence/server.js | 2 +- lib/agent.js | 2 ++ lib/backend.js | 1 + test/client/presence/doc-presence.js | 2 +- test/client/presence/presence.js | 2 +- 6 files changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 561b69be2..93cc1c5fd 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,8 @@ __Options__ through this pub/sub adapter. Defaults to `ShareDB.MemoryPubSub()`. * `options.milestoneDb` _(instance of ShareDB.MilestoneDB`)_ Store snapshots of documents at a specified interval of versions +* `options.presence` _boolean_ + Enable presence functionality. Off by default. Note that this feature is not optimized for large numbers of clients and could cause fan-out issues #### Database Adapters * `ShareDB.MemoryDB`, backed by a non-persistent database with no queries diff --git a/examples/rich-text-presence/server.js b/examples/rich-text-presence/server.js index 4ecd37540..0bcf7f821 100644 --- a/examples/rich-text-presence/server.js +++ b/examples/rich-text-presence/server.js @@ -6,7 +6,7 @@ var WebSocket = require('ws'); var WebSocketJSONStream = require('@teamwork/websocket-json-stream'); ShareDB.types.register(richText.type); -var backend = new ShareDB(); +var backend = new ShareDB({presence: true}); createDoc(startServer); // Create initial document then fire callback diff --git a/lib/agent.js b/lib/agent.js index c28cd0641..a8ab4a3bb 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -420,6 +420,7 @@ Agent.prototype._handleMessage = function(request, callback) { case 'nt': return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback); case 'p': + if (!this.backend.presenceEnabled) return; var presence = this._createPresence(request); if (presence.t && !util.supportsPresence(types.map[presence.t])) { return callback({ @@ -429,6 +430,7 @@ Agent.prototype._handleMessage = function(request, callback) { } return this._broadcastPresence(presence, callback); case 'ps': + if (!this.backend.presenceEnabled) return; return this._subscribePresence(request.ch, request.seq, callback); case 'pu': return this._unsubscribePresence(request.ch, request.seq, callback); diff --git a/lib/backend.js b/lib/backend.js index 9c370fbba..8330546e4 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -32,6 +32,7 @@ function Backend(options) { this.suppressPublish = !!options.suppressPublish; this.maxSubmitRetries = options.maxSubmitRetries || null; + this.presenceEnabled = !!options.presence; // Map from event name to a list of middleware this.middleware = {}; diff --git a/test/client/presence/doc-presence.js b/test/client/presence/doc-presence.js index 7f611ab2f..4fb433547 100644 --- a/test/client/presence/doc-presence.js +++ b/test/client/presence/doc-presence.js @@ -18,7 +18,7 @@ describe('DocPresence', function() { var presencePauser; beforeEach(function(done) { - backend = new Backend(); + backend = new Backend({presence: true}); connection1 = backend.connect(); connection2 = backend.connect(); diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js index 5d21be5a0..8d92835c3 100644 --- a/test/client/presence/presence.js +++ b/test/client/presence/presence.js @@ -14,7 +14,7 @@ describe('Presence', function() { var presencePauser; beforeEach(function(done) { - backend = new Backend(); + backend = new Backend({presence: true}); var connectedCount = 0; connection1 = backend.connect(); connection2 = backend.connect(); From 929d5150e82f19eefd7c348aa9e0d549befe1c2c Mon Sep 17 00:00:00 2001 From: Alec Gibson Date: Mon, 2 Mar 2020 15:57:28 +0000 Subject: [PATCH 9/9] Make local presence ID optional Local presence IDs should be unique. If consumers do not want the responsibility of taking care of this (and don't care about what ID is assigned to them), then we will automatically assign a random ID for them. --- README.md | 2 +- lib/client/presence/presence.js | 2 ++ test/client/presence/presence.js | 7 +++---- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 93cc1c5fd..1012c26ba 100644 --- a/README.md +++ b/README.md @@ -707,7 +707,7 @@ presence.create(presenceId): LocalPresence; Create an instance of [`LocalPresence`](#class-sharedblocalpresence), which can be used to represent local presence. Many or none such local presences may exist on a `Presence` instance. -* `presenceId` _string_: a unique ID representing the local presence. Remember - depending on use-case - the same client might have multiple presences, so this might not necessarily be a user or client ID. +* `presenceId` _string (optional)_: a unique ID representing the local presence. Remember - depending on use-case - the same client might have multiple presences, so this might not necessarily be a user or client ID. If one is not provided, a random ID will be assigned for you. #### `destroy` diff --git a/lib/client/presence/presence.js b/lib/client/presence/presence.js index f446fadad..f7c81e227 100644 --- a/lib/client/presence/presence.js +++ b/lib/client/presence/presence.js @@ -3,6 +3,7 @@ var LocalPresence = require('./local-presence'); var RemotePresence = require('./remote-presence'); var util = require('../../util'); var async = require('async'); +var hat = require('hat'); module.exports = Presence; function Presence(connection, channel) { @@ -35,6 +36,7 @@ Presence.prototype.unsubscribe = function(callback) { }; Presence.prototype.create = function(id) { + id = id || hat(); var localPresence = this._createLocalPresence(id); this.localPresences[id] = localPresence; return localPresence; diff --git a/test/client/presence/presence.js b/test/client/presence/presence.js index 8d92835c3..d95aa0a0f 100644 --- a/test/client/presence/presence.js +++ b/test/client/presence/presence.js @@ -298,10 +298,9 @@ describe('Presence', function() { }).to.throw(); }); - it('throws an error when trying to create a presence with an empty string ID', function() { - expect(function() { - presence1.create(''); - }).to.throw(); + it('assigns an ID if one is not provided', function() { + var localPresence = presence1.create(); + expect(localPresence.presenceId).to.be.ok; }); it('returns the error if a local presence cannot be destroyed because of a bad submit', function(done) {