菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
116
0

IPC机制及生产者消费者模型

原创
05/13 14:22
阅读数 26420

IPC机制

1.简介

IPCInter-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。

 

2.实现进程间通信的2种方式

 

管道:pipe

 

队列:queue(其实就是pipe+lock)

 

注意:两者实际上都是内存空间,不要往里面放入大数据,只能放数据量较小的消息

 

3.IPC所解决的问题

1.当多个任务并发的去修改共享数据,就可能会造成数据错乱,我们通过加互斥锁使多个任务对共享数据的操作由并发变为“串行”,从而保证了共享数据的安全,而当出现需要修改多个共享数据的需求时,我们就得再次加锁处理

---->IPC帮我们解决了需要自己加锁的问题

2.进程间的内存空间是彼此隔离的,如何完成通信(数据交互),就需要寻求一种共享的东西,硬盘是共享的,但是读取硬盘的速度慢

---->IPC实现了一种内存空间上的共享(两个进程之间通过队列交流)

 

4.实例

from multiprocessing import Process, Queue
import os


def task1(q):
    print(f'I\'m task1,my id is {os.getpid()}')
    q.put('Hello World')


def task2(q):
    # res=q.get()
    print(f'I\'m task2,my id is {os.getpid()}')


if __name__ == '__main__':
    q = Queue(5)

    t1 = Process(target=task1, args=(q,))
    t1.start()
    t2 = Process(target=task2, args=(q,))
    t2.start()

    print(f'I\'m Main Process,I\'ll print {q.get()}')


# I'm task2,my id is 49516
# I'm task1,my id is 43284
# I'm Main Process,I'll print Hello World

生产者消费者模型

在并发编程中 使用生产者和消费者模式能够解决绝大多数并发问题。

该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

 

1.什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 

2.为什么要使用生产者和消费者模式?

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。

在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。

同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

为了解决这个问题,就在生产者和消费者之间通过队列,增加缓冲,避免了生产者和消费者 之间的交互。于是引入了生产者和消费者模式

 

 

3.用队列实现生产者消费者模型

 

入门版 单生产者 单消费者 - 有BUG

from multiprocessing import Process, Queue
import random
import time


def producer(p_name, p_food, q):
    for i in range(10):
        data = f'[{p_name}]制造了[{p_food}]'
        time.sleep(random.randint(1, 2))  # 模拟制造食物延迟
        print(data)
        q.put(p_food)


def consumer(c_name, q):
    while True:
        c_food = q.get()
        time.sleep(random.randint(1, 2))  # 模拟消费食物延迟
        print(f'[{c_name}]消费了[{c_food}]')


if __name__ == '__main__':
    q = Queue

    p1 = Process(target=producer, args=('xxq', 'Noodles', q))
    p1.start()

    c1 = Process(target=consumer, args=('darker', q))
    c1.start()


# BUG:如果消费者消费完了队列中的食物,队列食物为空,消费者就会停滞

基础版 单生产者 单消费者 - 有BUG

from multiprocessing import Process, Queue
import random
import time


def producer(p_name, p_food, q):
    for i in range(10):
        data = f'[{p_name}]制造了[{p_food}]'
        time.sleep(random.randint(1, 2))  # 模拟制造食物延迟
        print(data)
        q.put(p_food)
    q.put(None)


def consumer(c_name, q):
    while True:
        c_food = q.get()
        if c_food is None:
            return
        time.sleep(random.randint(1, 2))  # 模拟消费食物延迟
        print(f'[{c_name}]消费了[{c_food}]')


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=('xxq', 'Noodles', q))
    p1.start()

    c1 = Process(target=consumer, args=('darker', q))
    c1.start()

进阶版 单生产者 多消费者

from multiprocessing import Process, Queue
import random
import time


def producer(p_name, p_food, q):
    for i in range(10):
        data = f'[{p_name}]制造了[{p_food}]'
        time.sleep(random.randint(1, 2))  # 模拟制造食物延迟
        print(data)
        q.put(p_food)


def consumer(c_name, q):
    while True:
        c_food = q.get()
        if c_food is None:
            return
        time.sleep(random.randint(1, 2))  # 模拟消费食物延迟
        print(f'[{c_name}]消费了[{c_food}]')


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=('xxq', 'Noodles', q))
    p1.start()

    c1 = Process(target=consumer, args=('darker', q))
    c1.start()
    c2 = Process(target=consumer, args=('ben', q))
    c2.start()

    # 生产者生产完毕,放2个None
    p1.join()   # p1进程执行完成,再放None
    q.put(None)
    q.put(None)

高阶版 多生产者 多消费者

from multiprocessing import Process, Queue
import random
import time


def producer(p_name, p_food, q):
    for i in range(10):
        data = f'[{p_name}]制造了[{p_food}]'
        time.sleep(random.randint(1, 2))  # 模拟制造食物延迟
        print(data)
        q.put(p_food)


def consumer(c_name, q):
    while True:
        c_food = q.get()
        if c_food is None:
            return
        time.sleep(random.randint(1, 2))  # 模拟消费食物延迟
        print(f'[{c_name}]消费了[{c_food}]')


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=('xxq', 'Noodles', q))
    p1.start()
    p2 = Process(target=producer, args=('zsy', 'Pizza', q))
    p2.start()

    c1 = Process(target=consumer, args=('darker', q))
    c1.start()
    c2 = Process(target=consumer, args=('ben', q))
    c2.start()
    c3 = Process(target=consumer, args=('Alen', q))
    c3.start()

    # 生产者生产完毕,放2个None
    p1.join()  # p1进程执行完成,再放None
    p2.join()  # p2进程执行完成,再放None
    q.put(None)
    q.put(None)
    q.put(None)

终极版 多生产者 多消费者

"""
生产者:生产/制造东西的
消费者:消费/处理东西的
该模型除了上述两个之外还需要一个媒介
    例子:自助餐
    厨师(生产者)制作菜品之后,会将菜品放到餐台(消息队列)
    顾客(消费者)想要吃,可以直接到餐台(消息队列)去拿菜品
    在这个过程中,厨师(生产者)和顾客(消费者)并无直接联系
    而是以餐台(消息队列)为媒介的

生产者(厨师) + 消息队列(餐台) + 消费者(顾客)
"""

from multiprocessing import JoinableQueue, Process
import random
import time


def producer(p_name, p_food, q):
    for i in range(10):
        print(f'{p_name} 生产了 {p_food}')
        # 模拟网络延迟
        time.sleep(random.randint(1, 2))
        # 将数据加入队列
        q.put(p_food)


def consumer(c_name, q):
    while True:
        food = q.get()  # 假如没有数据,程序就会在此处进入堵塞态
        time.sleep(random.randint(1, 2))    # 模拟消费的延迟
        print(f'{c_name} 消费了 {food}')
        q.task_done()  # 告诉队列你已经从里面取出了一个数据并且处理完毕了


if __name__ == '__main__':
    q = JoinableQueue()

    p1 = Process(target=producer, args=('Cook1', 'Beef', q))
    p2 = Process(target=producer, args=('Cook2', 'Cake', q))

    c1 = Process(target=consumer, args=('Alan', q))
    c2 = Process(target=consumer, args=('Ben', q))
    c3 = Process(target=consumer, args=('Cindy', q))

    p1.start()
    p2.start()

    c1.daemon = True    # 将消费者设置成守护进程
    c2.daemon = True
    c3.daemon = True
    c1.start()
    c2.start()
    c3.start()

    p1.join()
    p2.join()
    # 等待生产者生产完毕之后 往队列中添加特定的结束符号
    # q.put(None) # 有多少消费者就往队列中添加多少None,新添加的None必定在队列的末尾

    q.join()  # 等待队列中所有的数据被取完再执行往下执行代码
    """
    JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
    没当你调用task_done的时候 计数器-1
    q.join() 当计数器为0的时候 才往后运行
    """
    # 只要q.join执行完毕 说明消费者已经处理完数据了  消费者就没有存在的必要了

进程池

from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor(2)
pool.submit(get_pages, url).add_done_callback(call_back)

 

发表评论

0/200
116 点赞
0 评论
收藏