1.服务器部署。

       无论服务器架构有多么简单或多么复杂,都需要使用某种方式在物理或虚拟机器上运行我们的Python代码,这一过程叫作 部署 。人们对部署的看法可以分两大类。

  • 为每个服务器程序都编写服务所提供的所有功能:通过两次fork()创建一个Unix守护进程或是将自己注册为一个Windows服务,安排进行系统级的日志操作,支持配置文件以及提供启动、关闭、和重启的相关机制。 ps:fork()函数通过系统调用创建一个与原来进程几乎完全相同的进程,也就是两个进程可以做完全相同的事,但如果初始参数或者传入的变量不同,两个进程也可以做不同的事。
  • 十二要素应用:该方法提倡只实现服务器程序必要功能的最小集合 。它将每个服务器实现为普通的前台程序,而不是将其实现为守护进程。这样的程序从环境变量而不是系统级的配置文件中获取所需的配置选项。

2.简单的TCP协议。

       下面的代码是模仿服务器和客户端的交互,客户端向服务端提出三个问题,服务器进行回答并将答案传给客户端。

#!/usr/bin/python
#coding:utf-8
import argparse, socket, time

#客户端希望服务器理解的三个问题和回答,以字典对应键值对的形式给出
aphorisms = {b'Beautiful is better than?': b'Ugly.',
             b'Explicit is better than?': b'Implicit.',
             b'Simple is better than?': b'Complex.'}

def get_answer(aphorism):
    time.sleep(0.0)  
    return aphorisms.get(aphorism, b'Error: unknown aphorism.')

def parse_command_line(description):
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('host', help='IP or hostname')
    parser.add_argument('-p', metavar='port', type=int, default=1060,
                        help='TCP port (default 1060)')
    args = parser.parse_args()
    address = (args.host, args.p)
    return address

#构造TCP监听套接字
def create_srv_socket(address):
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(address)
    listener.listen(64)
    print('Listening at {}'.format(address))
    return listener

def accept_connections_forever(listener):
    while True:
        sock, address = listener.accept()
        print('Accepted connection from {}'.format(address))
        handle_conversation(sock, address)

def handle_conversation(sock, address):
    try:
        while True:
            handle_request(sock)
    # 捕捉异常除了EOFError其他错误都会被下一个except捕获并打印
    except EOFError:
        print('Client socket to {} has closed'.format(address))
    except Exception as e:
        print('Client {} error: {}'.format(address, e))
    # 确保无论该函数通过那一条代码路径退出都会执行关闭套接字选项
    finally:
        sock.close()

def handle_request(sock):
    aphorism = recv_until(sock, b'?')
    answer = get_answer(aphorism)
    sock.sendall(answer)

def recv_until(sock, suffix):
    message = sock.recv(4096)
    if not message:
        raise EOFError('socket closed')
    while not message.endswith(suffix):
        data = sock.recv(4096)
        if not data:
            raise IOError('received {!r} then socket closed'.format(message))
        message += data
    return message

if __name__ == '__main__':
    a = parse_command_line('TCP')
    b = create_srv_socket(a)
    accept_connections_forever(b)

       这段代码的重点在最后四个函数。

  • accept_connections_forever():函数中包含一个简单的循环,循环中不断通过监听套接字接受连接请求,并且使用print()把每个连接的客户端打印出来,然后将连接套接字作为参数传给下面的函数。
  • handle_conversation():此函数包含了一个无限的循环,来不断地处理请求。该程序会捕捉可能发生的错误,这使得客户端套接字的任何问题都不会引起程序崩溃。
  • handle_request(sock):函数能够简单的读取客户端的问题,然后做出应答。
  • recv_until():进行封帧操作。只要不断累加的字节字符串没有形成一个完整的问题,就会不断重复调用套接字的recv()方法。
           为了测试这个代码我们还要编写一个客户端:
#!/usr/bin/python
#coding:utf-8
import argparse,random,socket,zen_utils

def client(address,cause_error=False):
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.connect(address)
    aphorisms = list(zen_utils.aphorisms)
    if cause_error:
        sock.sendall(aphorisms[0][:-1])
        return
    for aphorism in random.sample(aphorisms,3):
        sock.sendall(aphorism)
        print(aphorism,zen_utils.recv_until(sock,b'.'))
    sock.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Example client')
    parser.add_argument('host',help='IP or hostname')
    parser.add_argument('-e',action='store_true',help='cause an error')
    parser.add_argument('-p',metavar='port',type=int,default=1060,
                        help='TCP port (default 1060)')
    args = parser.parse_args()
    address = (args.host,args.p)
    client(address,args.e)

       该客户端提供了一个-e参数用来模拟客户端发送不完整的问题,然后使服务器突然挂起。下面是测试结果:
在这里插入图片描述

3.单线程服务器。

       使用2中的示例代码可以快速编写一个单线程服务器。

#!/usr/bin/python
#coding:utf-8
import zen_utils

if __name__ == '__main__':
    address = zen_utils.parse_command_line('Simple single-threaded server')
    listener = zen_utils.create_srv_socket(address)
    zen_utils.accept_connections_forever(listener)

       如果与一个客户进行会话期间,另一个客户端也尝试连接这个服务器,那么就有可能造成这个服务器的死锁。由此出现了多线程,多进程服务器。

4.多线程与多进程服务器。

       为了解决服务器与多个客户端进行会话,于是诞生了利用操作系统的内置支持,使用多个控制线程单独运行同一段代码。可以创建多个共享内存空间的线程,也可以创建完全独立的进程。0下面是一个多线程服务器:

#!/usr/bin/python
#coding:utf-8
import zen_utils
from threading import Thread

def start_threads(listener,workers=4):
    t = (listener,)
    for i in range(workers):
        Thread(target=zen_utils.accept_connection_forever,args=t).start()

if __name__ == '__main__':
    address = zen_utils.parse_command_line('multi-threaded server')
    listener = zen_utils.create_srv_socket(address)
    start_threads(listener)

       这段代码中主线程启动n个服务器线程,然后退出。主线程认为这n个工作线程将永远运行,因此运行这些线程的进程也将会保持运行状态。

SocketServer框架

       这个框架是Python内置的实现操作系统级的控制线程来处理同一时刻的多个客户端会话。 它使用了大量面向对象以及多继承的设计思想。socketserver模块将上述多线程模式分为了两个模式:

  • 第一个用于打开监听套接字并接受客户端的server模式
  • 第二个是用于通过某个打开的套接字与特定客户端进行会话的handler模式

       下面是结合这两种模式的代码:

#!/usr/bin/python
#coding:utf-8

from socketserver import BaseRequestHandler,TCPServer,ThreadingMixIn
import zen_utils

class ZenHandler(BaseRequestHandler):
    def handle(self):
        zen_utils.handle_conversation(self.request,self.client_address)

class ZenServer(ThreadingMixIn,TCPServer):
    allow_reuse_address = 1

if __name__ == '__main__':
    address = zen_utils.parse_command_line('legacy "SocketServer" server')
    server = ZenServer(address, ZenHandler)
    server.serve_forever()

       我们需要实例化一个server对象,然后将一个handler对象作为参数传给server对象。可以将ThreadingMixIN改为ForkingMixIn,这样就可以使用完全隔离线程来处理连接的客户端,而不使用进程。

5.异步服务器。

       异步方式使代码不需要等待数据发送至某个特定的客户端或由这个客户端接受。相反,代码可以从整个处于等待的客户端套接字列表中读取数据。只要任何一个客户端做好了进行通信的准备,服务器就可以向该客户端发送响应。 它可以在所有连接的客户端之间自由切换,并提供相应的服务 。下面是一个简单的异步服务器:

#!/usr/bin/python
#coding:utf-8
import select, zen_utils

def all_events_forever(poll_object):
    while True:
        for fd, event in poll_object.poll():
            yield fd, event

def serve(listener):
    sockets = {listener.fileno(): listener}
    addresses = {}
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    poll_object.register(listener, select.POLLIN)

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            address = addresses.pop(sock)
            rb = bytes_received.pop(sock, b'')
            sb = bytes_to_send.pop(sock, b'')
            if rb:
                print('Client {} sent {} but then closed'.format(address, rb))
            elif sb:
                print('Client {} closed before we sent {}'.format(address, sb))
            else:
                print('Client {} closed socket normally'.format(address))
            poll_object.unregister(fd)
            del sockets[fd]

        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocking(False)     
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock, select.POLLIN)

        
        elif event & select.POLLIN:
            more_data = sock.recv(4096)
            if not more_data:  
                sock.close()  
                continue
            data = bytes_received.pop(sock, b'') + more_data
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                bytes_received[sock] = data

        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('low-level async server')
    listener = zen_utils.create_srv_socket(address)
    serve(listener)

       Windows操作系统好像不支持异步操作,所以为了测试这个代码你可能需要在Linux操作系统上进行。这个服务器实际上有两层循环。首先是一个不断调用poll()的while循环。一次poll()调用可能返回多个事件,因此这个while()循环内部还有一个循环,用于处理poll()返回的每一个事件,下面是poll()函数一些参数的意义。
在这里插入图片描述

回调风格的asyncio

       下面的代码使用读取问题,然后做出响应的流程。asyncio维护了一个select风格的核心循环,将所有进行 I/O 操作的套接字保存在了一个表中,有需要时会在select循环里向表中添加或删除套接字。一旦套接字关闭,asyncio就会将其清除或丢弃。最后,当接收到实际数据时,将由用户代码来决定要返回的正确响应。下面的代码使用了asyncio框架:

#!/usr/bin/python
#coding:utf-8

import asyncio,zen_utils

class ZenServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accepted connection from {}'.format(self.address))

    def data_received(self, data):
        self.data += data
        if self.data.endswith(b'?'):
            answer = zen_utils.get_answer(self.data)
            self.transport.write(answer)
            self.data = b''

    def connection_lost(self, exc):
        if exc:
            print('Client {} error: {}'.format(self.address,exc) )
        elif self.data:
            print('Client {} sent {} but then closed'.format(self.address,self.data))
        else:
            print('Client {} closed socket'.format(self.address))

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using callbacks')
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ZenServer,*address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

       代码将真正的套接字隐藏了起来,通过该框架来获取远程地址,而不是通过套接字。数据是通过一个方法调用来传输的。该方法只需要将接收到的字符串作为参数即可。回答数据被传给了框架的transport.write()方法,并没有在主事件循环中。

协程风格的asyncio

       协程是一个函数,它在进行I/O操作时不会阻塞,而是会暂停,并将控制权转移回调用方。Python语言支持协程的一种标准形式就是生成器——在内部包含一个或多个yiled语句的函数。这类函数不会在运行一条返回语句后就退出,而是会返回一个序列。下面是一个协程风格的服务器:

#!/usr/bin/python
#coding:utf-8
import asyncio,zen_utils

@asyncio.coroutine
def handle_conversation(reader,writer):
    address = writer.get_extra_info('peername')
    print('Accepted connection from {}'.format(address))
    while True:
        data = b''
        while not data.endswith(b'?'):
            more_data = yield from reader.read(4096)
            if not more_data:
                if data:
                    print('Client {} sent {!r} but then closed'.format(address,data))
                else:
                    print('Client {} closed socket normally'.format(address))
                return
            data += more_data
        answer = zen_utils.get_answer(data)
        writer.write(answer)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using coroutine')
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_conversation,*address)
    server = loop.run_until_complete(coro)
    print('Linstening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

       此代码的while循环采用了之前的封帧方式,不断调用recv()然后将响应写入并发送给等待的客户端。所有操作都封装在while()循环之中。yield from代替了之前所有进行阻塞操作并等待操作系统响应的地方,这样使得系统不会造成阻塞,也不会同时处理多个客户端连接。

PS:学习这些内容学习一定的操作系统知识,实例代码都需要结合第一个代码使用,测试时请将他们放在同一个文件夹下。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐