summaryrefslogtreecommitdiff
path: root/src/main/otClient.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/otClient.ts')
-rw-r--r--src/main/otClient.ts46
1 files changed, 41 insertions, 5 deletions
diff --git a/src/main/otClient.ts b/src/main/otClient.ts
index 2917bcc..9d373bd 100644
--- a/src/main/otClient.ts
+++ b/src/main/otClient.ts
@@ -25,10 +25,16 @@ interface OtState {
version: number
}
+interface QueuedRemoteUpdate {
+ ops: OtOp[]
+ version: number
+}
+
export class OtClient {
private state: OtState
private sendFn: SendFn
private applyFn: ApplyFn
+ private queuedRemoteUpdates: QueuedRemoteUpdate[] = []
constructor(version: number, sendFn: SendFn, applyFn: ApplyFn) {
this.state = { name: 'synchronized', inflight: null, buffer: null, version }
@@ -115,6 +121,8 @@ export class OtClient {
// to synchronized, the second arrives when we're already there.
break
}
+
+ this.processQueuedRemoteUpdates()
}
/**
@@ -122,15 +130,27 @@ export class OtClient {
* Transform against inflight/buffered ops before applying.
*/
onRemoteOps(ops: OtOp[], newVersion: number) {
- // Stale message detection (matching Overleaf's ShareJS):
- // if the server version is behind our version, we already processed this.
+ // ShareJS update.v is the document version before the op is applied.
+ // Drop duplicates and queue out-of-order messages until their base version
+ // catches up, matching Overleaf's in-order processing.
if (newVersion < this.state.version) {
return
}
+ if (newVersion > this.state.version) {
+ this.queueRemoteUpdate(ops, newVersion)
+ return
+ }
+
+ this.applyRemoteOps(ops, newVersion)
+ this.processQueuedRemoteUpdates()
+ }
+
+ private applyRemoteOps(ops: OtOp[], newVersion: number) {
+ const nextVersion = newVersion + 1
switch (this.state.name) {
case 'synchronized':
- this.state = { ...this.state, version: newVersion }
+ this.state = { ...this.state, version: nextVersion }
this.applyFn(ops)
break
@@ -139,7 +159,7 @@ export class OtClient {
this.state = {
...this.state,
inflight: transformedInflight,
- version: newVersion
+ version: nextVersion
}
this.applyFn(transformedRemote)
break
@@ -152,7 +172,7 @@ export class OtClient {
...this.state,
inflight: inflightAfterRemote,
buffer: bufferAfterRemote,
- version: newVersion
+ version: nextVersion
}
this.applyFn(remoteAfterBuffer)
break
@@ -160,7 +180,23 @@ export class OtClient {
}
}
+ private queueRemoteUpdate(ops: OtOp[], version: number) {
+ if (this.queuedRemoteUpdates.some((update) => update.version === version)) return
+ this.queuedRemoteUpdates.push({ ops, version })
+ this.queuedRemoteUpdates.sort((a, b) => a.version - b.version)
+ }
+
+ private processQueuedRemoteUpdates() {
+ let nextIndex = this.queuedRemoteUpdates.findIndex((update) => update.version === this.state.version)
+ while (nextIndex !== -1) {
+ const [next] = this.queuedRemoteUpdates.splice(nextIndex, 1)
+ this.applyRemoteOps(next.ops, next.version)
+ nextIndex = this.queuedRemoteUpdates.findIndex((update) => update.version === this.state.version)
+ }
+ }
+
reset(version: number) {
this.state = { name: 'synchronized', inflight: null, buffer: null, version }
+ this.queuedRemoteUpdates = []
}
}