diff options
Diffstat (limited to 'src/main/otClient.ts')
| -rw-r--r-- | src/main/otClient.ts | 46 |
1 files changed, 41 insertions, 5 deletions
diff --git a/src/main/otClient.ts b/src/main/otClient.ts index 2917bcc..9d373bd 100644 --- a/src/main/otClient.ts +++ b/src/main/otClient.ts @@ -25,10 +25,16 @@ interface OtState { version: number } +interface QueuedRemoteUpdate { + ops: OtOp[] + version: number +} + export class OtClient { private state: OtState private sendFn: SendFn private applyFn: ApplyFn + private queuedRemoteUpdates: QueuedRemoteUpdate[] = [] constructor(version: number, sendFn: SendFn, applyFn: ApplyFn) { this.state = { name: 'synchronized', inflight: null, buffer: null, version } @@ -115,6 +121,8 @@ export class OtClient { // to synchronized, the second arrives when we're already there. break } + + this.processQueuedRemoteUpdates() } /** @@ -122,15 +130,27 @@ export class OtClient { * Transform against inflight/buffered ops before applying. */ onRemoteOps(ops: OtOp[], newVersion: number) { - // Stale message detection (matching Overleaf's ShareJS): - // if the server version is behind our version, we already processed this. + // ShareJS update.v is the document version before the op is applied. + // Drop duplicates and queue out-of-order messages until their base version + // catches up, matching Overleaf's in-order processing. if (newVersion < this.state.version) { return } + if (newVersion > this.state.version) { + this.queueRemoteUpdate(ops, newVersion) + return + } + + this.applyRemoteOps(ops, newVersion) + this.processQueuedRemoteUpdates() + } + + private applyRemoteOps(ops: OtOp[], newVersion: number) { + const nextVersion = newVersion + 1 switch (this.state.name) { case 'synchronized': - this.state = { ...this.state, version: newVersion } + this.state = { ...this.state, version: nextVersion } this.applyFn(ops) break @@ -139,7 +159,7 @@ export class OtClient { this.state = { ...this.state, inflight: transformedInflight, - version: newVersion + version: nextVersion } this.applyFn(transformedRemote) break @@ -152,7 +172,7 @@ export class OtClient { ...this.state, inflight: inflightAfterRemote, buffer: bufferAfterRemote, - version: newVersion + version: nextVersion } this.applyFn(remoteAfterBuffer) break @@ -160,7 +180,23 @@ export class OtClient { } } + private queueRemoteUpdate(ops: OtOp[], version: number) { + if (this.queuedRemoteUpdates.some((update) => update.version === version)) return + this.queuedRemoteUpdates.push({ ops, version }) + this.queuedRemoteUpdates.sort((a, b) => a.version - b.version) + } + + private processQueuedRemoteUpdates() { + let nextIndex = this.queuedRemoteUpdates.findIndex((update) => update.version === this.state.version) + while (nextIndex !== -1) { + const [next] = this.queuedRemoteUpdates.splice(nextIndex, 1) + this.applyRemoteOps(next.ops, next.version) + nextIndex = this.queuedRemoteUpdates.findIndex((update) => update.version === this.state.version) + } + } + reset(version: number) { this.state = { name: 'synchronized', inflight: null, buffer: null, version } + this.queuedRemoteUpdates = [] } } |
