核心技术
数据同步
Records
了解如何使用Records和数据同步的好处
Records是deepstream构建数据同步功能的主要模块。它们是可读写和观察的基本单元,由JSON数据构成。对Records的任何更改都会立即同步到所有连接的客户端。
使用Records
客户端用client.record.getRecord(name)访问record。如果指定的名称尚不存在会即时创建,如果您只想检查record是否存不想创建它,可以用client.record.has(name, callback)。
record对象有get()和set()方法与读写数据,并提供subscribe()订阅更新。如下所示:
// Client A: Hungry customer sees pizza delivery approach on map const driver = client.record.getRecord('driver/jack') driver.subscribe('coords', updateMapPointer) // Client B: Delivery driver's smartphone feeds position into record const driver = client.record.getRecord('driver/jack') navigator.geolocation.watchPosition(position => { driver.set('coords', position.coords) })
路径
get()、set()、subscribe()除了操作整个record的数据,而且还支持“路径”-- 您可以使用JSON语法访问record数据的子部分,例如pets[1].fur.color。如果操作涉及到不存在的路径,则会动态创建该路径。
Records的生命周期
调用client.record.getRecord(name),可能会发生以下三种情况之一:
- 如果客户端或服务器上尚不存在该record,则将创建之。新建record的初始数据是一个空对象{}。
- 如果该record存在于服务器上,但尚未在客户端上加载,则将其取回到客户端。
- 如果record已在客户端上加载,则返回其实例。
无论record是否加载,getRecord(name)都将立即返回一个record实例。此后您可以开始读写或订阅更新,但是get()调用可能会返回null。 要确保record完全加载,请使用whenReady()方法。请注意:当record已经可用时此方法将同步执行;如果record仍在加载则将异步执行。
丢弃record 要通知服务器您不再对record更新感兴趣,请调用discard()。
删除record 可以使用delete()删除record,删除附带丢弃操作。当客户端删除一条record时,其他客户端上的同名record都会产生一个delete事件。
取消订阅、放弃和删除-有什么区别?
- unsubscribe() 删除了对整个record或部分路径的订阅。这纯粹是客户端操作,不会通知服务器。
- discard() 会告诉服务器您不愿再更新record。
- delete() 不可逆地从数据库和缓存中删除record,并将删除通知给集群中的所有服务器和客户端。
获取快照
如果您只查看一次record数据而不关心整个生命周期,可以使用 client.record.snapshot(name, callback)获取数据快照,快照不会更新。
命名Records
每个record由一个名称标识,该名称在整个系统中必须是唯一的。那么一个好的record名称应该是什么样的呢:
const book = client.record.getRecord('book/iq6auu7d-p9i1vz3q0yi')
该名称由不同部分组成:
- book 是record的类别。
- / 被许多数据库连接器用作拆分字符,用以将record分类到表中。
- iq6auu7d-p9i1vz3q0yi 是唯一的ID。可以使用client.getUid()在客户端上生成唯一ID。它们由base64编码的毫秒时间戳和随机字符串组成。
UID是分布式系统中的通用概念,因为它们满足了对集中式增量ID的需求。
唯一ID不能冲突吗? 理论上是的:在完全相同的毫秒内生成两个相同id的可能性是10^16 分之1,这个级别的风险是可以接受的。
我可以使用更具描述性的record名称吗? 可以将任何字符串用作record名称,但您需要确定该字符串永远不会改变。有许多基于唯一ID的机构就用了更具描述性的名称。如果您要建立股票交易平台,则最好把Microsoft股票命名为stock/msft。
将用户名作为record名称的一部分 deepstream中的许多许可策略都基于Records、Events或RPCs名称以及它们包含的数据。例如,为确保只能johndoe更改他的设置,您可以把record命名为settings/johndoe并指定如下Valve规则:
record: settings/$username: write: "$username === user.id"
监听
Records支持称为“监听”的概念。每个客户端都可以针对Records名称注册侦听器,名称支持正则表达式,例如^settings/.*。当其他客户端订阅与名称模式匹配的Records时,就会触发侦听器。
// Client B client.record.listen('settings/.*', (match, response) => { console.log(match) // 'settings/security' if (/* if you want to provide */) { // start publishing to this record via `client.record.setData(match, data, ack)` response.accept()
response.onStop(() => {
// stop publishing to this record when no one is interested
})
} else { response.reject() // let deepstream ask another provider } })
这对于创建“主动数据提供者”很有用 -- 后端进程仅发送当前有人关心的数据。关于监听需要知道:
- listen回调在第一次订阅时进行,onStop回调在最后一个取消订阅时进行。
- 监听会保留状态。如果某record已有多个订阅,对该record监听时将立即多次回调listen -- 每个(没有提供者的)订阅一次。
- record有一个hasProvider属性,可让您查看对该record是否有监听服务。利用这点可以验证数据是不是陈旧的。
匿名Records
了解如何使用匿名Records来简化动态选项
如果您已阅读Records,那么您就会知道它们是可以监视和操作的小数据对象。使您可以使用.set()来存储数据,通过.get()来读取数据,并使用subscribe()来监听数据更改。
匿名Records完全一样。
什么是匿名Records?
Records与匿名Records之间的唯一区别是,Records具有唯一的名称而但匿名Records则没有。相反,他们有setName(id)方法可以让您更改其名称。 从概念上讲,匿名Records就像包裹其他Records的外壳。监听器可以绑定到该外壳,并在内部Records更改时保持原样。 类比笔记本电脑的底座:底座与屏幕、键盘和电源插头等连接,您可以更换插在底座上的笔记本电脑。匿名Records的工作方式几乎和底座相同。
匿名Records有什么用?
如果要用接口处理不同Records的相似部分,则匿名Records会派上用场。以下面应用为例:
上图中,左侧每个辛普森都是record,它们的名称都存储在一个列表中。右侧带有输入字段的部分由单个匿名record提供数据,所有输入字段都双向绑定了该匿名record的某条路径。
现在,当用户选择某个“辛普森”时,匿名record会用setName(id)方法把该辛普森的名称设置进去。匿名record内部删除所有对旧record的订阅然后切换到新record重新订阅,通知所有监听者更新用户界面。
如何创建匿名Records?
客户端调用client.record.getAnonymousRecord()创建匿名Records,该方法不带任何参数。
还有几件事值得一提:
- 调用匿名record的方法,例如delete(),discard(),get()或者set()都会被代理到内部的record上。
- 匿名record调用setName()并准备好新的内部record后,会发出一个ready事件。
- 匿名record调用setName()时会立刻发出一个nameChanged事件。
列表
了解如何使用列表给具有共同属性的Records创建集合
列表是record名称的可观察数组(不是数据的数组!)。
列表和record名称之间是多对多关系 -- 一个列表可以包含许多record名称,而一个record名称可以是许多列表的一部分。
为了使生活更轻松,列表附带了各种便利方法。您可以使用addEntry(recordName, index)添加条目到指定位置,也可以使用removeEntry(recordName)从列表中的任何位置删除条目或用isEmpty()检查列表是否为空。除此之外,列表与record非常相似。他们会在whenReady()时通知您,可以subscribe()并且使用后需要discard()。
列表对什么有用?
需要以集合管理Records时使用列表。让我们以TodoMVC为例。当使用deepstream构建时,每个任务是一条record,其中包含title和completed标志。
每个record由唯一名称标识,例如todo/ikfndiqx-43jdj23bsdf。
record名称的todo/部分标识了它所属的类别,并指明了在数据库中的存储表。它不会自动添加到待办事项列表中,要将任务添加到名为todos的列表中,我们需要显式创建任务:
const todos = ds.record.getList( 'todos' );
并添加我们的record名称作为条目
todos.setEntries([ 'todo/ikfndidw-1973pnhmyk7', 'todo/ikfndiqx-43jdj23bsdf', 'todo/ikfndidt-5sdk3zag354' ]);
更进一步
通过在record中嵌套列表和把Records组合成列表,可以把应用数据建模为能同步且可观察的树结构。
列表和匿名Records
列表可以增强匿名Records选择面板的功能。
为什么列表不包含或订阅Records的数据?
Records不仅仅是数据。它们具有自己的订阅/注销生命周期,该生命周期往往与呈现它们的组件紧密相关。所以,此组件是请求Records和管理其生命周期的最佳位置。
Records名称是轻量级的字符串,可以轻松传递,例如React组件中的props或Android列表视图中的data-model。
用名称则解决了开发实时应用程序的主要挑战之一:有效地使用带宽并最小化线上数据量。实现此目的的最佳方法之一是限制当前视图中订阅的Records。列表可以提供必要的数据结构创建无限的网格或面板,并在用户滚动时自动加载或丢弃数据。
处理数据冲突
本节说明了如何在deepstream中合并数据冲突
当两个或多个客户端恰好同时写入同一条record时,可能要合并冲突。
deepstream如何跟踪数据一致性?
deepstream使用递增的版本号来确保Records按顺序进行更改,并且不会丢失任何中间更新。调用set()产生的每条消息都包含客户端希望设置的Records版本号。
服务器将确保更新时传入的版本号比当前版本号高一个。如果是,则更新成功并发布到订阅的客户端;如果不是,则会发生以下两种情况之一:
- 如果传入版本与现有版本相同,则deepstream将假定存在写入冲突。它将保留当前版本,并尝试发送VERSION_EXISTS错误到客户端。在客户端上,这将调用一个MERGE_STRATEGY函数。
- 如果传入版本低于或高于当前版本1以上。。。。
- 如果版本不同步,则服务器将尝试对帐。。。。
处理冲突
合并冲突由MERGE_STRATEGY函数处理。这些函数由deepstream对象公开,并且可以在初始化客户端时按名称模式或对每个record进行全局设置。
// Set merge strategy globally when initialising the client client = deepstream('localhost', { mergeStrategy: deepstream.MERGE_STRATEGIES.LOCAL_WINS })
// Set merge strategy on a pattern when initialising the client client = deepstream('localhost', { mergeStrategy: deepstream.MERGE_STRATEGIES.LOCAL_WINS }) client.record.setMergeStrategyRegExp('name', (localValue, localVersion, remoteValue, remoteVersion, callback) => { callback(error, mergedData) })
// Set merge strategy on a per record basis rec = ds.record.getRecord('some-record') rec.setMergeStrategy(deepstream.MERGE_STRATEGIES.REMOTE_WINS)
默认情况下,LOCAL_WINS和REMOTE_WINS都可以用。也可以实现自定义合并策略,例如
// Accept remote title, but keep local content rec.setMergeStrategy(( record, remoteData, remoteVersion, callback ) => { callback( null, { title: remoteData.title, content: record.get( 'content' ) }); });
避免合并冲突
您的Records结构越精细,可能的冲突就越少。通常,与处理一些非常大的Records相比,deepstream更擅长处理大量的小Records。特别是当涉及到较高的更新频率时,例如拍卖网站上的物品价格迅速更新,就有必要将上游的record(例如客户提交的出价)单独建模或将其建模为event或RPC。
数据存储
了解Deepstream如何使用缓存和存储系统存储数据
deepstream如何存储数据
作为独立服务器,deepstream将其所有数据保留在内部存储器中。但在生产集群中deepstream服务器本身不会保存任何数据,取而代之的是:数据存储在存储层和缓存层的组合中,群集中的所有deepstream节点均可访问这些数据。这允许各个服务器保持无状态运行,并且可以在不造成任何数据丢失的情况下进行关机下线/故障转移。但也允许将数据分布在多个节点上。
deepstream可以同时连接到缓存和数据库。每当需要存储值时,就以阻塞方式将其写入高速缓存,同时以非阻塞方式将其写入存储系统。
同样,每当需要检索条目时,deepstream都会首先在缓存中查找,然后再在存储系统中查找。如果只能在存储系统中找到该值,则deepstream会将其加载到缓存中以加快访问速度。
为什么要区分缓存和存储系统呢?
因为它们相得益彰!
- 缓存 使相对少量的数据可以高速访问。他们通常通过将数据存储在内存中而不是磁盘上来实现此目的(尽管有些也写入磁盘,例如Redis)。这意味着该进程退出时,所有数据都将丢失。缓存通常也不提供对复杂查询的支持(尽管也有一些支持,例如Hazelcast)。
- 存储系统 (数据库)读取或写入速度较慢,但可为大量数据提供有效的长期存储,并允许更精细的查询方式(例如,全文搜索,SQL联接等)。
为什么deepstream不自己存储数据?
有时会将Deepstream与诸如Firebase(现在已成为Google Cloud Platform的一部分)之类的项目进行比较 -- Firebase是允许用户创建流式请求的实时数据库。
尽管Deepstream非常适合类似的场景,但在概念上却大不相同:Deepstream的设计理念是受多人游戏服务器或财务流工作方式的启发,而不是数据存储。
它可以用作没有任何数据层的独立服务器,但当同时连接到缓存和数据库时,也可以满足对存储需求苛刻的大规模协作应用程序的使用。
最后,我们牢记了从Meteor这样的固定栈框架中学到的经验。deepstream是一种快速且通用的实时服务器 -- 但它并没有尝试更多。
我们相信,网络技术正在从过去的单企业技术栈过渡到充满活力的高度专业化微服务生态系统 -- 我们的目标是使其可与多种编程语言、前端框架、数据库、缓存、log系统、身份管理系统、部署环境一起使用,并在该生态系统中蓬勃发展。
连接到缓存或数据库
deepstream使用不同类型的“连接器”连接缓存和存储系统,连接器是deepstream与其他系统的接口插件。
缓存连接器是将深deepstream连接到内存缓存的插件,例如Redis,Memcached,IronCache,Hazelcast或Aerospike。
存储系统连接器是将deepstream连接到数据库的插件,例如MongoDB,CouchDB,Cassandra或Amazon的DynamoDB。
选择数据库和缓存
在选择数据库和缓存时,需要考虑以下几点:
- 选择可以在云环境中扩展的系统 deepstream可以跨云中进行水平扩展,但如果您的数据库或缓存不能跨云扩展,那将无济于事。幸运的是,大多数流行的解决方案都提供了某种形式的分片、集群或云复制,但并非所有云服务都支持。例如,如果您使用的是AWS,并考虑将ElastiCache和Redis用作缓存引擎,则您的部署就仅限于一台计算机。
- 选择相辅相成的系统 一些缓存(例如Redis)将数据存储到磁盘,并且可以在没有数据库的情况下使用。诸如Hazelcast或Redis之类的某些系统同时提供缓存和发布/订阅机制,从而无需单独的消息总线。一些纯内存缓存(例如Memcached)非常快,但是需要有数据库支持持久化。某些数据库提供非常快的读取性能和内置的缓存层,因此可以独立使用(但请确保将它们注册为缓存,因为否则deepstream可能会使用其内部缓存并使用其中的陈旧数据)。
- 对象/文档/NoSQL数据库比关系数据库更有意义 deepstream的数据结构是小的独立JSON块,由集合中唯一的标识组织在一起。这使得它们非常适合面向对象或文档的数据库,例如Mongo,Rethink或Couch。deepstream还可以与关系数据库(例如MySQL,PostGre或Oracle)一起使用,但无法利用其数据建模能力。
- 使用外部x-as-a-service提供商时要小心 来自外部供应商完全托管的as-a-service缓存可能很诱人,但请注意其托管在数据中心之外而deepstream内部需要持续与缓存交互。缓存和网络延迟中的每毫秒都会减慢您应用程序的速度。同样,许多缓存/数据库协议被设计为在受信任的环境中使用,因此未加密。如果数据库不在内网中,请确保使用TLS或选择带有VPN的服务。
下载并安装连接器
deepstream连接器可用于许多流行的数据库和缓存,并且我们一直在寻求扩大选择范围。您可以在下载页面上找到可用连接器的概述。可以通过deepstream的命令行界面,使用cache或storage关键字安装连接器,例如
deepstream install cache redis deepstream install cache memcached deepstream install storage mongodb deepstream install storage rethinkdb
or on windows using deepstream.exe
deepstream.exe install storage rethinkdb
每个连接器都需要特定的配置参数。在deepstream的config.yml文件中配置它们(可以在deepstream的conf目录中或的linux中找到/etc/deepstream/)。安装连接器时命令行会显示其常用配置。
缓存连接器在配置文件的plugins - cache部分,数据库连接器在配置文件的plugins - storage部分配置
如果您正通过Node.js使用deepstream,还可以从NPM下载连接器。所有连接器均遵循命名约定deepstream.io-type-name,例如deepstream.io-storage-rethinkdb。
编写自己的连接器
如果找不到适合您系统的连接器,还可以使用带有Node绑定的C++或Node.js轻松编写自己的连接器。如果您对连接器的思想感到满意,请考虑做出贡献。为此,请查看Deepstream的贡献准则。 有关如何实现自己的连接的指南,请参阅存储插件和缓存插件指南。
监听(Listening)
了解deepstream中的监听机制
什么是监听
监听是Deepstream独家提供的强大功能!但是,在我们完全了解监听的工作原理之前,我们首先需要了解什么是数据提供者。
数据提供者是Deepstream的客户端,它们将数据写入Records,发送Events或提供RPCs。通常,这些提供者倾向于广播他们拥有的所有数据。在大多数情况下,这意味着它们提供了很多不必要的数据,甚至是任何客户端都不需要的数据。
Deepstream的监听功能通过让数据提供者仅提供其他客户端感兴趣的数据来解决此问题,“感兴趣的数据”是通过订阅来指定的。
通过监听,这些数据提供者可以关注特定的名称模式,并且仅在有客户端需要该模式引用的数据时才发送数据。
如果有多个数据提供者与数据名称模式匹配,则Deepstream会随机选择其中之一。但数据提供者可以出于各种原因(包括负载平衡)选择拒绝请求,在这种情况下,deepstream将检查是否有其他数据提供者模式匹配并要求其提供数据。本教程后面有单独部分对此进行进一步描述。
为了更好地理解它,让我们看下面的示例:
从上面的示例中可以明显看出,让天气提供商为所有国家/地区(甚至客户不感兴趣的国家/地区)发送更新会增加成本以及冗余数据流。这就是监听应用的地方。
如上所示,监听可以将数据提供者发送的数据量有效地减少到客户端所需的数量。这种改进减少了您的发布/订阅基础架构的消息数,从而也降低了您的成本。 请注意,通过监听,当第一个客户端订阅时数据提供者开始提供数据,直到最后一个客户端取消订阅时数据提供者才停止。
监听Events
为了实现监听Events,我们首先让数据提供者(一般是位于后端的deepstream客户端)监听特定的模式,就像上面示例的weather/*一样。
client.event.listen('weather/*', onMatch)
onMatch回调在两种情况下触发:1.新的客户端订阅event 2.最后一个订阅的客户端unsubscribe。考虑以下代码:
// server.js
const { DeepstreamClient } = require('@deepstream/client')
const client = deepstream('
client.login({}, (success, data) => { if (success) { startApp() } else { console.log('ds login failed') } })
function startApp(){ client.event.listen('weather/germany/*', onMatch) }
let interval
function onMatch(subject, response) { response.accept() // optionally add a condition to // reject a request with response.reject() interval = setInterval(()=> { client.event.emit(subject, "here's your weather data") }, 2000)
response.onStop(() => { // if your event is being continously emmitted // stop emitting it here clearInterval(interval) }) }
在onMatch的入参中,
- subject 是完整路径,即客户端已订阅的event名称,在这种情况下,它将以weather/germany/开头。
- response 是具有两个方法response.accept()和response.reject()的对象。您可以根据各种条件(例如,当前加载了多少数据提供者)来使用其中一种。
现在,让我们看看客户端如何订阅上述数据提供者正在监听的事件:
// client.js
const client = deepstream('
client.login(() => { client.event.subscribe('weather/germany/berlin', (data) => { // handle weather data }) setTimeout(() => { // unsubscribing after 10 sec for the sake of // simplicity of this tutorial client.event.unsubscribe('weather/germany/berlin') }, 10000) })
当您执行此应用程序时,将发生以下情况:
- 数据提供者程序监听 weather/germany/*
- 客户端将使用订阅柏林的天气 weather/germany/berlin
- 由于此event的数据提供者一直在监听,因此将调用onMatch回调函数
- 在onMatch函数内部,由于客户端订阅event,因此数据提供者将接受请求并每两秒钟开始发出该event
- 一旦客户端取消订阅事件(10秒钟超时后),则将调用onStop回调(在onMatch函数中定义),将停止发出该event。
监听Records
对Records监听很容易,但是如果您不完全了解Records的工作原理,它很快就会变得复杂。因此,您可能需要先浏览Records文档。
让我们以一个小例子来理解,例子中使用纳斯达克股票市场网站上的数据作为我们的数据源。为简单起见,我们将跳过后端数据提供程序如何连接到Nasdaq数据库的细节。
我们的数据提供者将如下所示:
// server.js
const { DeepstreamClient } = require('@deepstream/client')
const client = deepstream('
client.login({}, (success, data) => { if (success) { startApp() } else { console.log('ds login failed') } })
function startApp(){ client.record.listen('nasdaq/*', onMatch) }
let interval
function onMatch(subject, response) { response.accept() // optionally do response.reject() based on some condition interval = setInterval(() => { client.record.setData(subject, { price: /* price from Nasdaq stream */ }) })
response.onStop(() => { console.log('stopped publishing data') clearInterval(interval) }) }
这里的逻辑与我们在监听Events的上一个示例中的完全相同。 每当客户端订阅以nasdaq/*开头的record时,就会触发onMatch回调。当最后一个客户端对该record.unsubsribed时onStop回调将被执行。
让我们看看客户端代码的样子:
// client.js
const client = deepstream('
监听Records时可以使用一个名为hasProvider的附加功能。此标志告诉客户端订阅的record是否有正在监听的数据提供者(主动数据提供者)。 另外,客户端也可以使用hasProviderChanged事件来获得数据提供者状态改变的通知。
用record的hasProvider属性和hasProviderChanged事件可以让客户端知道数据是实时获取自数据提供者还是来自曾经存储的旧数据,这点很重要,因为这是获知数据来源的唯一方法。
请注意,hasProvider和hasProviderChanged仅Records可用。
监听列表数据
列表是record名称的集合(不是其实际数据)。要了解有关使用列表的更多信息,请参阅“列表”。
监听列表类似于我们在上文中看到的监听Records。 下面是用于监听列表的示例代码,这很容易解释。
// server.js
const { DeepstreamClient } = require('@deepstream/client')
const client = deepstream('
client.login({}, (success, data) => { if (success) { client.record.listen('cars/*', onMatch) } else { console.log('ds login failed') } })
function onMatch(subject response) { response.accept() // optionally handle response.reject() // handle list subsribe'
response.onStop(() => {
// handle list discard
})
}
// client.js
const client = deepstream('
client.login({}, (success, data) => { const cars = client.record.getList('cars/honda') cars.subscribe((entries) => { // handle list entries changed }) setTimeout(() => { cars.discard() }, 5000) })
把监听用于负载均衡
如上所述,监听是在数据提供者之间实现负载均衡的好方法。让我们了解一下它是如何工作的。
假设您有多个数据提供者,它们能够为世界上所有国家/地区提供天气数据。现在考虑最坏的情况,连续6次,deepstream服务器随机选择的数据提供者恰好都是相同的。这将给单个数据提供者带来很多负担,而其他所有数据提供者都处于空闲状态。我们可以通过两种方式解决这种情况:
- 如果数据提供者已经负担很重,则使其拒绝数据请求,然后deepstream服务器随机选择下一个数据提供者。或者
- 您可以使这些个数据提供者仅监听国家/地区的不相交子集,例如按其名称的字母顺序进行划分。
把上面两种方法组合起来,即对每个子集设置多个数据提供者选项,并当某个数据提供者负担过重时拒绝请求,可以进一步提高效率。
给监听设置权限
与其他所有内容一样,此功能也涉及安全方面。您可以通过listen:true/false在应用程序的“权限”部分中控制监听Records和Events。
如果数据提供者尝试监听没有权限的record或event,则会收到一条错误消息,提示message denied。
要了解有关权限如何在deepstream中工作的更多信息,请访问Valve页面。
哪些场合适用监听
尽管监听听起来已经很妙了,但大多数开发人员仍未意识到它可以满足的各种场景。
你可以:
- 在访问数据库前监听
- 使用GPS接收地理位置坐标时监听
- 用监听在物联网中实时控制传感器
监听允许数据提供者在客户端订阅数据时开始提供数据。因此数据提供者本身不该订阅该数据,否则将使整个应用程序循环工作。 因此,deepstream客户端API提供了setData功能(client.record.setData),允许在不订阅的情况下写record。
总结
监听是一项复杂的功能但可以在您的deepstream应用中轻松实现。应尝试用监听降低流量成本并提高后端进程的效率。
Events
了解如何使用Events来实现发布/订阅和分离逻辑
Events是deepstream对“发布/订阅”或“观察者”模式的实现。如果您熟悉JavaScript事件发射器或Java事件,则知道它们的工作原理。零个或多个感兴趣的客户端订阅事件(有时也称为“主题”,“频道”或“命名空间”),零个或多个其他客户端发布给该事件。
“零个或多个”强调了发布/订阅的主要特征之一:发布者和订阅者是完全分离的。有点像报纸,记者在有假想阅读者的情况下写文章,而读者打开报纸期望但不确定自己对文章感兴趣。
这种解耦使pub/sub具有可伸缩性和容错能力,但是有时您想知道是否有人在等您的消息。为此,deepstream的Events具有称为“监听”(Listening)的功能。
发布/订阅及其限制
由于pub/sub模式简单且通用,许多实时系统包括独立运行的(例如Pusher,PubNub)、服务器/客户端结构的(例如socket.io,SocketCluster)、服务器到服务器式的(例如,Redis,Kafka)解决方案都使用Pub/Sub模型。
但是,Pub/Sub纯粹是一种轻量级的消息传递方式,并没有任何持久性或状态的概念。因此,它通常用于通知客户端更改,进而触发单独的HTTP请求以检索实际数据。这带来了巨大的开销,并且逐渐被“数据同步”所取代,该方法是在所有订阅的客户端之间分发实际数据并保持同步。数据同步是Deepstream的核心功能之一,以Records的形式使用。
话虽这么说,但发布/订阅与数据同步并不一定非要二选一。两者相得益彰,可以一起用于许多场景。
如何使用Events
可以使用client.event.subscribe建立event订阅,也可以使用client.event.unsubscribe删除event订阅。
// Client A function eventCallback(data) { //callback for incoming events }
//Subscribing to an event client.event.subscribe('news/sports', eventCallback)
//Removing specific callback to the event client.event.unsubscribe('news/sports', eventCallback)
//Removing all subscriptions to the event client.event.unsubscribe('news/sports')
可以使用client.event.emit(eventName, data)发布event。可以与event一起发送任何类型的可序列化数据,例如字符串,JSON对象,数字,布尔值等。
// Client B client.event.emit('news/sports', 'football is happening')
如何监听Events订阅
deepstream允许客户端“监听”其他客户端的Events订阅。这对于创建仅在实际需要时才发出Events的“主动数据提供者”很有用。
侦听器可以注册由正则表达式描述的模式,例如'^news/.*'。
// Client B client.event.listen('^news/.*', (match, response) => { console.log(match) // 'news/sports' if (/* if you want to provide */) { // start publishing data via `client.event.emit(eventName, /* data */)` response.accept()
response.onStop(() => {
// stop publishing data
})
} else { response.reject() // let deepstream ask another provider } })
每当有客户端订阅了匹配的event时回调会被触发,但onStop只有在最后一个客户端unsubscribe时才会回调。 监听也会保留状态信息。如果已经有多个客户端订阅event但还没有主动数据提供者,那么注册侦听器时会立即多次进入回调,每个匹配的订阅一次。
RPCs
了解如何在请求/响应模型中使用RPCs
deepstream用远程过程调用(RPC)实现了请求/响应通信机制(可类比Ajax Request,但增加了负载平衡和重新路由等)。
RPC本身可以代替经典的HTTP工作流。当与其他deepstream概念(如pub/sub或data-sync)结合使用时,RPC尤其有用。
RPC的一些重要用途
- 查询数据库 如果你把deepstream和一个数据库或搜索引擎例如ElasticSearch配合使用,RPC的可用于从中查询数据。
- 与RESTfulAPI交互 需要从OpenWeatherMap检索、获取Github Repo的提交历史、或从SQL查询数据吗?创建一个将传入的RPC作为HTTP请求转发并返回结果的进程。
- 安全地组合多步record操作 如果您正在构建实时投票系统,则可能需要增加投票数的同时将用户标记为已投票。可以为此使用RPC。
- 分配计算负荷 需要运行耗时的任务?可以将您的数据分成几部分,然后让deepstream在RPC提供者之间分配负载。
- 提示用户输入 是否需要询问某个用户?RPC也可以在客户端上实现问题-解答工作流。
使用RPC
让我们看一个例子:将两个数字相加(当然,这不像您在后端要执行的操作,但让事情简单点)。
每个RPC都有唯一的名称标识。对于我们的示例,我们将选择'add-two-numbers'。首先,进程需要用client.rpc.provide()注册自己为“RPC提供者”-- 能满足RPC请求的东西。对相同的RPC,每个客户端只能注册一次。
client.rpc.provide('add-two-numbers', (data, response) => { response.send( data.numA + data.numB ); })
现在,任何客户端都可以用client.rpc.make()调用远程方法。
// ES5 client.rpc.make( 'add-two-numbers', { numA: 7, numB: 13 }, (err, result) => { // result == 20; })
// ES6 try { const result = await client.rpc.make('add-two-numbers', { numA: 7, numB: 13 }) } catch (e) { // Possibly an error on RPC provider, a timeout or a missing RPC provider }
用unprovide注销“RPC提供者”。
client.rpc.unprovide('add-two-numbers')
RPC路由
一个进程可以注册为多个RPC提供者,并且多进程可以提供相同的RPC。deepstream将尝试尽可能高效地路由客户端的请求,并在可用的提供者进程之间平衡传入负载。
RPC提供者也可以使用response.reject()拒绝请求(例如,它们承受了过重的负载),这将提示deepstream将请求重新路由到另一个可用的提供者进程。
//Limiting to 50 simultaneous tasks at a time let inProgress = 0; client.rpc.provide('task', async (data, response) => { inProgress++;
if( inProgress > 50 ) {
response.reject()
} else {
const result = await doTask(url)
inProgress--;
response.send(result)
}
})
Presence
Presence使我们可以查询已连接的经过身份验证的客户端。
deepstream的Presence功能允许您应用程序中的用户跟踪其他用户的在线或离线状态。
可以要求Presence向您发送当前登录的所有人员的列表,或者在其他(已通过身份验证)客户端的登录状态发生更改时向您发送通知。存在具有以下三种方法:
getAll(),subscribe()和unsubscribe()
getAll
使用getAll()方法,我们可以立刻获取所有连接的客户端的状态。这可以通过两种方式完成!
- 如果您想在您的应用程序中查看所有在线用户,则可以执行以下操作:
client.presence.getAll((error, usernames) => { // error = null, usernames = ['Homer', 'Marge', 'Lisa'] })
上面的函数将返回当前登录到您的应用程序的所有用户。
- 您也可以通过执行以下操作仅检索选定用户的在线/离线状态:
client.presence.getAll(['Homer', 'Marge', 'Lisa'], (error, result) => { /* error = null, clients = { 'Homer': true, 'Marge': true, 'Lisa': false } */ })
subscribe
您可以使用subscribe()方法订阅客户端登录和注销。简单来说,当您使用presence.subscribe后只要现有客户端注销或有新客户端登录,您都会收到一条通知:
- 如果您希望观察所有客户的登录/注销活动,则可以执行以下操作:
function presenceCallback(username, login) { if (login === true) { // handle login } else { // handle logout } } // Client A client.presence.subscribe(presenceCallback)
- 如果您只希望观察特定客户端的登录/注销活动,则可以通过在subscribe函数中增加用户名如下所示:
// Client A client.presence.subscribe('Marge', presenceCallback)
如果用户在多个设备上登录,则只有第一次登录和最后一次注销会通过Presence通知。因此,如果您登录多个设备并注销其中一个,则您的状态将保持登录状态,直到您注销最后一个设备/浏览器。
unsubscribe
顾名思义,如果您已经订阅了Prescence,但不再希望继续接收通知,则只需使用unsubscribe()方法。
这是一个例子:
// Client A client.presence.unsubscribe(presenceCallback)
Prescence很重要,因为它是了解用户连接状态并进行后续查询的唯一方法,这在一些场景是必须的。