访问官网

进程介绍:

每启动一个进程都是占用独立的内存空间的;每个进程内的数据是独立的;

每台计算机可以启动多个进程,进程数量尽量和核数一致;

每个进程内可以启动多个线程;每个线程内可以启动多个协程(异步)。

"""简单入门"""
import time
import os
from multiprocessing import Process


def demo_one():
    print("start")
    time.sleep(10)
    print(os.getpid(), os.getppid())  # 子进程和父进程的pid
    print("end")


if __name__ == '__main__':
    p = Process(target=demo_one)  # 创建一个进程,此进程执行demo_one()函数
    p.start()  # 开始执行进程
    print("mian")
"""带参数的进程,多个继承"""
import time
import os
from multiprocessing import Process


def demo_one(num):
    print("start" + str(num))
    time.sleep(1)
    print(os.getpid(), os.getppid(), num)  # 子进程和父进程的pid
    print("end" + str(num))


if __name__ == '__main__':
    p_list = []
    for i in range(5):
        p = Process(target=demo_one, args=(i, ))  # 进程执行带参数的函数
        p.start()
        p_list.append(p)
    # for循环内添加join()  线程会按照顺序执行
    for p in p_list:
        p.join()
    print("mian")
"""守护进程,设置守护进程后,在主进程结束时,子进程无论是否执行完毕都会停止"""
from multiprocessing import Process
import time


def demo_one():
    print("start")
    time.sleep(3)
    print("end")


if __name__ == '__main__':
    p = Process(target=demo_one)
    p.daemon = True  # 在start之前设置守护进程,子进程会在主进程结束后直接结束,无论子进程是否执行完
    p.start()
    print("main")
"""自定义进程子类, 必须重写run()方法,在调用子类对象的start()方法时会执行重写的run()方法"""
from multiprocessing import Process, current_process
import time


class MyProceee(Process):
    def __init__(self):
        super().__init__()

    def run(self):
        print("必须重写进程类的run方法")
        print("调用start方法时,会执行这个重写的run方法")
        print(current_process().pid, current_process())  # 当前进程的pid
        time.sleep(1)


if __name__ == '__main__':
    p_list = []
    for i in range(20):
        p = MyProceee()
        p.start()
        p_list.append(p)
        if p.is_alive():  # 进程是否存活
            p.terminate()  # 结束进程
            time.sleep(0.5)
            print(p.is_alive())
    for p in p_list:
        p.join()
"""
进程锁
每一个进程都有自己独立的空间,多进程操作同一块数据时,必须加锁处理,否则会导致数据错误
加锁会把程序变成串行,效率变低
"""
from multiprocessing import Process, Lock
import time
import os


def demo_one():
    """查票"""
    with open("data.txt", "r") as f:
        num = int(f.read())
    return num


def demo_two(lock):
    """购票"""
    n = demo_one()
    print("%s end,结束查询余票:%d\n" % (str(os.getpid()), n))
    lock.acquire()
    if n:
        n -= 1
        with open("data.txt", "w") as f:
            n = 0
            f.write(str(n))
        print("%s 购票成功\n" % os.getpid())
    else:
        print("%s 失败\n" % os.getpid())
    lock.release()


def buy(lock, start):
    demo_two(lock)
    print("end time %d!!!!!!!!!!!!!!!" % (time.time() - start))


if __name__ == '__main__':
    start = time.time()
    p_list = []
    lock = Lock()
    for i in range(30):
        p = Process(target=buy, args=(lock, start))
        p.start()
        p_list.append(p)
    print("main----------------------------")
"""进程池Pool.apply() 同步任务方法"""
import os
import time
from multiprocessing import Pool


def work(n):
    print('%s run' % os.getpid())
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l = []
    for i in range(10):
        res = p.apply(work, args=(i,))  # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
        # 但不管该任务是否存在阻塞,同步调用都会在原地等着,同时只有一个进程执行
        res_l.append(res)
    print(res_l)
"""使用线程池的Pool.apply_async异步任务方法"""
import os
import time
import random
from multiprocessing import Pool


def work(n):
    print('%s run' % os.getpid())
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = Pool(3)  # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))  # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
        # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
        # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
        # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
        res_l.append(res)

    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
    # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close()
    p.join()
    # 在进程池中所有任务执行完毕后,再处理结果
    for res in res_l:
        print("执行结果输出:", res.get())  # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
"""进程池 回调函数 回调函数会把任务函数的结果作为参数执行"""
from multiprocessing import Pool
import time
import os


def work():
    time.sleep(1)
    print("%d 进程 work" % os.getpid())
    return time.time()


def call(res):
    # time.sleep(1)
    print("处理回调结果:{}".format(str(res)))


if __name__ == '__main__':
    pool = Pool(2)
    for i in range(5):
        # 在进程池中的每个任务执行完毕时,执行回调函数
        # 使用callback回调函数时,会把工作任务func方法的返回结果传给callback方法进行处理
        res = pool.apply_async(func=work, callback=call)
    pool.close()  # 必须等进程池关闭后,才能使用join()方法
    pool.join()  # 主进程等待子进程结束
    print("main end")

"""消息队列 Queue"""
from multiprocessing import Process, Queue

max = 3
q = Queue(maxsize=max)
print(q.empty())  # 队列是否为空
print(q.full())  # 队列书否已满
peple = ["a", 'b', 'c', 'd', 'e', 'f', 'g']
for i in peple:
    if q.full():
        print("队列已满")
        data = q.get()  # 取走一个数据。先进先出原则,总是取最早进入队列的数据
        print("取走了一个 %s, 还有%d 个" % (data, q.qsize()))
    else:
        q.put(i)  # 如果队列满了,会阻塞,一直停在这,直到有数据被取走,再继续往队列尾端加数据
        # q.put_nowait(i)  # 如果队列满了,不会阻塞,会报错
        print("加入一个")
        print("当前队列数量:", q.qsize())
    # 如果队列满了,基础往里面添加数据的话,会阻塞程序。等待数据被取走后,再将数据放入队列
"""多进程使用消息队列 阻塞方式"""
from multiprocessing import Process, Queue
import time
import os


def read_file(file_name):
    """从文件内读取数据"""
    with open(file_name, "r") as f:
        num = int(f.read())
    return num


def write_file(file_name, data):
    """将数据写入文件"""
    with open(file_name, "w") as f:
        f.write(str(data))


def inputQ(queue):
    """加入队列"""
    queue.put(str(os.getpid()) + " -- " + str(time.time()))  # 当队列已满时,会阻塞,等待
    print("已生产 %d  ,添加到队列" % os.getpid())


def outputQ(queue):
    """从队列内取数据"""
    data = queue.get()  # 当队列内无数据时,会阻塞等待
    if data is None:
        return
    else:
        print(str(os.getpid()) + "已消费")
        return 0


def add_queue(q):
    for i in range(10):
       inputQ(q)
    # 队列信息结束后,添加一个指定字段;消费的时候判断获取到此字段时停止阻塞
    q.put(None)  # 也可以在主进程内,在生产者结束(join())后发


def delete_queue(q):
    # 抽奖用户数比奖品数量多, 这种情况如果使用get()会阻塞
    while True:
        num = outputQ(q)
        # 如果获取到队列内指定的结束数据时,停止阻塞,跳出循环
        if num is None:
            break


if __name__ == '__main__':
    file_name = "data.txt"
    record_in = []
    record_out = []
    # q = Queue(2)  # 设置队列长度
    q = Queue()  # 默认队列无限长
    p1 = Process(target=add_queue, args=(q,))
    p2 = Process(target=delete_queue, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    # q.put(None)
"""多进程使用消息队列 非阻塞方式 将任务放入/取出队列时不等待,但队列已满/为空时,会报错,所以要使用try: finnaly"""
import multiprocessing
from multiprocessing import Process, Queue
import time
import os


def inputQ(queue):
    """加入队列"""
    # queue.put(str(os.getpid()) + " -- " + str(time.time()))  # 当队列已满时,会阻塞,等待
    try:
        queue.put_nowait(str(os.getpid()) + " -- " + str(time.time()))
        print("用户 %d 进来要抽奖了 ,添加到队列" % os.getpid())
    except Exception as e:
        print("队列已满, %d" % os.getpid(), e)


def outputQ(queue):
    """从队列内取出数据"""
    try:
        data = queue.get_nowait()  # 当队列内无数据时,不会阻塞,会报错
    except Exception as e:
        print("队列已空", e)
    with open("data.txt", "r") as f:
        num = int(f.read())
    if num > 0:
        num_end = num
        num_end -= 1
        with open("data.txt", "w") as f:
            f.write(str(num_end))
        print(str(os.getpid()) + "用户 '%s' 已抽奖,抽奖前库存:%d,抽奖后库存:%d" %(data, num, num_end))
    else:
        print(str(os.getpid()) + "无库存了")


def run_test(q):
    """模拟有10个线程往队列内插入数据,有100个线程从队列内取数据"""
    for i in range(10):
        p = Process(target=inputQ, args=(q,))
        p.start()
        record_in.append(p)
    for i in range(100):  # 抽奖用户数比奖品数量多, 这种情况如果使用get_nowait()不会阻塞
        p = Process(target=outputQ, args=(q,))
        p.start()
        record_out.append(p)


if __name__ == '__main__':
    multiprocessing.freeze_support()
    record_in = []
    record_out = []
    q = Queue(2)  # 设置队列长度
    run_test(q)
"""
JoinableQueue 消息类
必须在消息队列内的任务被消费(get)时,使用 task_down() 方法 发送被消费信号
"""
from multiprocessing import Process, JoinableQueue
import time
import os


def inputQ(queue, name):
    """加入队列"""
    queue.put(name + str(os.getpid()) + " -- " + str(time.time()))  # 当队列已满时,会阻塞,等待
    print(name + "已生产 %d  ,添加到队列, " % os.getpid())


def outputQ(queue):
    """从队列内取数据"""
    data = queue.get()  # 当队列内无数据时,会阻塞等待
    queue.task_done()  # 向q.join()发送一个数据被消费的信号


def add_queue(q, name):
    for i in range(10):
       inputQ(q, name)
    q.join()  # 生产完毕后,此方法会阻塞,直到队列内数据为空
    print("已全部消费完了---------" + str(os.getpid()))


def delete_queue(q):
    # 抽奖用户数比奖品数量多, 这种情况如果使用get()会阻塞
    while True:
        outputQ(q)
        print(str(os.getpid()) + "已消费")


if __name__ == '__main__':
    file_name = "data.txt"
    record_in = []
    record_out = []
    q = JoinableQueue()  # 默认队列无限长
    p1 = Process(target=add_queue, args=(q, '猪蹄子'))
    p2 = Process(target=add_queue, args=(q, '涮羊肉'))
    p3 = Process(target=add_queue, args=(q, '羊蝎子'))
    c1 = Process(target=delete_queue, args=(q,))
    c2 = Process(target=delete_queue, args=(q,))
    # 设置消费进程为守护进程,消费完毕后跟随主进程结束而结束
    c1.daemon = True
    c2.daemon = True
    # 启动线程
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # 等待生产结束
    p1.join()
    p2.join()
    p3.join()
"""
使用Manager进程管理器 进行进程间的数据共享
支持类型 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array
"""
from multiprocessing import Process, Manager, Lock


def work(loc, data):
    with loc:
        data['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager:
        dic = manager.dict({"count": 100})
        lis = []
        for i in range(100):
            p = Process(target=work, args=(lock, dic))
            p.start()
            lis.append(p)
        for p in lis:
            p.join()
        print(dic)  # {'count': 0}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐