Как обмениваться данными между процессами и потоками в Python?

Ответ

Способы обмена данными зависят от того, используются ли процессы (multiprocessing) или потоки (threading), так как у них разные модели работы с памятью.

Между процессами (multiprocessing)

Процессы имеют изолированное адресное пространство памяти, поэтому для обмена данными требуются механизмы межпроцессного взаимодействия (IPC):

  1. Очереди (multiprocessing.Queue). Потоко- и процессобезопасный способ для обмена объектами. Идеально подходит для паттерна "производитель-потребитель".

    from multiprocessing import Process, Queue
    
    def worker(q):
        q.put('some data')
    
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get()) # Выведет 'some data'
    p.join()
  2. Каналы (multiprocessing.Pipe). Создает двустороннее соединение между двумя процессами. Быстрее, чем Queue, но предназначен только для двух конечных точек.

    from multiprocessing import Process, Pipe
    
    def worker(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv()) # Выведет [42, None, 'hello']
    p.join()
  3. Разделяемая память (Value, Array, shared_memory). Наиболее быстрый способ для обмена простыми типами данных (числа, массивы) без сериализации. Требует ручной синхронизации с помощью блокировок (Lock).

    from multiprocessing import Process, Value
    
    def increment(n):
        n.value += 1
    
    num = Value('i', 0) # 'i' - integer
    p = Process(target=increment, args=(num,))
    p.start()
    p.join()
    print(num.value) # Выведет 1

Между потоками (threading)

Потоки работают в общем адресном пространстве, поэтому могут напрямую обращаться к общим переменным. Однако такой доступ небезопасен и требует синхронизации.

  1. Очереди (queue.Queue). Это предпочтительный и самый безопасный способ обмена данными между потоками. Он инкапсулирует всю необходимую логику блокировок.

  2. Общие переменные с примитивами синхронизации. Можно использовать общие переменные, но их чтение и запись должны быть защищены с помощью threading.Lock, RLock, Semaphore и т.д., чтобы избежать состояния гонки.

Ответ 18+ 🔞

Так, слушай, а вот эта вся хуйня с обменом данными между процессами и потоками — это, блядь, две большие разницы, понимаешь? Совершенно разные модели, ёпта. Как будто сравнивать велосипед и танк: и то, и то едет, но принцип, блядь, пиздец какой разный.

Между процессами (multiprocessing)

Вот эти ребята — они как отдельные квартиры в доме. У каждого своя память, свой холодильник, своя тёща на диване. Чтобы передать соседу пачку пельменей, надо выйти в подъезд. Это и есть IPC — межпроцессное общение, ёбана.

  1. Очереди (multiprocessing.Queue). Это как, блядь, почтовый ящик на лестничной клетке. Один процесс туда кидает, другой оттуда забирает. Всё прикрыто, безопасно, никто никому в тарелку не плюнет.

    from multiprocessing import Process, Queue
    
    def worker(q):
        q.put('some data') # Кинул в ящик записку "пельмени готовы"
    
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get()) # Заглянул в ящик, достал записку. Выведет 'some data'
    p.join()
  2. Каналы (multiprocessing.Pipe). А это уже, сука, телефонная трубка между двумя квартирами. Быстрее, чем бегать к ящику, но только для двоих. Больше никого не подключишь.

    from multiprocessing import Process, Pipe
    
    def worker(conn):
        conn.send([42, None, 'hello']) # "Алло, сосед, у меня тут список странный..."
        conn.close()
    
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv()) # "Приём. Да, вижу, [42, None, 'hello']"
    p.join()
  3. Разделяемая память (Value, Array). Вот это уже, блядь, высший пилотаж! Это как прорубить дыру в стене между кухнями. Супербыстро, можно прямо рукой передать. Но, ёпта, если не координироваться, можно друг другу по рукам молотком дать. Поэтому нужны замки (Lock).

    from multiprocessing import Process, Value
    
    def increment(n):
        n.value += 1 # Сунул руку в дыру, пописал мелом на общей доске +1
    
    num = Value('i', 0) # Завели общую доску для цифр (тип 'i' — integer)
    p = Process(target=increment, args=(num,))
    p.start()
    p.join()
    print(num.value) # Смотрим на доску. Выведет 1

Между потоками (threading)

А вот потоки — это, сука, как сожители в одной коммуналке. Все живут в одной квартире, все лезут в один холодильник. Переменные общие на всех, можно прямо так и хватать. Но это и есть главная засада, блядь! Потому что пока один наливает суп, второй может выдернуть тарелку, и всё — суп на полу, состояние гонки, пиздец.

  1. Очереди (queue.Queue). Самый адекватный способ не перебить друг другу горло. Поставили в коридоре тумбочку с ящиком. Положил записку — закрыл. Взял записку — закрыл. Всё цивилизованно, все довольны. Используй это, не еби мозг.

  2. Общие переменные с блокировками. Ну, это если ты, блядь, максималист и любишь жить на грани. Да, можно все лезть в одну кастрюлю, но тогда надо кричать "Я ДЕРЖУ КРЫШКУ, НЕ ЛЕЗЬ!" (threading.Lock). Стоит забыть покричать — и в твоём борще уже чужой половник. Крайне не рекомендую, если не хочешь потом дебажить ебаную гонку, от которой волосы дыбом встают.

Короче, резюме: для процессов — почта, трубы или дыра в стене с замком. Для потоков — тумбочка с ящиком в коридоре. И не выёбывайся с общими переменными без крайней нужды, а то охуеешь потом.