徒手教你使用zookeeper编写服务发现

zookeeper是一个强一致【不严格】的分布式数据库,由多个节点共同组成一个分布式集群,挂掉任意一个节点,数据库仍然可以正常工作,客户端无感知故障切换。客户端向任意一个节点写入数据,其它节点可以立即看到最新的数据。

zookeeper的内部是一个key/value存储引擎,key是以树状的形式构成了一个多级的层次结构,每一个节点既可以存储数据,又可以作为一个目录存放下一级子节点。

zookeeper提供了创建/修改/删除节点的api,如果父节点没有创建,字节点会创建失败。如果父节点还有子节点,父节点不可以被删除。

zookeeper和客户端之间以socket形式进行双向通讯,客户端可以主动调用服务器提供的api,服务器可以主动向客户端推送事件。有多种事件可以watch,比如节点的增删改,子节点的增删改,会话状态变更等。

zookeeper的事件有传递机制,字节点的增删改触发的事件会向上层依次传播,所有的父节点都可以收到字节点的数据变更事件,所以层次太深/子节点太多会给服务器的事件系统带来压力,节点分配要做好周密的规划。

zookeeper满足了CAP定理的分区容忍性P和强一致性C,牺牲了高性能A【可用性蕴含性能】。zookeeper的存储能力是有限的,当节点层次太深/子节点太多/节点数据太大,都会影响数据库的稳定性。所以zookeeper不是一个用来做高并发高性能的数据库,zookeeper一般只用来存储配置信息。

zookeeper的读性能随着节点数量的提升能不断增加,但是写性能会随着节点数量的增加而降低,所以节点的数量不宜太多,一般配置成3个或者5个就可以了。

图中可以看出当服务器节点增多时,复杂度会随之提升。因为每个节点和其它节点之间要进行p2p的连接。3个节点可以容忍挂掉1个节点,5个节点可以容忍挂掉2个节点。

客户端连接zookeeper时会选择任意一个节点保持长链接,后续通信都是通过这个节点进行读写的。如果该节点挂了,客户端会尝试去连接其它节点。

服务器会为每个客户端连接维持一个会话对象,会话的ID会保存在客户端。会话对象也是分布式的,意味着当一个节点挂掉了,客户端使用原有的会话ID去连接其它节点,服务器维持的会话对象还继续存在,并不需要重新创建一个新的会话。

如果客户端主动发送会话关闭消息,服务器的会话对象会立即删除。如果客户端不小心奔溃了,没有发送关闭消息,服务器的会话对象还会继续存在一段时间。这个时间是会话的过期时间,在创建会话的时候客户端会提供这个参数,一般是10到30秒。

也许你会问连接断开了,服务器是可以感知到的,为什么需要客户端主动发送关闭消息呢?

因为服务器要考虑网络抖动的情况,连接可能只是临时断开了。为了避免这种情况下反复创建和销毁复杂的会话对象以及创建会话后要进行的一系列事件初始化操作,服务器会尽量延长会话的生存时间。

zookeeper的节点可以是持久化(Persistent)的,也可以是临时(Ephermeral)的。所谓临时的节点就是会话关闭后,会话期间创建的所有临时节点会立即消失。一般用于服务发现系统,将服务进程的生命期和zookeeper子节点的生命期绑定在一起,起到了实时监控服务进程的存活的效果。

zookeeper还提供了顺序节点。类似于mysql里面的auto_increment属性。服务器会在顺序节点名称后自动增加自增的唯一后缀,保持节点名称的唯一性和顺序性。

还有一种节点叫着保护(Protected)节点。这个节点非常特殊,但是也非常常用。在应用服务发现的场合时,客户端创建了一个临时节点后,服务器节点挂了,连接断开了,然后客户端去重连到其它的节点。因为会话没有关闭,之前创建的临时节点还存在,但是这个时候客户端却无法识别去这个临时节点是不是自己创建的,因为节点内部并不存储会话ID字段。所以客户端会在节点名称上加上一个GUID前缀,这个前缀会保存在客户端,这样它就可以在重连后识别出哪个临时节点是自己之前创建的了。

接下来我们使用Go语言实现一下服务发现的注册和发现功能。

如图所示,我们要提供api.user这样的服务,这个服务有3个节点,每个节点有不一样的服务地址,这3个节点各自将自己的服务注册进zk,然后消费者进行读取zk得到api.user的服务地址,任选一个节点地址进行服务调用。为了简单化,这里就没有提供权重参数了。在一个正式的服务发现里一般都有权重参数,用于调整服务节点之间的流量分配。

go get github.com/samuel/go-zookeeper/zk

首先我们定义一个ServiceNode结构,这个结构数据会存储在节点的data中,表示服务发现的地址信息。

type ServiceNode struct {
	Name string `json:"name"` // 服务名称,这里是user
	Host string `json:"host"`
	Port int    `json:"port"`
}

在定义一个服务发现的客户端结构体SdClient。

type SdClient struct {
	zkServers []string // 多个节点地址
	zkRoot    string // 服务根节点,这里是/api
	conn      *zk.Conn // zk的客户端连接
}

编写构造器,创建根节点

func NewClient(zkServers []string, zkRoot string, timeout int) (*SdClient, error) {
	client := new(SdClient)
	client.zkServers = zkServers
	client.zkRoot = zkRoot
	// 连接服务器
	conn, _, err := zk.Connect(zkServers, time.Duration(timeout)*time.Second)
	if err != nil {
		return nil, err
	}
	client.conn = conn
	// 创建服务根节点
	if err := client.ensureRoot(); err != nil {
		client.Close()
		return nil, err
	}
	return client, nil}// 关闭连接,释放临时节点func (s *SdClient) Close() {
	s.conn.Close()
}

func (s *SdClient) ensureRoot() error {
	exists, _, err := s.conn.Exists(s.zkRoot)
	if err != nil {
		return err
	}
	if !exists {
		_, err := s.conn.Create(s.zkRoot, []byte(""), 0, zk.WorldACL(zk.PermAll))
		if err != nil && err != zk.ErrNodeExists {
			return err
		}
	}
	return nil
}

值得注意的是代码中的Create调用可能会返回节点已存在错误,这是正常现象,因为会存在多进程同时创建节点的可能。如果创建根节点出错,还需要及时关闭连接。我们不关心节点的权限控制,所以使用zk.WorldACL(zk.PermAll)表示该节点没有权限限制。Create参数中的flag=0表示这是一个持久化的普通节点。

接下来我们编写服务注册方法

func (s *SdClient) Register(node *ServiceNode) error {
	if err := s.ensureName(node.Name); err != nil {
		return err
	}
	path := s.zkRoot + "/" + node.Name + "/n"
	data, err := json.Marshal(node)
	if err != nil {
		return err
	}
	_, err = s.conn.CreateProtectedEphemeralSequential(path, data, zk.WorldACL(zk.PermAll))
	if err != nil {
		return err
	}
	return nil}func (s *SdClient) ensureName(name string) error {
	path := s.zkRoot + "/" + name
	exists, _, err := s.conn.Exists(path)
	if err != nil {
		return err
	}
	if !exists {
		_, err := s.conn.Create(path, []byte(""), 0, zk.WorldACL(zk.PermAll))
		if err != nil && err != zk.ErrNodeExists {
			return err
		}
	}
	return nil
}

先要创建/api/user节点作为服务列表的父节点。然后创建一个保护顺序临时(ProtectedEphemeralSequential)子节点,同时将地址信息存储在节点中。什么叫保护顺序临时节点,首先它是一个临时节点,会话关闭后节点自动消失。其它它是个顺序节点,zookeeper自动在名称后面增加自增后缀,确保节点名称的唯一性。同时还是个保护性节点,节点前缀增加了GUID字段,确保断开重连后临时节点可以和客户端状态对接上。

接下来我们实现消费者获取服务列表方法

func (s *SdClient) GetNodes(name string) ([]*ServiceNode, error) {
	path := s.zkRoot + "/" + name
	// 获取字节点名称
	childs, _, err := s.conn.Children(path)
	if err != nil {
		if err == zk.ErrNoNode {
			return []*ServiceNode{}, nil
		}
		return nil, err
	}
	nodes := []*ServiceNode{}
	for _, child := range childs {
		fullPath := path + "/" + child
		data, _, err := s.conn.Get(fullPath)
		if err != nil {
			if err == zk.ErrNoNode {
				continue
			}
			return nil, err
		}
		node := new(ServiceNode)
		err = json.Unmarshal(data, node)
		if err != nil {
			return nil, err
		}
		nodes = append(nodes, node)
	}
	return nodes, nil
}

获取服务节点列表时,我们先获取字节点的名称列表,然后依次读取内容拿到服务地址。因为获取字节点名称和获取字节点内容不是一个原子操作,所以在调用Get获取内容时可能会出现节点不存在错误,这是正常现象。

将以上代码凑在一起,一个简单的服务发现包装就实现了。

最后我们看看如果使用以上代码,为了方便起见,我们将多个服务提供者和消费者写在一个main方法里。

func main() {
        // 服务器地址列表
	servers := []string{"192.168.0.101:2118", "192.168.0.102:2118", "192.168.0.103:2118"}
	client, err := NewClient(servers, "/api", 10)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	node1 := &ServiceNode{"user", "127.0.0.1", 4000}
	node2 := &ServiceNode{"user", "127.0.0.1", 4001}
	node3 := &ServiceNode{"user", "127.0.0.1", 4002}
	if err := client.Register(node1); err != nil {
		panic(err)
	}
	if err := client.Register(node2); err != nil {
		panic(err)
	}
	if err := client.Register(node3); err != nil {
		panic(err)
	}
	nodes, err := client.GetNodes("user")
	if err != nil {
		panic(err)
	}
	for _, node := range nodes {
		fmt.Println(node.Host, node.Port)
	}
}

值得注意的是使用时一定要在进程退出前调用Close方法,否则zookeeper的会话不会立即关闭,服务器创建的临时节点也就不会立即消失,而是要等到timeout之后服务器才会清理。

原文发布于微信公众号 - 码洞(codehole)

原文发表时间:2018-03-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏C/C++基础

Linux下使用gdb调试core文件

当程序运行过程中出现Segmentation fault (core dumped)错误时,程序停止运行,并产生core文件。core文件是程序运行状态的内存映...

2063
来自专栏开源优测

性能测试必备监控技能jvm之jdk命令行工具篇16

前言 对于JVM的性能监控,主要注意以下关键参数,通过jdk自带的命令行工具,即可查看相关参数,从而分析系统或目标服务程序中存在的性能瓶颈 jps JVM Pr...

30012
来自专栏玩转JavaEE

Linux上安装Zookeeper以及一些注意事项

最近打算出一个系列,介绍Dubbo的使用。 ---- 分布式应用现在已经越来越广泛,Spring Could也是一个不错的一站式解决方案,不过据我了解国内目前貌...

4568

使用chmod修改文件权限

类Unix系统,包括在Linode平台上运行的Linux系统,具有非常强大的访问控制系统,允许系统管理员有效地配置多个用户的访问权限,而无需给予每个用户...

5323
来自专栏linux驱动个人学习

Linux下0号进程的前世(init_task进程)今生(idle进程)----Linux进程的管理与调度(五)【转】

Linux下有3个特殊的进程,idle进程(PID = 0), init进程(PID = 1)和kthreadd(PID = 2)

1842
来自专栏PHP实战技术

浏览器访问一个网站所经历的步骤

搜索操作系统自身的DNS缓存(浏览器没有找到缓存或缓存已经失效)

2299
来自专栏python3

python3--threading模块(线程)

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态...

3922
来自专栏Vamei实验室

C编译: makefile基础

在编译一个大型项目的时候,往往有很多目标文件、库文件、头文件以及最终的可执行文件。不同的文件之间存在依赖关系(dependency)。比如当我们使用下面命令编译...

24310
来自专栏蓝天

mooon调度器设计的考量因素

2、支持业务独占进程,这是保证高可用性的前提,也是解业务与业务间,和业务和平台间耦合的前提

842
来自专栏linux驱动个人学习

解析Linux中的VFS文件系统之文件系统的来源与简介(一)

最近挂载了N多的文件系统,大致了不同文件系统的相应特性及挂载方式,却还是对Linux的文件系统没有从源码方面去了解。不求甚解确实不好不好。 于是借鉴一些大牛的博...

4189

扫码关注云+社区

领取腾讯云代金券