From c392dc7ab2f7eccfcb1586ba5d7fa412b5301ca1 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Mon, 18 Dec 2023 19:21:29 +0200 Subject: [PATCH 1/9] Add support for debouncing updates --- src/client/provider.ts | 72 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/src/client/provider.ts b/src/client/provider.ts index 867c571..95b200b 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -37,6 +37,14 @@ export interface ProviderConfiguration { * (Optional) Add the authentication data */ auth?: { [key: string]: any } + /** + * The time over which to debounce document updates before syncing + */ + debounceTime?: number + /** + * Notify pending state when debouncing + */ + onPending?: (pending: boolean) => void } /** @@ -64,6 +72,29 @@ export class SocketIOProvider extends Observable { * @type {Socket} */ public socket: Socket + /** + * The time over which to debounce document updates before syncing + * @type {number} + * @private + */ + public readonly debounceTime: number + /** + * The timer used to debounce document updates + * @type {number} + * @private + */ + private updateTimer: number + /** + * Notify pending state when debouncing + * @type {((pending: boolean) => void) | undefined} + */ + public onPending: ((pending: boolean) => void) | undefined + /** + * The pending debouncing updates + * @type {number} + * @private + */ + private pendingUpdates: Uint8Array[]; /** * The yjs document * @type {Y.Doc} @@ -117,7 +148,9 @@ export class SocketIOProvider extends Observable { awareness = new AwarenessProtocol.Awareness(doc), resyncInterval = -1, disableBc = false, - auth = {} + auth = {}, + debounceTime, + onPending }: ProviderConfiguration, socketIoOptions: Partial | undefined = undefined) { super() @@ -140,6 +173,8 @@ export class SocketIOProvider extends Observable { auth: auth, ...socketIoOptions }) + this.debounceTime = debounceTime + this.onPending = onPending this.doc.on('update', this.onUpdateDoc) @@ -337,6 +372,16 @@ export class SocketIOProvider extends Observable { this.doc.off('update', this.onUpdateDoc) super.destroy() } + + private readonly onUpdateDocInner = (update: Uint8Array, origin: SocketIOProvider): void => { + this.socket.emit('sync-update', update) + if (this.bcconnected) { + bc.publish(this._broadcastChannel, { + type: 'sync-update', + data: update + }, this) + } + } /** * This function is executed when the document is updated, if the instance that @@ -347,15 +392,24 @@ export class SocketIOProvider extends Observable { * @type {(update: Uint8Array, origin: SocketIOProvider) => void} */ private readonly onUpdateDoc = (update: Uint8Array, origin: SocketIOProvider): void => { - if (origin !== this) { - this.socket.emit('sync-update', update) - if (this.bcconnected) { - bc.publish(this._broadcastChannel, { - type: 'sync-update', - data: update - }, this) - } + if (origin === this) { + return + } + if (!this.debounceTime) { + this.onUpdateDocInner(update, origin) + } + if (this.updateTimer) { + this.onPending?.(true) + this.pendingUpdates.push(update) + clearInterval(this.updateTimer) + } else { + this.pendingUpdates = [update] } + this.updateTimer = setInterval(() => { + const mergedUpdate = Y.mergeUpdates(this.pendingUpdates) + this.onUpdateDocInner(mergedUpdate, origin) + this.onPending?.(false) + }, this.debounceTime) } /** From 516fc63dff18f3644d48b59fd443aecd516847e9 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Tue, 19 Dec 2023 16:06:17 +0200 Subject: [PATCH 2/9] Lint fixes (including some meaningful ones) --- src/client/provider.ts | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/client/provider.ts b/src/client/provider.ts index 95b200b..5475982 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -77,13 +77,13 @@ export class SocketIOProvider extends Observable { * @type {number} * @private */ - public readonly debounceTime: number + public readonly debounceTime?: number /** * The timer used to debounce document updates - * @type {number} + * @type {ReturnType} * @private */ - private updateTimer: number + private updateTimer?: ReturnType /** * Notify pending state when debouncing * @type {((pending: boolean) => void) | undefined} @@ -91,10 +91,10 @@ export class SocketIOProvider extends Observable { public onPending: ((pending: boolean) => void) | undefined /** * The pending debouncing updates - * @type {number} + * @type {Uint8Array[]} * @private */ - private pendingUpdates: Uint8Array[]; + private pendingUpdates: Uint8Array[] /** * The yjs document * @type {Y.Doc} @@ -132,7 +132,7 @@ export class SocketIOProvider extends Observable { * @type {Partial | undefined} * @private */ - private readonly _socketIoOptions: Partial | undefined; + private readonly _socketIoOptions: Partial | undefined /** * SocketIOProvider constructor @@ -151,8 +151,8 @@ export class SocketIOProvider extends Observable { auth = {}, debounceTime, onPending - }: ProviderConfiguration, - socketIoOptions: Partial | undefined = undefined) { + }: ProviderConfiguration, + socketIoOptions: Partial | undefined = undefined) { super() while (url[url.length - 1] === '/') { url = url.slice(0, url.length - 1) @@ -164,7 +164,7 @@ export class SocketIOProvider extends Observable { this._broadcastChannel = `${url}/${roomName}` this.disableBc = disableBc - this._socketIoOptions = socketIoOptions; + this._socketIoOptions = socketIoOptions this.socket = io(`${this.url}/yjs|${roomName}`, { autoConnect: false, @@ -175,6 +175,7 @@ export class SocketIOProvider extends Observable { }) this.debounceTime = debounceTime this.onPending = onPending + this.pendingUpdates = [] this.doc.on('update', this.onUpdateDoc) @@ -368,11 +369,11 @@ export class SocketIOProvider extends Observable { if (typeof window !== 'undefined') window.removeEventListener('beforeunload', this.beforeUnloadHandler) else if (typeof process !== 'undefined') process.off('exit', this.beforeUnloadHandler) this.awareness.off('update', this.awarenessUpdate) - this.awareness.destroy(); + this.awareness.destroy() this.doc.off('update', this.onUpdateDoc) super.destroy() } - + private readonly onUpdateDocInner = (update: Uint8Array, origin: SocketIOProvider): void => { this.socket.emit('sync-update', update) if (this.bcconnected) { @@ -395,20 +396,21 @@ export class SocketIOProvider extends Observable { if (origin === this) { return } - if (!this.debounceTime) { + if (this.debounceTime !== undefined) { this.onUpdateDocInner(update, origin) } - if (this.updateTimer) { + if (this.updateTimer !== undefined) { this.onPending?.(true) this.pendingUpdates.push(update) - clearInterval(this.updateTimer) + clearTimeout(this.updateTimer) } else { this.pendingUpdates = [update] } - this.updateTimer = setInterval(() => { + this.updateTimer = setTimeout(() => { const mergedUpdate = Y.mergeUpdates(this.pendingUpdates) this.onUpdateDocInner(mergedUpdate, origin) this.onPending?.(false) + this.updateTimer = undefined }, this.debounceTime) } From e6cab08378e1c5ba7a095c52b086f91bfac2897e Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Tue, 19 Dec 2023 16:48:05 +0200 Subject: [PATCH 3/9] Fix mistake in last commit --- src/client/provider.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/provider.ts b/src/client/provider.ts index 5475982..67e42cc 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -396,7 +396,7 @@ export class SocketIOProvider extends Observable { if (origin === this) { return } - if (this.debounceTime !== undefined) { + if (this.debounceTime === undefined) { this.onUpdateDocInner(update, origin) } if (this.updateTimer !== undefined) { From 71df4b0f94760666290270c33eb3cd7c0dc608ab Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Tue, 19 Dec 2023 18:22:50 +0200 Subject: [PATCH 4/9] Add ability to debounce awareness updates --- src/client/provider.ts | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/src/client/provider.ts b/src/client/provider.ts index 67e42cc..77aae7a 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -41,6 +41,10 @@ export interface ProviderConfiguration { * The time over which to debounce document updates before syncing */ debounceTime?: number + /** + * The time over which to debounce document awareness updates before syncing + */ + debounceAwarenessTime?: number /** * Notify pending state when debouncing */ @@ -78,6 +82,12 @@ export class SocketIOProvider extends Observable { * @private */ public readonly debounceTime?: number + /** + * The time over which to debounce document awareness updates before syncing + * @type {number} + * @private + */ + public readonly debounceAwarenessTime?: number /** * The timer used to debounce document updates * @type {ReturnType} @@ -150,6 +160,7 @@ export class SocketIOProvider extends Observable { disableBc = false, auth = {}, debounceTime, + debounceAwarenessTime, onPending }: ProviderConfiguration, socketIoOptions: Partial | undefined = undefined) { @@ -174,6 +185,7 @@ export class SocketIOProvider extends Observable { ...socketIoOptions }) this.debounceTime = debounceTime + this.debounceAwarenessTime = debounceAwarenessTime this.onPending = onPending this.pendingUpdates = [] @@ -385,8 +397,6 @@ export class SocketIOProvider extends Observable { } /** - * This function is executed when the document is updated, if the instance that - * emit the change is not this, it emit the changes by socket and broadcast channel. * @private * @param {Uint8Array} update Document update * @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change. @@ -425,13 +435,12 @@ export class SocketIOProvider extends Observable { } /** - * This function is executed when the local awareness changes and this broadcasts the changes per socket and broadcast channel. * @private * @param {{ added: number[], updated: number[], removed: number[] }} awarenessChanges The clients added, updated and removed * @param {SocketIOProvider | null} origin The SocketIOProvider instance that emits the change. * @type {({ added, updated, removed }: { added: number[], updated: number[], removed: number[] }, origin: SocketIOProvider | null) => void} */ - private readonly awarenessUpdate = ({ added, updated, removed }: AwarenessChange, origin: SocketIOProvider | null): void => { + private readonly awarenessUpdateInner = ({ added, updated, removed }: AwarenessChange, origin: SocketIOProvider | null): void => { const changedClients = added.concat(updated).concat(removed) this.socket.emit('awareness-update', AwarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients)) if (this.bcconnected) { @@ -442,6 +451,26 @@ export class SocketIOProvider extends Observable { } } + /** + * This function is executed when the local awareness changes and this broadcasts the changes per socket and broadcast channel. + * @private + * @param {{ added: number[], updated: number[], removed: number[] }} awarenessChanges The clients added, updated and removed + * @param {SocketIOProvider | null} origin The SocketIOProvider instance that emits the change. + * @type {({ added, updated, removed }: { added: number[], updated: number[], removed: number[] }, origin: SocketIOProvider | null) => void} + */ + private readonly awarenessUpdate = (awarenessChange: AwarenessChange, origin: SocketIOProvider | null): void => { + if (this.debounceAwarenessTime === undefined) { + this.awarenessUpdateInner(awarenessChange, origin) + } + if (this.updateTimer !== undefined) { + clearTimeout(this.updateTimer) + } + this.updateTimer = setTimeout(() => { + this.awarenessUpdateInner(awarenessChange, origin) + this.updateTimer = undefined + }, this.debounceAwarenessTime) + } + /** * This function is executed when the windows will be unloaded or the process will be closed and this * will remove the local client from awareness. From d64121ddada2a276e4e80612d5e7fe1382bfb515 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Tue, 19 Dec 2023 19:25:25 +0200 Subject: [PATCH 5/9] Fix mistake in binding awareness and doc update to the same timer :/ --- src/client/provider.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/client/provider.ts b/src/client/provider.ts index 77aae7a..a7856b9 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -94,6 +94,12 @@ export class SocketIOProvider extends Observable { * @private */ private updateTimer?: ReturnType + /** + * The timer used to debounce document awareness updates + * @type {ReturnType} + * @private + */ + private updateAwarenessTimer?: ReturnType /** * Notify pending state when debouncing * @type {((pending: boolean) => void) | undefined} @@ -462,12 +468,12 @@ export class SocketIOProvider extends Observable { if (this.debounceAwarenessTime === undefined) { this.awarenessUpdateInner(awarenessChange, origin) } - if (this.updateTimer !== undefined) { - clearTimeout(this.updateTimer) + if (this.updateAwarenessTimer !== undefined) { + clearTimeout(this.updateAwarenessTimer) } - this.updateTimer = setTimeout(() => { + this.updateAwarenessTimer = setTimeout(() => { this.awarenessUpdateInner(awarenessChange, origin) - this.updateTimer = undefined + this.updateAwarenessTimer = undefined }, this.debounceAwarenessTime) } From cf9f8f11490adbccad73e9b3f35614508fb8de81 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Wed, 20 Dec 2023 11:49:44 +0200 Subject: [PATCH 6/9] Copy maxWait approach from Lodash debounce Make debounced awareness run at least once per maxWaitForDebouncingAwareness Make debounced updates run at least once per maxWaitForDebouncingUpdates --- src/client/provider.ts | 57 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/src/client/provider.ts b/src/client/provider.ts index a7856b9..52a7c01 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -41,10 +41,18 @@ export interface ProviderConfiguration { * The time over which to debounce document updates before syncing */ debounceTime?: number + /** + * The maximum time to wait before debouncing updates + */ + maxWaitForDebouncingUpdates?: number /** * The time over which to debounce document awareness updates before syncing */ debounceAwarenessTime?: number + /** + * The maximum time to wait before debouncing awareness updates + */ + maxWaitForDebouncingAwareness?: number /** * Notify pending state when debouncing */ @@ -82,6 +90,18 @@ export class SocketIOProvider extends Observable { * @private */ public readonly debounceTime?: number + /** + * The maximum time to wait before debouncing updates + * @type {number} + * @private + */ + public readonly maxWaitForDebouncingUpdates?: number + /** + * The maximum time to wait before debouncing awareness updates + * @type {number} + * @private + */ + public readonly maxWaitForDebouncingAwareness?: number /** * The time over which to debounce document awareness updates before syncing * @type {number} @@ -94,6 +114,20 @@ export class SocketIOProvider extends Observable { * @private */ private updateTimer?: ReturnType + /** + * The unix timestamp of the last update, + * used to ensure updates are sent within maxWaitForDebouncingUpdates + * @type {number} + * @private + */ + private lastUpdate?: number + /** + * The unix timestamp of the last awareness update, + * used to ensure updates are sent within maxWaitForDebouncingAwareness + * @type {number} + * @private + */ + private lastAwarenessUpdate?: number /** * The timer used to debounce document awareness updates * @type {ReturnType} @@ -166,7 +200,9 @@ export class SocketIOProvider extends Observable { disableBc = false, auth = {}, debounceTime, + maxWaitForDebouncingUpdates, debounceAwarenessTime, + maxWaitForDebouncingAwareness, onPending }: ProviderConfiguration, socketIoOptions: Partial | undefined = undefined) { @@ -191,7 +227,9 @@ export class SocketIOProvider extends Observable { ...socketIoOptions }) this.debounceTime = debounceTime + this.maxWaitForDebouncingUpdates = maxWaitForDebouncingUpdates ?? debounceTime this.debounceAwarenessTime = debounceAwarenessTime + this.maxWaitForDebouncingAwareness = maxWaitForDebouncingAwareness ?? debounceAwarenessTime this.onPending = onPending this.pendingUpdates = [] @@ -415,6 +453,16 @@ export class SocketIOProvider extends Observable { if (this.debounceTime === undefined) { this.onUpdateDocInner(update, origin) } + if (this.maxWaitForDebouncingUpdates !== undefined && this.lastUpdate !== undefined && Date.now() - this.lastUpdate >= this.maxWaitForDebouncingUpdates) { + // Ensure updates are sent at least once every maxWaitForDebouncingUpdates + this.pendingUpdates.push(update) + clearTimeout(this.updateTimer) + this.updateTimer = undefined + this.lastUpdate = Date.now() + const mergedUpdate = Y.mergeUpdates(this.pendingUpdates) + this.onUpdateDocInner(mergedUpdate, origin) + this.onPending?.(false) + } if (this.updateTimer !== undefined) { this.onPending?.(true) this.pendingUpdates.push(update) @@ -423,6 +471,7 @@ export class SocketIOProvider extends Observable { this.pendingUpdates = [update] } this.updateTimer = setTimeout(() => { + this.lastUpdate = Date.now() const mergedUpdate = Y.mergeUpdates(this.pendingUpdates) this.onUpdateDocInner(mergedUpdate, origin) this.onPending?.(false) @@ -471,7 +520,15 @@ export class SocketIOProvider extends Observable { if (this.updateAwarenessTimer !== undefined) { clearTimeout(this.updateAwarenessTimer) } + if (this.maxWaitForDebouncingAwareness !== undefined && this.lastAwarenessUpdate !== undefined && Date.now() - this.lastAwarenessUpdate >= this.maxWaitForDebouncingAwareness) { + // Ensure waiting no longer than `debounceAwarenessTime` for an awareness update + clearTimeout(this.updateAwarenessTimer) + this.updateAwarenessTimer = undefined + this.lastAwarenessUpdate = Date.now() + this.awarenessUpdateInner(awarenessChange, origin) + } this.updateAwarenessTimer = setTimeout(() => { + this.lastAwarenessUpdate = Date.now() this.awarenessUpdateInner(awarenessChange, origin) this.updateAwarenessTimer = undefined }, this.debounceAwarenessTime) From 147faf0c0ea0fd2bbf2907a4b7e8df7b5a27f908 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Thu, 28 Dec 2023 16:09:23 +0200 Subject: [PATCH 7/9] Fix bug in sync acknowledgement via socket.io lost --- src/client/provider.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/client/provider.ts b/src/client/provider.ts index 52a7c01..7b13505 100644 --- a/src/client/provider.ts +++ b/src/client/provider.ts @@ -233,6 +233,12 @@ export class SocketIOProvider extends Observable { this.onPending = onPending this.pendingUpdates = [] + this.initSyncListeners() + + this.initAwarenessListeners() + + this.initSystemListeners() + this.doc.on('update', this.onUpdateDoc) this.socket.on('connect', () => this.onSocketConnection(resyncInterval)) @@ -241,12 +247,6 @@ export class SocketIOProvider extends Observable { this.socket.on('connect_error', (error) => this.onSocketConnectionError(error)) - this.initSyncListeners() - - this.initAwarenessListeners() - - this.initSystemListeners() - awareness.on('update', this.awarenessUpdate) if (autoConnect) this.connect() From af9e22ff353a33b724d5bb265851a81a77e84ea1 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Thu, 28 Dec 2023 16:13:22 +0200 Subject: [PATCH 8/9] Add `localOnly` and `timeoutForAck` arguments These flags are used on the server-side. `localOnly` uses the `.local` feature on socket.io (which useful when synchronizing via another channel, e.g. Redis pub / sub, between servers, which is necessary anyway to ensure the server state is synchronized properly) `timeoutForAck` determines the timeout for the "sync-step-1" event Added error handling to failed acknowledgement for "sync-step-1" --- src/server/document.ts | 18 +- src/server/y-socket-io.ts | 545 ++++++++++++++++++++------------------ 2 files changed, 296 insertions(+), 267 deletions(-) diff --git a/src/server/document.ts b/src/server/document.ts index de266f2..7da6288 100644 --- a/src/server/document.ts +++ b/src/server/document.ts @@ -36,11 +36,17 @@ export class Document extends Y.Doc { */ public name: string /** - * The socket connection + * The namespace connection * @type {Namespace} * @private */ private readonly namespace: Namespace + /** + * Indicator as to whether to send document updates only to local WebSockets + * @type {boolean} + * @private + */ + private readonly localOnly?: boolean /** * The document awareness * @type {Awareness} @@ -58,12 +64,14 @@ export class Document extends Y.Doc { * @constructor * @param {string} name Name for the document * @param {Namespace} namespace The namespace connection + * @param {boolean} localOnly Indicator as to whether to send document updates only to local WebSockets * @param {Callbacks} callbacks The document callbacks */ - constructor (name: string, namespace: Namespace, callbacks?: Callbacks) { + constructor (name: string, namespace: Namespace, localOnly?: boolean, callbacks?: Callbacks) { super({ gc: gcEnabled }) this.name = name this.namespace = namespace + this.localOnly = localOnly this.awareness = new AwarenessProtocol.Awareness(this) this.awareness.setLocalState(null) this.callbacks = callbacks @@ -87,7 +95,11 @@ export class Document extends Y.Doc { console.warn(error) } } - this.namespace.emit('sync-update', update) + if (this.localOnly !== undefined && this.localOnly) { + this.namespace.local.emit('sync-update', update) + } else { + this.namespace.emit('sync-update', update) + } } /** diff --git a/src/server/y-socket-io.ts b/src/server/y-socket-io.ts index 1ad4524..5441588 100644 --- a/src/server/y-socket-io.ts +++ b/src/server/y-socket-io.ts @@ -1,264 +1,281 @@ -import * as Y from 'yjs' -import { Namespace, Server, Socket } from 'socket.io' -import * as AwarenessProtocol from 'y-protocols/awareness' -import { LeveldbPersistence } from 'y-leveldb' -import { Document } from './document' -import { Observable } from 'lib0/observable' - -/** - * Level db persistence object - */ -export interface Persistence { - bindState: (docName: string, ydoc: Document) => void - writeState: (docName: string, ydoc: Document) => Promise - provider: any -} - -/** - * YSocketIO instance cofiguration. Here you can configure: - * - gcEnabled: Enable/Disable garbage collection (default: gc=true) - * - levelPersistenceDir: The directory path where the persistent Level database will be stored - * - authenticate: The callback to authenticate the client connection - */ -export interface YSocketIOConfiguration { - /** - * Enable/Disable garbage collection (default: gc=true) - */ - gcEnabled?: boolean - /** - * The directory path where the persistent Level database will be stored - */ - levelPersistenceDir?: string - /** - * Callback to authenticate the client connection. - * - * It can be a promise and if it returns true, the connection is allowed; otherwise, if it returns false, the connection is rejected. - * @param handshake Provided from the handshake attribute of the socket io - */ - authenticate?: (handshake: { [key: string]: any }) => Promise | boolean -} - -/** - * YSocketIO class. This handles document synchronization. - */ -export class YSocketIO extends Observable { - /** - * @type {Map} - */ - private readonly _documents: Map = new Map() - /** - * @type {Server} - */ - private readonly io: Server - /** - * @type {string | undefined | null} - */ - private readonly _levelPersistenceDir: string | undefined | null = null - /** - * @type {Persistence | null} - */ - private persistence: Persistence | null = null - /** - * @type {YSocketIOConfiguration} - */ - private readonly configuration?: YSocketIOConfiguration - /** - * @type {Namespace | null} - */ - public nsp: Namespace | null = null - /** - * YSocketIO constructor. - * @constructor - * @param {Server} io Server instance from Socket IO - * @param {YSocketIOConfiguration} configuration (Optional) The YSocketIO configuration - */ - constructor (io: Server, configuration?: YSocketIOConfiguration) { - super() - - this.io = io - - this._levelPersistenceDir = configuration?.levelPersistenceDir ?? process.env.YPERSISTENCE - if (this._levelPersistenceDir != null) this.initLevelDB(this._levelPersistenceDir) - - this.configuration = configuration - } - - /** - * YSocketIO initialization. - * - * This method set ups a dynamic namespace manager for namespaces that match with the regular expression `/^\/yjs\|.*$/` - * and adds the connection authentication middleware to the dynamics namespaces. - * - * It also starts socket connection listeners. - * @type {() => void} - */ - public initialize (): void { - this.nsp = this.io.of(/^\/yjs\|.*$/) - - this.nsp.use(async (socket, next) => { - if ((this.configuration?.authenticate) == null) return next() - if (await this.configuration.authenticate(socket.handshake)) return next() - else return next(new Error('Unauthorized')) - }) - - this.nsp.on('connection', async (socket) => { - const namespace = socket.nsp.name.replace(/\/yjs\|/, '') - - const doc = await this.initDocument(namespace, socket.nsp, this.configuration?.gcEnabled) - - this.initSyncListeners(socket, doc) - this.initAwarenessListeners(socket, doc) - - this.initSocketListeners(socket, doc) - - this.startSynchronization(socket, doc) - }) - } - - /** - * The document map's getter. If you want to delete a document externally, make sure you don't delete - * the document directly from the map, instead use the "destroy" method of the document you want to delete, - * this way when you destroy the document you are also closing any existing connection on the document. - * @type {Map} - */ - public get documents (): Map { - return this._documents - } - - /** - * This method creates a yjs document if it doesn't exist in the document map. If the document exists, get the map document. - * - * - If document is created: - * - Binds the document to LevelDB if LevelDB persistence is enabled. - * - Adds the new document to the documents map. - * - Emit the `document-loaded` event - * @private - * @param {string} name The name for the document - * @param {Namespace} namespace The namespace of the document - * @param {boolean} gc Enable/Disable garbage collection (default: gc=true) - * @returns {Promise} The document - */ - private async initDocument (name: string, namespace: Namespace, gc: boolean = true): Promise { - const doc = this._documents.get(name) ?? (new Document(name, namespace, { - onUpdate: (doc, update) => this.emit('document-update', [doc, update]), - onChangeAwareness: (doc, update) => this.emit('awareness-update', [doc, update]), - onDestroy: async (doc) => { - this._documents.delete(doc.name) - this.emit('document-destroy', [doc]) - } - })) - doc.gc = gc - if (!this._documents.has(name)) { - if (this.persistence != null) await this.persistence.bindState(name, doc) - this._documents.set(name, doc) - this.emit('document-loaded', [doc]) - } - return doc - } - - /** - * This method sets persistence if enabled. - * @private - * @param {string} levelPersistenceDir The directory path where the persistent Level database is stored - */ - private initLevelDB (levelPersistenceDir: string): void { - const ldb = new LeveldbPersistence(levelPersistenceDir) - this.persistence = { - provider: ldb, - bindState: async (docName: string, ydoc: Document) => { - const persistedYdoc = await ldb.getYDoc(docName) - const newUpdates = Y.encodeStateAsUpdate(ydoc) - await ldb.storeUpdate(docName, newUpdates) - Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc)) - ydoc.on('update', async (update: Uint8Array) => await ldb.storeUpdate(docName, update)) - }, - writeState: async (_docName: string, _ydoc: Document) => { } - } - } - - /** - * This function initializes the socket event listeners to synchronize document changes. - * - * The synchronization protocol is as follows: - * - A client emits the sync step one event (`sync-step-1`) which sends the document as a state vector - * and the sync step two callback as an acknowledgment according to the socket io acknowledgments. - * - When the server receives the `sync-step-1` event, it executes the `syncStep2` acknowledgment callback and sends - * the difference between the received state vector and the local document (this difference is called an update). - * - The second step of the sync is to apply the update sent in the `syncStep2` callback parameters from the server - * to the document on the client side. - * - There is another event (`sync-update`) that is emitted from the client, which sends an update for the document, - * and when the server receives this event, it applies the received update to the local document. - * - When an update is applied to a document, it will fire the document's "update" event, which - * sends the update to clients connected to the document's namespace. - * @private - * @type {(socket: Socket, doc: Document) => void} - * @param {Socket} socket The socket connection - * @param {Document} doc The document - */ - private readonly initSyncListeners = (socket: Socket, doc: Document): void => { - socket.on('sync-step-1', (stateVector: Uint8Array, syncStep2: (update: Uint8Array) => void) => { - syncStep2(Y.encodeStateAsUpdate(doc, new Uint8Array(stateVector))) - }) - - socket.on('sync-update', (update: Uint8Array) => { - Y.applyUpdate(doc, update, null) - }) - } - - /** - * This function initializes socket event listeners to synchronize awareness changes. - * - * The awareness protocol is as follows: - * - A client emits the `awareness-update` event by sending the awareness update. - * - The server receives that event and applies the received update to the local awareness. - * - When an update is applied to awareness, the awareness "update" event will fire, which - * sends the update to clients connected to the document namespace. - * @private - * @type {(socket: Socket, doc: Document) => void} - * @param {Socket} socket The socket connection - * @param {Document} doc The document - */ - private readonly initAwarenessListeners = (socket: Socket, doc: Document): void => { - socket.on('awareness-update', (update: ArrayBuffer) => { - AwarenessProtocol.applyAwarenessUpdate(doc.awareness, new Uint8Array(update), socket) - }) - } - - /** - * This function initializes socket event listeners for general purposes. - * - * When a client has been disconnected, check the clients connected to the document namespace, - * if no connection remains, emit the `all-document-connections-closed` event - * parameters and if LevelDB persistence is enabled, persist the document in LevelDB and destroys it. - * @private - * @type {(socket: Socket, doc: Document) => void} - * @param {Socket} socket The socket connection - * @param {Document} doc The document - */ - private readonly initSocketListeners = (socket: Socket, doc: Document): void => { - socket.on('disconnect', async () => { - if ((await socket.nsp.allSockets()).size === 0) { - this.emit('all-document-connections-closed', [doc]) - if (this.persistence != null) { - await this.persistence.writeState(doc.name, doc) - await doc.destroy() - } - } - }) - } - - /** - * This function is called when a client connects and it emit the `sync-step-1` and `awareness-update` - * events to the client to start the sync. - * @private - * @type {(socket: Socket, doc: Document) => void} - * @param {Socket} socket The socket connection - * @param {Document} doc The document - */ - private readonly startSynchronization = (socket: Socket, doc: Document): void => { - socket.emit('sync-step-1', Y.encodeStateVector(doc), (update: Uint8Array) => { - Y.applyUpdate(doc, new Uint8Array(update), this) - }) - socket.emit('awareness-update', AwarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(doc.awareness.getStates().keys()))) - } -} +import * as Y from 'yjs' +import { Namespace, Server, Socket } from 'socket.io' +import * as AwarenessProtocol from 'y-protocols/awareness' +import { LeveldbPersistence } from 'y-leveldb' +import { Document } from './document' +import { Observable } from 'lib0/observable' +import { Handshake } from 'socket.io/dist/socket' + +const DEFAULT_TIMEOUT_FOR_ACK = 10_000 + +/** + * Level db persistence object + */ +export interface Persistence { + bindState: (docName: string, ydoc: Document) => void + writeState: (docName: string, ydoc: Document) => Promise + provider: any +} + +/** + * YSocketIO instance cofiguration. Here you can configure: + * - gcEnabled: Enable/Disable garbage collection (default: gc=true) + * - levelPersistenceDir: The directory path where the persistent Level database will be stored + * - authenticate: The callback to authenticate the client connection + */ +export interface YSocketIOConfiguration { + /** + * Enable/Disable garbage collection (default: gc=true) + */ + gcEnabled?: boolean + /** + * Enable/Disable sending messages only to local WebSockets + */ + localOnly?: boolean + /** + * Set timeout time for acknowledgements + */ + timeoutForAck?: number + /** + * The directory path where the persistent Level database will be stored + */ + levelPersistenceDir?: string + /** + * Callback to authenticate the client connection. + * + * It can be a promise and if it returns true, the connection is allowed; otherwise, if it returns false, the connection is rejected. + * @param handshake Provided from the handshake attribute of the socket io + */ + authenticate?: (handshake: { [key: string]: any }) => Promise | boolean +} + +/** + * YSocketIO class. This handles document synchronization. + */ +export class YSocketIO extends Observable { + /** + * @type {Map} + */ + private readonly _documents: Map = new Map() + /** + * @type {Server} + */ + private readonly io: Server + /** + * @type {string | undefined | null} + */ + private readonly _levelPersistenceDir: string | undefined | null = null + /** + * @type {Persistence | null} + */ + private persistence: Persistence | null = null + /** + * @type {YSocketIOConfiguration} + */ + private readonly configuration?: YSocketIOConfiguration + /** + * @type {Namespace | null} + */ + public nsp: Namespace | null = null + /** + * YSocketIO constructor. + * @constructor + * @param {Server} io Server instance from Socket IO + * @param {YSocketIOConfiguration} configuration (Optional) The YSocketIO configuration + */ + constructor (io: Server, configuration?: YSocketIOConfiguration) { + super() + + this.io = io + + this._levelPersistenceDir = configuration?.levelPersistenceDir ?? process.env.YPERSISTENCE + if (this._levelPersistenceDir != null) this.initLevelDB(this._levelPersistenceDir) + + this.configuration = configuration + } + + /** + * YSocketIO initialization. + * + * This method set ups a dynamic namespace manager for namespaces that match with the regular expression `/^\/yjs\|.*$/` + * and adds the connection authentication middleware to the dynamics namespaces. + * + * It also starts socket connection listeners. + * @type {() => void} + */ + public initialize (): void { + this.nsp = this.io.of(/^\/yjs\|.*$/) + + this.nsp.use(async (socket, next) => { + if ((this.configuration?.authenticate) == null) return next() + if (await this.configuration.authenticate(socket.handshake)) return next() + else return next(new Error('Unauthorized')) + }) + + this.nsp.on('connection', async (socket) => { + const namespace = socket.nsp.name.replace(/\/yjs\|/, '') + + const doc = await this.initDocument(namespace, socket.nsp, this.configuration?.localOnly, this.configuration?.gcEnabled) + + this.initSyncListeners(socket, doc) + this.initAwarenessListeners(socket, doc) + + this.initSocketListeners(socket, doc) + + this.startSynchronization(socket, doc) + }) + } + + /** + * The document map's getter. If you want to delete a document externally, make sure you don't delete + * the document directly from the map, instead use the "destroy" method of the document you want to delete, + * this way when you destroy the document you are also closing any existing connection on the document. + * @type {Map} + */ + public get documents (): Map { + return this._documents + } + + /** + * This method creates a yjs document if it doesn't exist in the document map. If the document exists, get the map document. + * + * - If document is created: + * - Binds the document to LevelDB if LevelDB persistence is enabled. + * - Adds the new document to the documents map. + * - Emit the `document-loaded` event + * @private + * @param {string} name The name for the document + * @param {Namespace} namespace The namespace of the document + * @param {boolean} gc Enable/Disable garbage collection (default: gc=true) + * @returns {Promise} The document + */ + private async initDocument (name: string, namespace: Namespace, localOnly?: boolean, gc: boolean = true): Promise { + const doc = this._documents.get(name) ?? (new Document(name, namespace, localOnly, { + onUpdate: (doc, update) => this.emit('document-update', [doc, update]), + onChangeAwareness: (doc, update) => this.emit('awareness-update', [doc, update]), + onDestroy: async (doc) => { + this._documents.delete(doc.name) + this.emit('document-destroy', [doc]) + } + })) + doc.gc = gc + if (!this._documents.has(name)) { + if (this.persistence != null) await this.persistence.bindState(name, doc) + this._documents.set(name, doc) + this.emit('document-loaded', [doc]) + } + return doc + } + + /** + * This method sets persistence if enabled. + * @private + * @param {string} levelPersistenceDir The directory path where the persistent Level database is stored + */ + private initLevelDB (levelPersistenceDir: string): void { + const ldb = new LeveldbPersistence(levelPersistenceDir) + this.persistence = { + provider: ldb, + bindState: async (docName: string, ydoc: Document) => { + const persistedYdoc = await ldb.getYDoc(docName) + const newUpdates = Y.encodeStateAsUpdate(ydoc) + await ldb.storeUpdate(docName, newUpdates) + Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc)) + ydoc.on('update', async (update: Uint8Array) => await ldb.storeUpdate(docName, update)) + }, + writeState: async (_docName: string, _ydoc: Document) => { } + } + } + + /** + * This function initializes the socket event listeners to synchronize document changes. + * + * The synchronization protocol is as follows: + * - A client emits the sync step one event (`sync-step-1`) which sends the document as a state vector + * and the sync step two callback as an acknowledgment according to the socket io acknowledgments. + * - When the server receives the `sync-step-1` event, it executes the `syncStep2` acknowledgment callback and sends + * the difference between the received state vector and the local document (this difference is called an update). + * - The second step of the sync is to apply the update sent in the `syncStep2` callback parameters from the server + * to the document on the client side. + * - There is another event (`sync-update`) that is emitted from the client, which sends an update for the document, + * and when the server receives this event, it applies the received update to the local document. + * - When an update is applied to a document, it will fire the document's "update" event, which + * sends the update to clients connected to the document's namespace. + * @private + * @type {(socket: Socket, doc: Document) => void} + * @param {Socket} socket The socket connection + * @param {Document} doc The document + */ + private readonly initSyncListeners = (socket: Socket, doc: Document): void => { + socket.on('sync-step-1', (stateVector: Uint8Array, syncStep2: (update: Uint8Array) => void) => { + syncStep2(Y.encodeStateAsUpdate(doc, new Uint8Array(stateVector))) + }) + + socket.on('sync-update', (update: Uint8Array) => { + Y.applyUpdate(doc, update, null) + }) + } + + /** + * This function initializes socket event listeners to synchronize awareness changes. + * + * The awareness protocol is as follows: + * - A client emits the `awareness-update` event by sending the awareness update. + * - The server receives that event and applies the received update to the local awareness. + * - When an update is applied to awareness, the awareness "update" event will fire, which + * sends the update to clients connected to the document namespace. + * @private + * @type {(socket: Socket, doc: Document) => void} + * @param {Socket} socket The socket connection + * @param {Document} doc The document + */ + private readonly initAwarenessListeners = (socket: Socket, doc: Document): void => { + socket.on('awareness-update', (update: ArrayBuffer) => { + AwarenessProtocol.applyAwarenessUpdate(doc.awareness, new Uint8Array(update), socket) + }) + } + + /** + * This function initializes socket event listeners for general purposes. + * + * When a client has been disconnected, check the clients connected to the document namespace, + * if no connection remains, emit the `all-document-connections-closed` event + * parameters and if LevelDB persistence is enabled, persist the document in LevelDB and destroys it. + * @private + * @type {(socket: Socket, doc: Document) => void} + * @param {Socket} socket The socket connection + * @param {Document} doc The document + */ + private readonly initSocketListeners = (socket: Socket, doc: Document): void => { + socket.on('disconnect', async () => { + if ((await socket.nsp.allSockets()).size === 0) { + this.emit('all-document-connections-closed', [doc]) + if (this.persistence != null) { + await this.persistence.writeState(doc.name, doc) + await doc.destroy() + } + } + }) + } + + /** + * This function is called when a client connects and it emit the `sync-step-1` and `awareness-update` + * events to the client to start the sync. + * @private + * @type {(socket: Socket, doc: Document) => void} + * @param {Socket} socket The socket connection + * @param {Document} doc The document + */ + private readonly startSynchronization = (socket: Socket, doc: Document): void => { + socket.timeout( + this.configuration?.timeoutForAck ?? DEFAULT_TIMEOUT_FOR_ACK + ).emit('sync-step-1', Y.encodeStateVector(doc), (error: Error | undefined | null, update: Uint8Array) => { + if (error !== undefined && error !== null) { + console.error('An error occurred during sync-step-1', error) + return + } + Y.applyUpdate(doc, new Uint8Array(update), this) + }) + socket.emit('awareness-update', AwarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(doc.awareness.getStates().keys()))) + } +} From 85f00bba27414f3383dabd0109e326edfc6dfe68 Mon Sep 17 00:00:00 2001 From: Ben Lewis Date: Thu, 28 Dec 2023 16:13:41 +0200 Subject: [PATCH 9/9] Fix handshake type in authenticate argument --- src/server/y-socket-io.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/y-socket-io.ts b/src/server/y-socket-io.ts index 5441588..2dae6d1 100644 --- a/src/server/y-socket-io.ts +++ b/src/server/y-socket-io.ts @@ -46,7 +46,7 @@ export interface YSocketIOConfiguration { * It can be a promise and if it returns true, the connection is allowed; otherwise, if it returns false, the connection is rejected. * @param handshake Provided from the handshake attribute of the socket io */ - authenticate?: (handshake: { [key: string]: any }) => Promise | boolean + authenticate?: (handshake: Handshake) => Promise | boolean } /**