本文作者:IMWeb dekuchen 原文出处:IMWeb社区 未经同意,禁止转载
operation transform了解一下?
dekuchen 2018.6
1984 年,MIT 的科学家提出了计算机支持的协同工作(Computer Supported Cooperative Work,缩写为 CSCW),使得人们可以借助计算机和互联网去完成协同工作。比如利用用于文档编辑的 Wikis 和用于编程的版本控制系统,小组成员可以在世界任意角落去共同编写大型的百科全书或软件系统
协同文档现在貌似有很多公司陆续实现了,例如最早的 Google、国内的石墨文档、腾讯的腾讯文档等等。
虽然在使用中看似很简单,但是实际上这个协同文档的技术实现有很多需要注意的地方。对于公司来说,由于员工较多,而且一般 leader 具有较高工程能力,对他们来说不是什么很困难的事情。但是即使这样,Google 办公套件至少用了两年时间才使他们的协同系统成熟。
这里我简单的跟大家分享一下,协同文档的技术实现的其中一个方式,也是最具有普遍意义的方式,可以协同任何数据结构。
这个算法被称为 OT 算法。这个算法本身并不复杂,但是协同文档本身涉及更复杂的系统设计,因为它本身就是分布式的,至少客户端和服务端是分布式的。在较高性能的要求下,服务端可能也是分布式的。所以,如何使这些都能很好的协同,是很值得考虑的。
实时协同编辑,通俗来讲,是指多人同时在线编辑一个文档,且当一个参与者编辑文档的某处时,这个修改会立即同步到其他参与者的计算机上。归纳起来,需要下面几个步骤:
由于没有锁的机制,当多个参与者在编辑同一处内容时,便可能出现冲突,这个时候就需要通过一定的算法来自动地解决这些冲突。最后,当所有改变都同步后,每个参与者计算机上所看到的文档内容应该是完全一致的。
It turns out that implementing this kind of real-time collaboration is far from trivial. The most common solution responds to the name Operational Transformation (usually abbreviated OT). It originated from a research paper published in 1989 but got more recently popularized by Google Wave. Today, it powers many collaborative editors such as
These are only a few examples from a growing number of applications with realtime collaboration. I expect this number to rise even faster now that Google has published the Drive Realtime API, which is based on OT and let’s third-party apps use the same collaboration as Google Docs.
http://operational-transformation.github.io/what-is-ot.html
How does Operational Transformation work? Here’s the short overview:
insert(12, 'A')
and insert(0, 'B')
. If we would simply send B’s operation to client A and applied it there, there is no problem. But if we send A’s operation to B and apply it after B’s operation has been applied, the character ‘A’ would be inserted one character one position left from the correct position. Moreover, after these operations, A’s document state and B’s document state wouldn’t be the same. Therefore, A’s operation insert(12, 'A')
has to be transformed against B’s operation to take into account that B inserted a character before position 12 producing the operation insert(13, 'A')
. This new operation can be applied on client B after B’s operation.However, you don’t have to understand the details of Operational Transformation to use it with this library in your own project.
一个文档可以被抽象为一系列操作的集合,这个集合便是 changeset。
changeset 具有如下的特征:
对于 changeset,通常可以使用 json 的形式表示。
interface Action {
type: string;
// ...
}
interface Changeset {
version: number;
actions: Action[];
}
例如,下面的 changeset 是在协同表格的第 15 行后面添加一行,并删掉第 5 行。
{
"version": 0,
"actions": [
{
"type": "insertRowAfter",
"index": 15
},
{
"type": "deleteRow",
"index": 5
}
]
}
注意:
上面说到过,changeset 只有基于某个版本才是有意义的。
假设, 有一个 A 客户端和一个 B 客户端,他们在某时刻具有一样的文档 $X$, 这时,A 做了 $A$ 操作,B 做了 $B$ 操作,他们本地的文档看上去已经不再一样,这时,我们便需要进行 协同
我们可能会采用 merge 的思路。意思是,将 A 的操作和 B 的操作在服务端进行 merge,然后分别应用到 X 上,即
$X ← Xmerge(A, B)$
但是,这显然不可取,因为无论在 A 还是 B 端,都已经分别是 $XA$, $XB$ 了
我们采用 follow 算法
follow 具有如下特征
follow 的以上特性使其很适合作为协同编辑的运算单元。
定义 $V_n=C_1C_2...C_n$ 为第 $n$ 版本的服务端的文档。
假设服务端的数据库存储了形如 $V_0→V_1→V_2→V_3→...→V_m→ ...→V_H$ 版本信息的某文档,则若有某 changeset $C_m'(m<n)$ 的变更需要应用到该文档,显然,$C_m'$ 不能直接应用到 $V_H$(版本不兼容)。这时,我们根据 follow 的特性,容易想到使用 follow 来做变换。
由于
$$ V{m+1}=V_mC{m+1} $$
即
$$ V{m+1} follow(C{m+1},Cm') = V_m C{m+1} follow(C{m+1},C_m') = V_m C_m' follow(C_m',C{m+1}) $$
由此我们可以得到一个
$$ C{m+1}' = follow(C{m+1},C_m') $$
同理
$$ C{m+2}' = follow(C{m+2},follow(C_{m+1},C_m')) $$
重复以上过程,可以得到一个相对于 $C_H$ 的 $C_H'$。在实现的时候,可以使用数组的 reduce
来进行。
得到该 $V_H'$ 之后,这个 changeset 可以应用到最新的文档 $V_H$ 上,这样便可以完成此次编辑。
客户端负责收集新的变更,生成 changeset 并发送给服务端, 客户端因此需要 维护一些状态、存在一定的生命周期。
$A$: 本地最新的版本,类比服务端的 $V_H$
$X$: 发送给服务端的 changeset,但是还没有得到服务端的确认
$Y$: 用户做的变更生成的 changeset,但是还没有发送给服务端
容易知道,本地文档看上去的样子显然应该是 $V=AXY$
当收到服务端推送过来的 changeset $B$ 时,客户端应该
证明:
$$ A'X'Y'= ABfollow(B,X)follow(follow(X,B),Y) $$
$$ A'X'Y'= AXfollow(X,B)follow(follow(X,B),Y) $$
$$ A'X'Y'= AXYfollow(Y,follow(X,B)) $$
$$ A'X'Y'= AXYD $$
$$ A'X'Y'=VD $$
当发送出去一个 changeset 的后,等待服务端的 ACK。当收到 ACK 的时候
这里暂时只举例只有一台服务器的情况
服务端在数据库中维护一个形如 ${V_n} = V_0→V_1→V_2→V_3→...→V_m→ ...→V_H$ 版本信息列表
当有活跃用户进入这个文档时,读入内存中
当一个 changeset $C$ 从客户端发送过来的时候
interface Change {
type: string;
[k: string]: any;
}
type Changeset = {
baseVersion: number;
changes: Change[];
} | null;
type FollowFunc = (cs1: Changeset, cs2: Changeset) => Changeset;
定义 client:
定义 client 协同文档:
/// <reference path="common.d.ts" />
export = CoSyncClient;
interface DocumentLifecycle {}
declare class Document{
constructor(followFunc: FollowFunc, documentId: string);
connected?(): void;
reconnected?(ws: WebSocket): void;
connectionLost?(reason: string): void;
connectionClosed?(reason: string): void;
connectionRejected?(reason: string): void;
applyChanges?(changes: Change[]): void;
makeEmpty?(): void;
edit(change: Change): void;
leaveDocument(): Promise<void>;
enterDocument(): Promise<void>;
}
declare namespace CoSyncClient {
function createClient(
websocket: WebSocket
): {
Document: new (followFunc: FollowFunc, documentId: string) => Document;
getAllDocuments: () => Document[];
websocket: WebSocket;
};
}
定义 server:
onEnterRequest
的回调注册函数,该回调函数定义 server 协同文档:
/// <reference path="common.d.ts" />
import WebSocket from "ws";
declare class Document{
constructor(followFunc: FollowFunc, docuemntId: string);
getInitialDocument(): Promise<Set<Changeset>>;
saveChangeset(cs: Changeset): Promise<void>;
changesetWillBeHandled(cs: Changeset): void;
changesetWillBeAccepted(cs: Changeset): boolean | void;
changesetWillBeBroadcast(cs: Changeset): boolean | void;
private broadcast(cs: Changeset): void;
private sendInitialDocument(document: Set<Changeset>): void;
acceptEnter(): void;
rejectEnter(reason: string): void;
closeDocument(reason: string): void;
}
export = CoSyncServer;
declare namespace CoSyncServer {
function createServer(
websocket: WebSocket
): {
getAllDocuments: () => Document[];
websocket: WebSocket;
onEnterRequest: (
cb: (
websocket: WebSocket,
documentId: string,
Document: new (followFunc: FollowFunc) => Document) => void,
once: boolean
) => void;
};
}