体系结构

Zookeeper集群配置

server.0=192.168.0.1:2888:3888;2181
server.1=192.168.0.2:2888:3888;2181
server.2=192.168.0.3:2888:3888;2181
dataDir=/data
dataLogDir=/datalog

事务日志与快照

将事务日志与快照持久化到磁盘,可以防止Zookeeper集群挂掉时丢失所有注册信息,集群恢复后不需要重新注册

需要在zoo.cfg中配置dataDir和dataLogDir的路径

参考资料:ZooKeeper的日志和快照 

挂载目录

$zookeeper/conf/zoo.cfg:/conf/zoo.cfg
$zookeeper/data:/data
$zookeeper/datalog:/datalog

注:三个节点的/data/myid内容分别为0,1,2,与zoo.cfg中server.N的编号一一对应

服务注册模块

1、将服务的ip和端口写进zookeeper中

2、对节点进行监视,当节点被删除或者失去连接时重新注册,并重新监视

3、使用acl机制对节点设置访问权限,使用给定的用户名密码登录

4、ip地址可以根据socket协议自动获取

register.yaml

zookeeper:
  hosts:
    -
      host: 192.168.0.1
      port: 2181
    -
      host: 192.168.0.2
      port: 2181
    -
      host: 192.168.0.3
      port: 2181
  acl_user: user0
  acl_pass: pass0
m:
  model_name: model1
  acl_user: user1
  acl_pass: pass1
  auto_ip: true
  model_ip: localhost
  model_port: 50000

register.py

#encoding=utf8
import socket
import yaml
import time
from easydict import EasyDict as edict
import json
from kazoo import security
from kazoo.client import KazooClient
from kazoo.client import EventType
from kazoo.exceptions import SessionExpiredError, ConnectionLossException, NoAuthError
from functools import wraps
from flask import g, Flask, Response, request

def load_config(path):
    """Load a YAML file and return its contents as an attribute-accessible dict.

    Args:
        path: Path of the UTF-8 encoded YAML file to read.

    Returns:
        An ``EasyDict`` built from the parsed document.
    """
    with open(path, encoding='UTF-8') as f:
        # safe_load: plain yaml.load() without an explicit Loader is rejected
        # by current PyYAML releases and can construct arbitrary objects on
        # older ones; the config only needs plain mappings/lists/scalars.
        cf = edict(yaml.safe_load(f))
    return cf

class ExceptionHandler(object):
    """Decorator factory that retries a callable until it returns a truthy value.

    The wrapped callable is invoked again whenever it returns a falsy value or
    raises; ``NoAuthError`` is the one exception that is propagated to the
    caller instead of being retried.

    Args:
        retry_delay: Seconds to wait between two attempts. Use ``0`` to retry
            immediately.
    """

    def __init__(self, retry_delay=1):
        # NOTE(review): KazooTimeoutError is not listed explicitly anywhere
        # below; presumably it is meant to fall under the generic retry
        # branch -- confirm.
        self.retry_delay = retry_delay

    def __call__(self, func):
        """Wrap *func* in the retry loop and return the wrapper."""
        @wraps(func)
        def wrapped_function(*args, **kwargs):
            is_success = False
            while not is_success:
                try:
                    is_success = func(*args, **kwargs)
                except NoAuthError:
                    # Wrong credentials will not fix themselves: give up.
                    raise
                except (ConnectionLossException, SessionExpiredError):
                    is_success = False
                except Exception:
                    is_success = False
                # Pause before every retry so a broken connection does not
                # turn this loop into a busy spin.
                if not is_success and self.retry_delay:
                    time.sleep(self.retry_delay)
            # Hand the final (truthy) result back to the caller.
            return is_success
        return wrapped_function

class ZKModelWatcher(object):
    """Registers this model service in ZooKeeper and re-registers it when needed.

    The service address is written as JSON (``{"ip": ..., "port": ...}``) into
    an ephemeral, sequential node under ``/<model_name>/``. A data watch on
    that node calls :meth:`register`, which creates a fresh node when the
    watched one is deleted or a session-level event is delivered.

    ACL credentials and model settings are read from the module-level
    ``config`` object by :meth:`set_acl` and :meth:`set_model`.
    """

    def __init__(self, hosts):
        """Create a (not yet started) client for the given ensemble.

        Args:
            hosts: Comma-separated ``host:port`` list of the ZooKeeper servers.
        """
        self._hosts = hosts  # ZooKeeper ensemble connection string
        self._a_host, self._a_port = self._first_endpoint(hosts)
        # Populated by set_acl(); None until then so restart_zk() can still
        # build a client with kazoo's defaults.
        self.auth_data = None
        self.acl = None
        self._zkc = KazooClient(hosts=self._hosts)

    @staticmethod
    def _first_endpoint(hosts):
        """Return ``(host, port)`` of the first entry of a ``host:port,...`` string."""
        fields = hosts.split(',')[0].split(':')
        return fields[0], int(fields[1])

    def set_acl(self):
        """Load digest credentials from ``config`` and apply them to the client.

        Returns:
            ``self``, so calls can be chained.
        """
        self.auth_data = [("digest", f"{config.m.acl_user}:{config.m.acl_pass}"),
                          ("digest", f"{config.zookeeper.acl_user}:{config.zookeeper.acl_pass}")]
        self.acl = security.make_digest_acl(username=config.m.acl_user,
                                            password=config.m.acl_pass, read=True, create=True, write=True)
        self._zkc.auth_data = self.auth_data
        self._zkc.default_acl = [self.acl]
        return self

    def set_model(self):
        """Copy the model settings from ``config`` onto this instance.

        Returns:
            ``self``, so calls can be chained.
        """
        self.model_name = config.m.model_name  # service name, used as the root znode name
        self.model_ip = config.m.model_ip
        self.auto_ip = config.m.auto_ip
        self.model_port = config.m.model_port  # port the model service listens on
        return self

    def start(self):
        """Start the underlying client and return ``self``."""
        self._zkc.start()
        return self

    def get_zk_state(self):
        """Return the current state reported by the underlying client."""
        return self._zkc.state

    def get_host_ip(self):
        """Return the local address used to reach the first ZooKeeper host.

        A datagram socket is connected to the first configured server and the
        socket's own address is read back.
        """
        # Create the socket outside the try block: if creation itself fails
        # there is nothing to close, and the real error must not be masked.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect((self._a_host, self._a_port))
            return s.getsockname()[0]
        finally:
            s.close()

    def create_model_node(self):
        """Create the ephemeral, sequential registration node.

        Returns:
            The path of the created node as returned by the client.
        """
        ip = self.get_host_ip() if self.auto_ip else self.model_ip
        content = {'ip': ip, 'port': self.model_port}
        res = self._zkc.create(f'/{self.model_name}/model', bytes(json.dumps(content), encoding='utf8'),
                               makepath=True, sequence=True, ephemeral=True)
        return res

    def update_config(self, hosts):
        """Switch to a new host list and reload model/ACL settings from ``config``.

        Args:
            hosts: Comma-separated ``host:port`` list of the ZooKeeper servers.
        """
        self._hosts = hosts
        self._a_host, self._a_port = self._first_endpoint(hosts)
        self._zkc.set_hosts(hosts)
        self.set_model()
        self.set_acl()

    def restart_zk(self):
        """Discard the current client and start a new one with the stored settings."""
        old_client = self._zkc
        # Best-effort teardown: the old client may already be dead, and this
        # method is also reached from inside a watch callback, where stopping
        # can fail. Neither case must prevent the reconnect below.
        for teardown in (old_client.stop, old_client.close):
            try:
                teardown()
            except Exception:
                pass
        default_acl = [self.acl] if self.acl is not None else None
        self._zkc = KazooClient(hosts=self._hosts, default_acl=default_acl, auth_data=self.auth_data)
        self._zkc.start()

    def _register_and_watch(self):
        """Create the registration node, watch it, and report whether it exists."""
        node_path = self.create_model_node()
        self._zkc.DataWatch(path=f'{node_path}', func=self.register)
        return self._zkc.exists(path=f'{node_path}') is not None

    @ExceptionHandler()
    def register(self, data, stat, event):
        """Data-watch callback that re-creates the registration when required.

        Args:
            data: Node data passed by the watch (unused).
            stat: Node stat passed by the watch (unused).
            event: The watch event, or a falsy value when there is none.

        Returns:
            ``True`` when nothing had to be done or the new node exists,
            ``False`` when the freshly created node could not be found.
        """
        if not event:
            return True
        if event.type == EventType.NONE:
            # Session-level event: rebuild the client before registering again.
            self.restart_zk()
            return self._register_and_watch()
        if event.type == EventType.DELETED:
            return self._register_and_watch()
        return True

    @ExceptionHandler()
    def run(self):
        """Reconnect, register the service and install the watch.

        Returns:
            ``True`` if the registration node exists afterwards, else ``False``.
        """
        self.restart_zk()
        return self._register_and_watch()

    def close(self):
        """Stop and close the client, printing (not raising) any error."""
        try:
            self._zkc.stop()
            self._zkc.close()
        except Exception as e:
            print(str(e))

# Registration settings, loaded once at import time (the /refresh route
# reloads them).
config = load_config('register.yaml')

# Build the watcher from the "host:port,host:port,..." ensemble string, then
# apply credentials and model settings and open the connection.
zw = ZKModelWatcher(
    ','.join(f"{node['host']}:{node['port']}" for node in config.zookeeper.hosts)
)
zw.set_acl()
zw.set_model()
zw.start()

app = Flask(__name__)
@app.route('/')
def test_con():
    """Return the watcher's current ZooKeeper connection state as the body."""
    # Pass the text to the constructor instead of assigning it to
    # ``Response.response``: a plain string stored there is iterated one
    # character at a time when the response is sent.
    return Response(zw.get_zk_state())

@app.route('/refresh', methods=['GET'])
def query():
    """Reload ``register.yaml`` and push the new settings into the watcher.

    Returns:
        A response whose body is the JSON document ``{"status": "ok"}``.
    """
    global config
    config = load_config('register.yaml')
    zw.update_config(','.join(f"{host['host']}:{host['port']}" for host in config.zookeeper.hosts))
    # Build the response from the body text; assigning a plain string to
    # ``Response.response`` would make it be sent character by character.
    return Response(json.dumps({'status': 'ok'}, ensure_ascii=False))

if __name__ == '__main__':
    # Register the service, then serve HTTP until the server exits. The
    # ZooKeeper client is released even when registration or the server
    # raises, so the connection opened at import time is not left behind.
    try:
        zw.run()
        app.run(host='0.0.0.0', port=50000, debug=False)
    finally:
        zw.close()

注:auto_ip需要根据网络环境进行设置;设为false时使用model_ip中配置的地址进行注册

服务发现模块

1、服务开始时将所有所需要的服务地址写进缓存中

2、对缓存中的每个服务地址进行监控,发生变化时及时更新,并重新监视

3、访问时直接从缓存中访问,按照一定的策略指定其中的一个服务

client.yaml

zookeeper:
  hosts:
    -
      host: 192.168.0.1
      port: 2181
    -
      host: 192.168.0.2
      port: 2181
    -
      host: 192.168.0.3
      port: 2181
  acl_user: user0
  acl_pass: pass0
models:
  model1:
    model_name: model1
    acl_user: user1
    acl_pass: pass1

client.py

#encoding=utf8
from kazoo.client import KazooClient
from kazoo.client import EventType
from kazoo import security
from datetime import datetime
from flask import g, Flask, Response, request
from easydict import EasyDict as edict
import yaml
import json
import random
import requests
import copy
# Module-wide state holder: ``settings.config`` and ``settings.models`` are
# attached to it elsewhere in this file.
settings = edict()

def load_config(path):
    """Load a YAML file and return its contents as an attribute-accessible dict.

    Args:
        path: Path of the UTF-8 encoded YAML file to read.

    Returns:
        An ``EasyDict`` built from the parsed document.
    """
    with open(path, encoding='UTF-8') as f:
        # safe_load: plain yaml.load() without an explicit Loader is rejected
        # by current PyYAML releases and can construct arbitrary objects on
        # older ones; the config only needs plain mappings/lists/scalars.
        cf = edict(yaml.safe_load(f))
    return cf

class ZooService():
    """Keeps ``settings.models`` in sync with the service nodes in ZooKeeper.

    For every model name given to the constructor, the children of
    ``/<model>`` are read into ``settings.models[model]`` as
    ``{service: {'info': <decoded JSON>, 'update_time': <timestamp string>}}``.
    A child watch picks up newly added services and one data watch per
    service node handles updates and deletions.
    """

    def __init__(self, models):
        """Remember the model names to track.

        Args:
            models: Iterable of model names; each must have an entry under
                ``settings.config.models``.
        """
        self.models = models
        self.zk = None
        # Node paths that already have a data watch on the current client, so
        # a node is never watched twice.
        self._watched = set()

    def _shutdown_client(self):
        """Stop and close the current client, if any, ignoring failures."""
        old_client = getattr(self, 'zk', None)
        if old_client is None:
            return
        # Best effort: the client may already be unusable, and this can run
        # inside one of its own callbacks where stopping may fail.
        for teardown in (old_client.stop, old_client.close):
            try:
                teardown()
            except Exception:
                pass

    def _cache_service(self, model, service, raw):
        """Store the decoded node payload for ``model``/``service`` in the cache."""
        settings.models[model][service] = {'info': json.loads(raw.decode()),
                                           'update_time': str(datetime.now())}

    def _watch_service(self, zk, model, service):
        """Read one service node into the cache and watch it (once per node)."""
        path = f'/{model}/{service}'
        service_host = zk.get(path)
        self._cache_service(model, service, service_host[0])
        if path not in self._watched:
            zk.DataWatch(path=path, func=self.service_update)
            self._watched.add(path)

    def service_update(self, data, stat, event):
        """Data-watch callback for a single service node.

        Args:
            data: Current node data as passed by the watch (``None`` if absent).
            stat: Node stat as passed by the watch (unused).
            event: The watch event, or a falsy value when there is none.

        Returns:
            ``False`` after a deletion so the watch on the removed node ends;
            ``None`` otherwise.
        """
        if not event:
            return None
        if event.type == EventType.NONE:
            # Session-level event: rebuild the client and the whole cache.
            self.start()
        elif event.type == EventType.CHANGED:
            # The data watch re-arms itself and already delivers the new
            # payload, so it is neither re-registered nor re-fetched here.
            paths = event.path.split('/')
            if data is not None:
                self._cache_service(paths[1], paths[2], data)
        elif event.type == EventType.DELETED:
            m = settings.models
            paths = event.path.split('/')
            if paths[1] in m and paths[2] in m[paths[1]]:
                del settings.models[paths[1]][paths[2]]
            self._watched.discard(event.path)
            return False
        return None

    def service_addchild(self, event):
        """Child-watch callback: refresh all services under a model node.

        Args:
            event: The watch event; only ``CHILD`` events on a top-level
                ``/<model>`` path are handled.
        """
        if not event or event.type != EventType.CHILD:
            return
        paths = event.path.split('/')
        if len(paths) != 2:
            return
        # Re-arm the child watch while listing the current children.
        services = self.zk.get_children(event.path, watch=self.service_addchild)
        for service in services:
            self._watch_service(self.zk, paths[1], service)

    def init_zk(self):
        """Create and start a new client using ``settings.config``.

        Any previous client is shut down first. After connecting, ACLs are
        set on ``/``, ``/zookeeper`` and ``/zookeeper/quota``; a failure there
        is ignored.

        Returns:
            The started client (also stored as ``self.zk``).
        """
        config = settings.config
        self._shutdown_client()
        zoo_hosts = ','.join([f"{a_host.host}:{a_host.port}" for a_host in config.zookeeper.hosts])
        auth_data = [('digest', f'{config.models[model].acl_user}:{config.models[model].acl_pass}') for model in
                     self.models]
        auth_data.append(('digest', f'{config.zookeeper.acl_user}:{config.zookeeper.acl_pass}'))
        acl = security.make_digest_acl(username=f"{config.zookeeper.acl_user}",
                                       password=f"{config.zookeeper.acl_pass}", read=True, create=True)
        self.zk = zk = KazooClient(hosts=zoo_hosts, auth_data=auth_data)
        # Watches belong to a client, so a new client starts with none.
        self._watched = set()
        zk.start()
        try:
            for path in ('/', '/zookeeper', '/zookeeper/quota'):
                zk.set_acls(path, acls=[acl])
        except Exception:
            # Deliberately best effort: the first failure skips the rest and
            # start-up continues.
            pass
        return zk

    def start(self):
        """(Re)connect and rebuild ``settings.models`` from ZooKeeper."""
        config = settings.config
        settings.models = {}
        zk = self.init_zk()
        for model in self.models:
            if model not in settings.models:
                settings.models[model] = {}
            if zk.exists(f'/{model}') is None:
                acl = security.make_digest_acl(username=f"{config.models[model].acl_user}",
                                               password=f"{config.models[model].acl_pass}", read=True, create=True)
                zk.ensure_path(f'/{model}', acl=[acl])

            services = zk.get_children(f'/{model}', watch=self.service_addchild)
            for service in services:
                self._watch_service(zk, model, service)

# Read the client settings once at import time.
settings.config = load_config('client.yaml')
# Connect to ZooKeeper and populate settings.models for 'model1' right away.
ZooService(['model1']).start()
app = Flask(__name__)

@app.route('/request')
def request():
    """Forward a test query to one randomly chosen ``model1`` instance.

    Returns:
        A response carrying the upstream body, or ``'no model'`` when no
        instance is cached.
    """
    # Snapshot only the entries that are needed; copying the whole cache
    # deeply on every request is unnecessary.
    candidates = list(settings.models['model1'].values())
    if not candidates:
        return Response('no model')
    model = random.choice(candidates)
    url = f"http://{model['info']['ip']}:{model['info']['port']}"
    # NOTE(review): no timeout is set, so a hung upstream blocks this request
    # indefinitely -- consider passing ``timeout=`` once a sensible limit for
    # the model service is known.
    res = requests.get(url, params={'query': 'test'})
    # Build the response from the body text; a plain string assigned to
    # ``Response.response`` would be sent character by character.
    return Response(res.text)

if __name__ == '__main__':
    # Serve the discovery endpoint on all interfaces, port 8089.
    app.run(host='0.0.0.0', port=8089, debug=False)

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐