巧用zookeeper实现分布式并行计算

云计算的技术话题中少不了“分布式”,“并行计算” 这些个关键词,我们知道硬件扩展的条件(Scale-up)始终是有限制的,将计算分散到网络中更多机器的CPU上提供更高的计算性能(Scale-out),并在这基础上能将计算同时进行,那么总体计算瓶颈会减小,计算的性能会显著提高,也就是说将串行计算变为并行计算,将大量的计算在同一时间发生,,将任务分配到每一个处理器上。这里面需要一个重要的角色,分布式计算资源中的协调者。

有这样一个场景:系统中有大约100w的用户,每个用户平均有3个邮箱账号,每隔5分钟,每个邮箱账需要收取100封邮件,最多3亿份邮件需要下载到服务器中(不含附件和正文)。用20台机器划分计算的压力,从多个不同的网路出口进行访问外网,计算的压力得到缓解,那么每台机器的计算压力也不会很大了。

通过我们的讨论和以往的经验判断在这场景中可以实现并行计算,但我们还期望能对并行计算的节点进行动态的添加/删除,做到在线更新并行计算的数目并且不会影响计算单元中的其他计算节点,但是有4个问题需要解决,否则会出现一些严重的问题:
1.20台机器同时工作时,有一台机器down掉了,其他机器怎么进行接管计算任务,否则有些用户的业务不会被处理,造成用户服务终断。
2.随着用户数量增加,添加机器是可以解决计算的瓶颈,但需要重启所有计算节点,如果需要,那么将会造成整个系统的不可用。
3.用户数量增加或者减少,计算节点中的机器会出现有的机器资源使用率繁忙,有的却空闲,因为计算节点不知道彼此的运行负载状态。
4.怎么去通知每个节点彼此的负载状态,怎么保证通知每个计算节点方式的可靠性和实时性。

先不说那么多专业名词,白话来说我们需要的是:1记录状态,2事件通知 ,3可靠稳定的中央调度器,4易上手、管理简单。
采用Zookeeper完全可以解决我们的问题,分布式计算中的协调员,观察者,分布式锁  都可以作为zookeeper的关键词,在系统中利用Zookeeper来处理事件通知,队列,优先队列,锁,共享锁等功能,利用这些特色在分布式计算中发挥重要的作用。

zookeeper的服务器端是采用Java编写,而zookeeper的客户端不仅可以支持java还可以支持C语言的客户端,在zookeeper服务端可以创建一个树状的Key/Vaule 存在着父子节点之间的关系。

Zookeeper允许多个Client对一个或多个ZNode数据进行监控,当ZNode有变化时能够通知到监控这个ZNode的各个Client,所有监听这个节点的成员都会知道了,Zookeeper使用Watcher察觉事件信息,当客户端接收到事件信息,比如连接超时,节点数据改变,子节点改变,可以调用相应的行为来处理业务逻辑。相反,如果zookeeper客户端对服务端的znode不关注,不Watcher,那么发生任何变化zookeeper的客户端都不会收到事件通知。

zookeeper中znode的数据模型
data model

每次zookeeper客户端与服务器端连接后都会创建一个session ID 给客户端,客户端将会定期心跳协议到服务器端验证这个连接的有效性。如果由于某种原因,客户端无法发送心跳到服务器,将导致服务器验证过期的会话,会话ID将变为无效。客户持有的连接/对象将不可用,因此应用程序必须创建一个新的客户对象。如果zookeeper客户端连接到服务器没有任何响应,首先客户端会作抛出的异常并且被捕获,清除当前与Server相关的网络资源和连接会话,然后客户端逐个尝试配置列表中Server的连接地址,选择可用的服务器继续进行工作。

上述Zookeeper客户端和服务器端的关系又是一个典型的“观察者”模式,客户端关注自己关心的对象(znode),一旦发送变化就立刻通知。在《Head First设计模式》中有这样的一张图来表达 观察者模式的。
“观察者”模式
如图所示,此系统中的三个部分是气象站(获取实际气象数据的物理装置)、WeatherData对象(追踪来自气象站的数据,并更新布告板)和布告板,再来看看百度百科对“观察者”模式的解释:http://baike.baidu.com/view/1854779.htm

通过对Zookeeper的了解,实现我们系统中需要的Failure detection和Load detection 功能,只需要在每个计算的节点中实现zookeeper客户端程序,计算节点关注zookeeper服务器上znode节点变化。可以在zookeeper服务器的znode上创建一个根节点/clusterA,下面根据计算节点机器名创建对应的子节点,子节点中的value就是这台计算节点的ip地址。
如:/clusterA/node1,/clusterA/node2,/clusterA/nodeN,这些节点都是临时节点(EPHEMERAL),一旦连接断开,创建的节点自动会被删除,关注/clusterA这个根节点/clusterA的机器都会知道现在哪台机器离开计算单元了,并且获知现在有多少个计算节点在这个计算单元中。
如果有新的计算节点添加,在程序运行的第一步将会到zookeeper服务器上的/clusterA 的znode上创建一个子节点/clusterA/nodeZ,这样关注 /clusterA这个znode的机器都会知道现在多了一个计算节点。
通过zookeeper客户端API中的getChildren()方法对应的数据类型是java.util.List,其返回/clusterA下面的机器列表,这样还能判断出自己在这个列表中排行位置,通过列表中排行位置可以对应用户列表中的数目,这样就知道自己去获得需要计算总数中的几分之分。
例如:有100w用户,20个节点时,每个节点处理5w用户进行同时计算,node3计算节点承载用户总数中10w-15w用户之间的计算压力,有200w用户,20个节点时,每个节点处理10w个用户的业务进行同时计算,node3计算节点承载用户总数中30w-40w用户的计算压力,以此类推。
这样一来无论计算节点的数目发生变化还是,需要计算的数目发生变化,都可以保证计算压力的平均分载。

我的废话:
1.根据节点数对应用户数算出百分比之后进行计算分载,貌似我们通常的分页查询,只不过将每页的分页结果同时显示在N多个显示器上输出,希望这样比喻能让您更好的理解。
2.在计算的中间有新的用户数量增加,将会通知每个计算节点 下次轮询时需要重新统计用户数量,因为用户所有用户的数据分块拿走以后放入本地的静态hashmap(缓存),没有发生变化就从本地加载,操作数据库发生变化后,通知zookeeper的znode节点 每个计算节点重新从数据库中加载一次。
3.在并行计算中时间同步也是一个需要注意的地方,如果每台机器上的时间不一致会导致潜在的隐患,可以找些工具通过时间服务器同步每台机器上的当前时间和时区。
4.使用zookeeper对计算节点的状态管理只是zookeeper实现的一部分,zookeeper还可以对外提供分组,配置管理,命名空间等服务等,这里只是做了一个抛砖引玉的作用。

对于zookeeper的可靠性和性能而言,有足够的机器那么稳定性就会越高,但是性能会降低,因为ZooKeeper在运行时全部的数据都会加载到内存中,集群中每一台服务器都包含全量的数据,每个节点实时保持数据的同步。因此整个集群中Follower数量越多,整个集群写入的性能越差。后来zookeeper Server为了避免这个问题,可以将ZooKeeper集群中部分服务器指定为Observer。