Zookeeper实现服务注册和发现功能
体系结构Zookeeper集群配置server.0=192.168.0.1:2888:3888;2181server.1=192.168.0.2:2888:3888;2181server.2=192.168.0.3:2888:3888;2181dataDir=/datadatalogDir=/datalog事务日志与快照防止在Zookeeper集群挂掉时丢失所有注册信息,在恢复时不需要重新注册需要
·
体系结构
Zookeeper集群配置
server.0=192.168.0.1:2888:3888;2181
server.1=192.168.0.2:2888:3888;2181
server.2=192.168.0.3:2888:3888;2181
dataDir=/data
datalogDir=/datalog
事务日志与快照
防止在Zookeeper集群挂掉时丢失所有注册信息,在恢复时不需要重新注册
需要配置dataDir和datalogDir的路径
参考资料:ZooKeeper的日志和快照
挂载目录
$zookeeper/conf/zoo.cfg:/conf/zoo.cfg
$zookeeper/data:/data
$zookeeper/datalog:/datalog
注:(/data/myid为0,1,2)
服务注册模块
1、将服务的ip和端口写进zookeeper中
2、对节点进行监视,当节点被删除或者失去连接时重新注册,并重新监视
3、使用acl机制对节点设置访问权限,使用给定的用户名密码登录
4、ip地址可以根据socket协议自动获取
register.yaml
zookeeper:
hosts:
-
host: 192.168.0.1
port: 2181
-
host: 192.168.0.2
port: 2181
-
host: 192.168.0.3
port: 2181
acl_user: user0
acl_pass: pass0
m:
model_name: model1
acl_user: user1
acl_pass: pass1
auto_ip: true
model_ip: localhost
model_port: 50000
register.py
#encoding=utf8
import socket
import yaml
import time
from easydict import EasyDict as edict
import json
from kazoo import security
from kazoo.client import KazooClient
from kazoo.client import EventType
from kazoo.exceptions import SessionExpiredError, ConnectionLossException, NoAuthError
from functools import wraps
from flask import g, Flask, Response, request
def load_config(path):
with open(path, encoding='UTF-8') as f:
cf = edict(yaml.load(f))
return cf
class ExceptionHandler(object):
def __init__(self):
#KazooTimeoutError
pass
def __call__(self, func):
@wraps(func)
def wrapped_function(*args, **kwargs):
is_success = False
while not is_success:
try:
is_success = func(*args, **kwargs)
except (ConnectionLossException, SessionExpiredError) as e:
is_success = False
except NoAuthError as e:
raise e
except Exception as e:
is_success = False
time.sleep(1)
return wrapped_function
class ZKModelWatcher(object):
def __init__(self, hosts):
hosts_arr = hosts.split(',')
self._a_host = hosts_arr[0].split(':')[0]
self._a_port = int(hosts_arr[0].split(':')[1])
self._hosts = hosts #zookeeper集群连接地址
self._zkc = KazooClient(hosts=self._hosts)
def set_acl(self):
self.auth_data = [("digest", f"{config.m.acl_user}:{config.m.acl_pass}"),
("digest", f"{config.zookeeper.acl_user}:{config.zookeeper.acl_pass}")]
self.acl = security.make_digest_acl(username=config.m.acl_user,
password=config.m.acl_pass, read=True, create=True, write=True)
self._zkc.auth_data = self.auth_data
self._zkc.default_acl = [self.acl]
return self
def set_model(self):
self.model_name = config.m.model_name # 注册模型服务名称,用于zookeeper根节点名称
self.model_ip = config.m.model_ip
self.auto_ip = config.m.auto_ip
self.model_port = config.m.model_port # 注册模型服务端口号
return self
def start(self):
self._zkc.start()
return self
def get_zk_state(self):
return self._zkc.state
def get_host_ip(self):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((self._a_host, self._a_port))
ip = s.getsockname()[0]
finally:
s.close()
return ip
def create_model_node(self):
#创建临时、序列化节点。
ip = self.get_host_ip() if self.auto_ip else self.model_ip
content = {'ip': ip, 'port': self.model_port}
res = self._zkc.create(f'/{self.model_name}/model', bytes(json.dumps(content), encoding='utf8')
,makepath=True, sequence=True, ephemeral=True)
return res
def update_config(self, hosts):
self._hosts = hosts
hosts_arr = hosts.split(',')
self._a_host = hosts_arr[0].split(':')[0]
self._a_port = int(hosts_arr[0].split(':')[1])
self._zkc.set_hosts(hosts)
self.set_model()
self.set_acl()
def restart_zk(self):
try: self._zkc.stop()
except: pass
self._zkc = KazooClient(hosts=self._hosts, default_acl=[self.acl], auth_data=self.auth_data)
self._zkc.start()
@ExceptionHandler()
def register(self, data, stat, event):
if event and event.type == EventType.NONE:
self.restart_zk()
res = self.create_model_node()
self._zkc.DataWatch(path=f'{res}', func=self.register)
if not self._zkc.exists(path=f'{res}'):
return False
elif event and event.type == EventType.DELETED:
res = self.create_model_node()
self._zkc.DataWatch(path=f'{res}', func=self.register)
if not self._zkc.exists(path=f'{res}'):
return False
return True
@ExceptionHandler()
def run(self):
self.restart_zk()
res = self.create_model_node()
self._zkc.DataWatch(path=f'{res}', func=self.register)
if not self._zkc.exists(path=f'{res}'):
return False
return True
def close(self):
try:
self._zkc.stop()
self._zkc.close()
except Exception as e:
print(str(e))
config = load_config('register.yaml')
zw = ZKModelWatcher(','.join([f"{host['host']}:{host['port']}" for host in config.zookeeper.hosts])).set_acl().set_model().start()
app = Flask(__name__)
@app.route('/')
def test_con():
response = Response()
response.response = zw.get_zk_state()
return response
@app.route('/refresh', methods=['GET'])
def query():
response = Response()
global config
config = load_config('register.yaml')
zw.update_config(','.join([f"{host['host']}:{host['port']}" for host in config.zookeeper.hosts]))
response.response = json.dumps({'status': 'ok'}, ensure_ascii=False)
return response
if __name__ == '__main__':
zw.run()
app.run(host='0.0.0.0', port=50000, debug=False)
zw.close()
注:auto_ip根据网络环境进行设置
服务发现模块
1、服务开始时将所有所需要的服务地址写进缓存中
2、对缓存中的每个服务地址进行监控,发生变化时及时更新,并重新监视
3、访问时直接从缓存中访问,按照一定的策略指定其中的一个服务
client.yaml
zookeeper:
hosts:
-
host: 192.168.0.1
port: 2181
-
host: 192.168.0.2
port: 2181
-
host: 192.168.0.3
port: 2181
acl_user: user0
acl_pass: pass0
models:
model1:
model_name: model1
acl_user: user1
acl_pass: pass1
client.py
#encoding=utf8
from kazoo.client import KazooClient
from kazoo.client import EventType
from kazoo import security
from datetime import datetime
from flask import g, Flask, Response, request
from easydict import EasyDict as edict
import yaml
import json
import random
import requests
import copy
settings = edict()
def load_config(path):
with open(path, encoding='UTF-8') as f:
cf = edict(yaml.load(f))
return cf
class ZooService():
def __init__(self, models):
self.models = models
def service_update(self, data, stat, event):
if event and event.type == EventType.NONE:
self.start()
elif event and event.type == EventType.CHANGED:
m = settings.models
self.zk.DataWatch(path=event.path, func=self.service_update)
paths = event.path.split('/')
service_host = self.zk.get(event.path)
m[paths[1]][paths[2]] = {'info': json.loads(service_host[0].decode()), 'update_time': str(datetime.now())}
elif event and event.type == EventType.DELETED:
m = settings.models
paths = event.path.split('/')
if paths[1] in m and paths[2] in m[paths[1]]:
del settings.models[paths[1]][paths[2]]
def service_addchild(self, event):
if event and event.type == EventType.CHILD:
paths = event.path.split('/')
if len(paths) == 2:
services = self.zk.get_children(event.path, watch=self.service_addchild)
for service in services:
service_host = self.zk.get(f'{event.path}/{service}')
self.zk.DataWatch(path=f'{event.path}/{service}', func=self.service_update)
settings.models[paths[1]][service] = {'info': json.loads(service_host[0].decode()),
'update_time': str(datetime.now())}
def init_zk(self):
config = settings.config
zoo_hosts = ','.join([f"{a_host.host}:{a_host.port}" for a_host in config.zookeeper.hosts])
auth_data = [('digest', f'{config.models[model].acl_user}:{config.models[model].acl_pass}') for model in
self.models]
auth_data.append(('digest', f'{config.zookeeper.acl_user}:{config.zookeeper.acl_pass}'))
acl = security.make_digest_acl(username=f"{config.zookeeper.acl_user}",
password=f"{config.zookeeper.acl_pass}", read=True, create=True)
self.zk = zk = KazooClient(hosts=zoo_hosts, auth_data=auth_data)
zk.start()
try:
zk.set_acls('/', acls=[acl])
zk.set_acls('/zookeeper', acls=[acl])
zk.set_acls('/zookeeper/quota', acls=[acl])
except: pass
return zk
def start(self):
config = settings.config
settings.models = {}
zk = self.init_zk()
for model in self.models:
if model not in settings.models: settings.models[model] = {}
if zk.exists(f'/{model}') is None:
acl = security.make_digest_acl(username=f"{config.models[model].acl_user}",
password=f"{config.models[model].acl_pass}", read=True, create=True)
zk.ensure_path(f'/{model}', acl=[acl])
services = zk.get_children(f'/{model}', watch=self.service_addchild)
for service in services:
service_host = zk.get(f'/{model}/{service}')
zk.DataWatch(path=f'/{model}/{service}', func=self.service_update)
settings.models[model][service] = {'info': json.loads(service_host[0].decode()), 'update_time': str(datetime.now())}
settings.config = load_config('client.yaml')
ZooService(['model1']).start()
app = Flask(__name__)
@app.route('/request')
def request():
m = copy.deepcopy(settings.models)
model_keys = list(m['model1'].keys())
response = Response()
if len(model_keys) <= 0:
response.response = 'no model'
return response
random.shuffle(model_keys)
model = m['model1'][model_keys[0]]
url = f"http://{model['info']['ip']}:{model['info']['port']}"
res = requests.get(url, params={'query': 'test'})
response.response = res.text
return response
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8089, debug=False)
更多推荐
所有评论(0)