Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

使用Kazoo操作ZooKeeper服务治理

Harvard_Fly 2019-01-30 23:09:00 阅读数:249 评论数:0 点赞数:0 收藏数:0

单机服务的可靠性及可扩展性有限,某台服务宕机可能会影响整个系统的正常使用;分布式服务能够有效地解决这一问题,但同时分布式服务也会带来一些新的问题,如:服务发现(新增或者删除了服务如何确保能让客户端知道),容灾(某些服务出现故障如何让客户端只访问正常的服务);ZooKeeper的提出主要是为了解决分布式服务的治理问题,它在分布式环境中协调和管理服务。

Zookeeper协调管理服务的过程如下图:

 

服务端:每台服务器都要向注册中心Zookeeper进行注册登记,并且保持与Zookeeper的连接,如果服务器与Zookeeper断开了连接,Zookeeper将删除该服务器的地址。
客户端:需要服务的时候先向Zookeeper订阅服务器的地址信息,Zookeeper返回给客户端已注册的服务器信息列表,客户端从服务器信息列表中选择服务器进行服务调用,如果Zookeeper记录的服务器信息发生了变更,服务器会通知客户端变更事件,客户端可以获取最新的服务器信息。
ZooKeeper文件系统的数据结构是个树状结构,它的每个节点(znode)由一个名称标识,并用路径/分割:

 

 ZooKeeper的节点类型有:

  1. 持久节点(ZooKeeper默认的节点类型,创建该节点的客户端断开连接后,持久节点仍然存在)

  2. 顺序节点(将10位的序列号附加到原始名称来设置节点的路径,如:/server0000000001)

  3. 临时节点(当客户端与ZooKeeper断开连接时,临时节点会自动删除)

 

RPC服务注册到ZooKeeper

服务端:

 1 import threading
 2 import json
 3 import socket
 4 import sys
 5 from kazoo.client import KazooClient
 6 from divide_rpc import ServerStub
 7 from divide_rpc import InvalidOperation
 8
 9
10 class ThreadServer(object):
11 def __init__(self, host, port, handlers):
12 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
13 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
14 self.host = host
15 self.port = port
16  self.sock.bind((host, port))
17 self.handlers = handlers
18
19 def serve(self):
20 """
21  开始服务
22 """
23 self.sock.listen(128)
24  self.register_zk()
25 print("开始监听")
26 while True:
27 conn, addr = self.sock.accept()
28 print("建立链接%s" % str(addr))
29 t = threading.Thread(target=self.handle, args=(conn,))
30  t.start()
31
32 def handle(self, client):
33 stub = ServerStub(client, self.handlers)
34 try:
35 while True:
36  stub.process()
37 except EOFError:
38 print("客户端关闭连接")
39
40  client.close()
41
42 def register_zk(self):
43 """
44  注册到zookeeper
45 """
46 self.zk = KazooClient(hosts='127.0.0.1:2181')
47  self.zk.start()
48 self.zk.ensure_path('/rpc') # 创建根节点
49 value = json.dumps({'host': self.host, 'port': self.port})
50 # 创建服务子节点
51 self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)
52
53
54 class Handlers:
55  @staticmethod
56 def divide(num1, num2=1):
57 """
58  除法
59  :param num1:
60  :param num2:
61  :return:
62 """
63 if num2 == 0:
64 raise InvalidOperation()
65 val = num1 / num2
66 return val
67
68
69 if __name__ == '__main__':
70 if len(sys.argv) < 3:
71 print("usage:python server.py [host] [port]")
72 exit(1)
73 host = sys.argv[1]
74 port = sys.argv[2]
75 server = ThreadServer(host, int(port), Handlers)
76 server.serve()

服务端通过kazoo连接zookeeper,依次创建根节点和服务的子节点,当启动多线程服务器的时候,会根据ip和端口创建不同的节点,依次启动两个server(8001、8002),查看zookeeper的节点信息:

1 >>> from kazoo.client import KazooClient
2 >>> zk = KazooClient(hosts='127.0.0.1:2181')
3 >>> zk.start()
4 >>> children = zk.get_children("/rpc")
5 >>> print(children)
6 ['server0000000001', 'server0000000000']

 

客户端:

 1 import random
 2 import time
 3 import json
 4 import socket
 5 from divide_rpc import (
 6  ClientStub, InvalidOperation
 7 )
 8 from kazoo.client import KazooClient
 9
10
11 class DistributedChannel(object):
12 def __init__(self):
13 self._zk = KazooClient(hosts='127.0.0.1:2181')
14  self._zk.start()
15  self._get_servers()
16
17 def _get_servers(self, event=None):
18 """
19  从zookeeper获取服务器地址信息列表
20 """
21 servers = self._zk.get_children('/rpc', watch=self._get_servers)
22 print(servers)
23 self._servers = []
24 for server in servers:
25 data = self._zk.get('/rpc/' + server)[0]
26 if data:
27 addr = json.loads(data.decode())
28  self._servers.append(addr)
29
30 def _get_server(self):
31 """
32  随机选出一个可用的服务器
33 """
34 return random.choice(self._servers)
35
36 def get_connection(self):
37 """
38  提供一个可用的tcp连接
39 """
40 while True:
41 server = self._get_server()
42 print(server)
43 try:
44 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45 sock.connect((server['host'], server['port']))
46 except ConnectionRefusedError:
47 time.sleep(1)
48 continue
49 else:
50 break
51 return sock
52
53
54 channel = DistributedChannel()
55
56 for i in range(50):
57 try:
58 stub = ClientStub(channel)
59 val = stub.divide(i)
60 except InvalidOperation as e:
61 print(e.message)
62 else:
63 print(val)
64 time.sleep(1)

 

客户端连接zookeeper,通过get_children来获取服务器信息,并watch监听服务器的变化情况,启动客户端会发现它会调用8001端口的server和8002端口的server:

此时服务端新增加一个结点,8003,客户端变化情况:

可以看出zookeeper总共有三个节点了,前面调用的server都是8001和8002,当8003加入后,zookeeper会发现并调用它

此时服务端断开一个server,8001,客户端变化情况:

断开server前客户端会调用8001、8002、8003这三个服务,当断开server 8001以后,zookeeper只会调用8002和8003这两个server了

版权声明
本文为[Harvard_Fly]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/FG123/p/10261682.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;

支付宝红包,每日可领