파이썬/파이썬 코딩

[파이썬] 병렬처리(multi-threading, multi-processing)

Suda_777 2024. 4. 23. 04:28
반응형

1. 병렬처리를 하는 이유

  • 프로그램의 실행 속도를 향상
  • 여러 작업을 동시에 처리할 수 있음

2. 멀티스레딩 (Multi - Threading)

2.1. 쓰레드 설명

  • 쓰레드는 프로세스 내에서 실행되는 실행 단위
  • 같은 메모리 공간을 공유
  • 입출력 작업이 많은 시나리오에서 유리
  • 파이썬에서는 `threading` 모듈 사용
  • 멀티스레딩은 자원을 공유해야 하거나 I/O 바운드 작업(I/O-intensive tasks)이 중심이 될 때 효과적. (대량의 데이터를 디스크에서 읽거나 쓰거나, 네트워크를 통해 데이터를 송수신하는 작업을 포함합니다.)
  • 메모리 공유로 인해 데이터 동기화 문제가 발생할 수 있다.
  • 한 스레드의 실패가 전체 프로세스에 영향을 줄 수 있습니다.

2.2. threading 모듈 사용법

  • 쓰레드 생성과 실행
    • 쓰레드 객체를 생성한다. `threading.Thread`클래스를 상속받는다.
    • run() 메서드를 오버라이드하여 각 프로세스의 실행 로직을 정의
    • target 인자에는 쓰레드가 실행할 함수를 지정
    • args 인자에는 해당 함수에 전달할 인자를 튜플 형태로 지정
    • start() 함수로 쓰레드 실행
    • 쓰레드 대기 `.join` 메소드 사용
      • `join()` 함수를 쓰면, 메인쓰레드가 `join()`함수를 사용한 쓰레드가 완료될 때까지 기다려준다.
import threading

class MyThread(threading.Thread):
    def __init__(self, id, name=None):
        super().__init__(name=name)  # 스레드의 이름을 super 클래스의 생성자에 전달
        self.id = id
    
    def run(self):
        print(f"Thread {self.name} ({self.id}) is running")

# 스레드 생성 및 실행, 이름 지정
my_thread = MyThread(1, "MyCustomThread")
my_thread.start()
my_thread.join()

print("Main thread has completed")

 

  • 쓰레드의 이름 설정
    • name 파라미터 사용
t = threading.Thread(target=worker, name="worker_thread", args=(i,))

 

  • 현재 쓰레드의 이름 불러오기
    • log를 남겨 어떤 쓰레드가 실행된 것인지 확인 할 때 사용.
    • 쓰레드의 이름 확인
current_thread = threading.current_thread()
print(current_thread.name)

 

  • 락 (Lock)
    • 락은 가장 기본적인 동기화 메커니즘으로, 한 번에 하나의 쓰레드만 특정 섹션의 코드(임계 영역)에 접근하도록 허용
    • 락을 획득한 쓰레드만이 해당 섹션의 코드를 실행할 수 있고, 다른 쓰레드는 락이 해제될 때까지 대기해야 한다.
import threading

class MyThread(threading.Thread):
    def __init__(self, id, lock, name=None):
        super().__init__(name=name)
        self.id = id
        self.lock = lock
    
    def run(self):
        # Lock을 사용하여 임계 영역을 보호
        with self.lock:
            print(f"Thread {self.name} ({self.id}) is running in a critical section")

# Lock 객체 생성
lock = threading.Lock()

# 여러 스레드 생성 및 실행
threads = [MyThread(i, lock, f"MyCustomThread-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("Main thread has completed")

 

  • 세마포어 (Semaphore)
    • 세마포어는 락의 일반화된 형태로, 동시에 여러 쓰레드가 리소스에 접근할 수 있도록 허용합니다. 세마포어는 특정 숫자로 초기화되며, 이 숫자는 동시에 리소스에 접근할 수 있는 최대 쓰레드 수를 의미합니다.
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, id, semaphore, name=None):
        super().__init__(name=name)
        self.id = id
        self.semaphore = semaphore
    
    def run(self):
        with self.semaphore:
            # 세마포어를 사용하여 임계 영역에 대한 접근을 제한
            print(f"Thread {self.name} ({self.id}) has entered the critical section.")
            time.sleep(2)  # 리소스 사용을 시뮬레이션
            print(f"Thread {self.name} ({self.id}) is leaving the critical section.")

# 세마포어 객체 생성 (동시에 2개의 스레드만 접근 허용)
semaphore = threading.Semaphore(2)

# 여러 스레드 생성 및 실행
threads = [MyThread(i, semaphore, f"MyCustomThread-{i}") for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("Main thread has completed")
  • 이벤트
    • 쓰레드간의 신호를 주고받을 때 사용
    • 하나의 쓰레드가 특정 이벤트의 발생을 기다리고, 다른 쓰레드가 이 이벤트를 발생시키면 대기중이던 쓰레드가 계속 진행
import threading
import time

class Worker(threading.Thread):
    def __init__(self, name, event):
        super().__init__(name=name)
        self.event = event  # 이벤트 객체를 인스턴스 변수로 받음
    
    def run(self):
        print(f"{self.name} is starting work")
        time.sleep(3)  # 작업 시뮬레이션
        print(f"{self.name} has finished work")
        self.event.set()  # 작업 완료 후 이벤트 설정

class Waiter(threading.Thread):
    def __init__(self, name, event):
        super().__init__(name=name)
        self.event = event  # 이벤트 객체를 인스턴스 변수로 받음
    
    def run(self):
        print(f"{self.name} is waiting for event")
        self.event.wait()  # 이벤트가 설정될 때까지 대기
        print(f"{self.name} has noticed that the event was set")

if __name__ == "__main__":
    event = threading.Event()  # 이벤트 객체 생성
    worker = Worker("Worker", event)  # 이벤트 객체를 스레드에 전달
    waiter = Waiter("Waiter", event)  # 이벤트 객체를 스레드에 전달

    worker.start()
    waiter.start()

    worker.join()
    waiter.join()

    print("Main thread has completed")
  • 컨디션
    • wait() 함수: 실행 시점부터 다른 쓰레드가 notify()함수를 사용할 때 까지 기다리기 시작함.
    • notify() 함수: 기다리고 있는 쓰레들이 실행하도록 함.
    • 조건의 재확인: 깨어난 후에는 대기를 시작한 원래의 조건이 여전히 유효한지 확인해야 한다. 이는 종종 while 루프 안에서 wait()를 호출하여 처리. 이렇게 하는 이유는 여러 스레드가 동시에 같은 조건을 기다리고 있을 경우, 한 스레드의 notify()로 깨어난 스레드가 실제 필요한 조건을 만족하지 못할 수 있기 때문이다.
import threading
import time
import random

# 버퍼
buffer = []
MAX_ITEMS = 10

class Producer(threading.Thread):
    def __init__(self, condition):
        super().__init__()
        self.condition = condition

    def run(self):
        global buffer
        while True:
            item = random.randint(1, 100)
            with self.condition:
                while len(buffer) == MAX_ITEMS:
                    self.condition.wait()  # 버퍼가 가득 찼으므로 대기
                buffer.append(item)
                print(f"Produced {item}")
                self.condition.notify()  # 버퍼에 공간이 있으므로 소비자에게 알림
            time.sleep(random.random())

class Consumer(threading.Thread):
    def __init__(self, condition):
        super().__init__()
        self.condition = condition

    def run(self):
        global buffer
        while True:
            with self.condition:
                while not buffer:
                    self.condition.wait()  # 버퍼가 비어있으므로 대기
                item = buffer.pop(0)
                print(f"Consumed {item}")
                self.condition.notify()  # 아이템을 소비했으므로 생산자에게 알림
            time.sleep(random.random())

if __name__ == "__main__":
    condition = threading.Condition()
    producer = Producer(condition)
    consumer = Consumer(condition)
    producer.start()
    consumer.start()

    producer.join()
    consumer.join()

 

3. 멀티프로세싱 (Multi-Processing)

3.1. 멀티프로세싱 설명

  • 독립된 메모리 공간을 가진 여러 프로세스를 생성
  • 한 프로세스의 오류가 다른 프로세스에 영향을 미치지 않는다. 격리된 메모리 공간으로 인해 안정성이 높다.
  • 프로세스 간의 통신(IPC, Inter-Process Communication)은 비교적 복잡하며, 파이프, 소켓, 공유 메모리 등을 사용
  • CPU 자원을 효율적으로 활용할 수 있으며, 실제 병렬 처리가 가능. 계산 집약적 작업에 적합
  • 단점
    • 리소스(메모리, CPU 시간) 사용이 많다
    • 프로세스 간 통신 비용이 높고 복잡
  • 멀티프로세싱은 각 작업이 독립적이며 CPU 사용이 높을 때 적합하고, 오류 격리와 실제 병렬 실행이 필요할 때 유리

 

3.2. Process 클래스 상속

  • Process 클래스: 개별 프로세스를 생성하고 관리할 수 있는 클래스
  • Value와 Array는 각각 단일 데이터와 배열 데이터를 프로세스 간에 공유할 때 사용.
  • lock도 사용가능
from multiprocessing import Process, Value, Array, Lock
import time

class CustomProcess(Process):
    def __init__(self, id, name, shared_value, shared_array, lock):
        super().__init__(name=name)
        self.id = id
        self.shared_value = shared_value
        self.shared_array = shared_array
        self.lock = lock

    def run(self):
        with self.lock:
            self.shared_value.value += 1
            for i in range(len(self.shared_array)):
                self.shared_array[i] += 1
        print(f"{self.name} (ID: {self.id}) running. Shared Value: {self.shared_value.value}, Shared Array: {list(self.shared_array)}")
        time.sleep(1)  # Simulate work

if __name__ == "__main__":
    # 공유 데이터 설정
    shared_value = Value('i', 0)  # 'i'는 int형을 의미
    shared_array = Array('d', [0.0, 100.0, 200.0])  # 'd'는 double형을 의미
    lock = Lock()

    processes = []
    for i in range(5):
        # 프로세스 이름 지정
        process_name = f"Process-{i}"
        p = CustomProcess(i, process_name, shared_value, shared_array, lock)
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("Main process has completed")

 

3.3. Pool 클래스

  • 특징
    • 자동으로 프로세스를 관리
    • 작업을 효율적으로 관리하고, 시스템 리소스를 최적화
    • 작은 작업에 대해 Pool을 사용하면 프로세스 간 통신과 관리에 따른 오버헤드가 발생할 수 있다
    • 대량의 동일하거나 비슷한 작업을 빠르게 처리하는데 유리함
  • 단일 변수를 다루는 경우 - map 메소드 사용
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as p:
        print(p.map(square, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]

 

  • 다중 변수를 다루는 경우 - starmap 메소드 사용
from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == "__main__":
    with Pool(4) as p:
        args = [(1, 2), (2, 3), (3, 4), (4, 5)]
        print(p.starmap(add, args))  # [3, 5, 7, 9]

 

3.4. 큐(Queue) 사용

  • 사용시 고려사항
    • 다수의 생산자(Producers)와 소비자(Consumers)가 있는 복잡한 데이터 흐름에서 사용
  • put 함수로 쿠에 데이터를 넣음
  • get 함수로 큐에서 데이터를 가져감
from multiprocessing import Process, Queue
import time

class Producer(Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    
    def run(self):
        """프로듀서 프로세스: 데이터를 생성하고 큐에 삽입"""
        for i in range(5):
            print(f'Producing {i} and adding to queue')
            self.queue.put(i)
            time.sleep(1)
        self.queue.put(None)  # 종료 신호로 None을 사용

class Consumer(Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    
    def run(self):
        """컨슈머 프로세스: 큐에서 데이터를 꺼내서 처리"""
        while True:
            item = self.queue.get()
            if item is None:
                break
            print(f'Consuming item {item}')
        print('Consumer finished')

if __name__ == "__main__":
    queue = Queue()  # 프로세스 간 공유할 큐 생성

    # 프로듀서와 컨슈머 프로세스 생성
    producer = Producer(queue)
    consumer = Consumer(queue)

    # 프로세스 시작
    producer.start()
    consumer.start()

    # 프로세스가 종료될 때까지 기다림
    producer.join()
    consumer.join()

    print("Main process finished")

 

3.5. Pipe 사용

  • 사용시 고려사항
    • 빠른 성능과 간단한 1:1 통신이 필요한 경우 Pipe가 Queue보다 적절하다.
  • Pip() 으로 두개의 con을 만든다.  이 객체들은 서로 데이터를 주고받는 데 사용
  • send() 함수로 데이터를 전송함
  • recv()로 데이터를 받음.
  • 사용이 끝난 후에는 close() 메서드를 호출하여 리소스를 정리
from multiprocessing import Process, Pipe

class Sender(Process):
    def __init__(self, conn):
        super().__init__()
        self.conn = conn

    def run(self):
        """ 데이터를 보내는 프로세스의 메서드 """
        messages = ['Hello', 'Dear', 'How are you?', None]  # None은 종료 신호
        for msg in messages:
            print(f"Sending: {msg}")
            self.conn.send(msg)  # 데이터 전송
            if msg is None:
                break
        self.conn.close()  # 사용 완료 후 커넥션 닫기

class Receiver(Process):
    def __init__(self, conn):
        super().__init__()
        self.conn = conn

    def run(self):
        """ 데이터를 받는 프로세스의 메서드 """
        while True:
            msg = self.conn.recv()  # 데이터 수신
            if msg is None:
                print("Receiver shutting down.")
                break
            print(f"Received: {msg}")
        self.conn.close()  # 사용 완료 후 커넥션 닫기

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # 양방향 통신 파이프 생성

    # Sender와 Receiver 클래스의 인스턴스 생성
    sender = Sender(parent_conn)
    receiver = Receiver(child_conn)

    # 프로세스 시작
    sender.start()
    receiver.start()

    # 프로세스가 종료될 때까지 기다림
    sender.join()
    receiver.join()

    print("Main process completed.")

 

3.6. Manager 사용

  • Manager의 특징
    • 데이터 공유: Manager는 프로세스 간에 안전하게 데이터를 공유할 수 있는 여러 타입의 데이터 구조를 제공. 이 구조에는 list,dict, Value, Array, Namespace, Queue 등이 포함되며, 이들은 멀티프로세싱 환경에서 자동으로 동기화
    • 객체들은 서버 프로세스에 의해 관리되며, 이를 통해 프로세스 간의 안전한 통신과 데이터 공유가 가능
  • 주의사항
    • 일반적인 데이터 구조에 비해 속도가 느릴 수 있다.
  • Manager()를 사용하여 멀티프로세싱 매니저를 생성
  • Manager 객체는 시작 시 서버 프로세스를 생성하고, 종료 시 이를 자동으로 정리해야 하기 때문에 with 문을 사용하여 이러한 처리를 간편하게 자동화.
  • 공유 딕셔너리와 리스트 생성
from multiprocessing import Manager, Process

def worker(shared_dict, shared_list):
    shared_dict["id"] = 1
    shared_dict["name"] = "Python"
    shared_list.append(25)

if __name__ == "__main__":
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        p = Process(target=worker, args=(shared_dict, shared_list))
        p.start()
        p.join()

        print(shared_dict)  # 출력: {'id': 1, 'name': 'Python'}
        print(shared_list)  # 출력: [25]

 

3.7. 동기화 메커니즘

  • multi-threading에서와 마찬가지로 아래 동기화 메커니즘을 지원한다. 
  • Lock, Event, Condition, Semaphore 
  • 사용법은 동일하다.

 

4. cpu 코어 개수에 맞게 활용하기

  • os.cpu_count() 메소드를 이용해 cpu의 코어 개수를 알아낼 수 있다.
  • 개수에 맞게 잘 써주면 되겠다.
from multiprocessing import Pool

def worker_task(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=os.cpu_count()) as pool:  # 프로세스 개수를 CPU 코어 수에 맞춤
        results = pool.map(worker_task, range(10))
    print(results)

 

작성후기

- 참 쉽죠?

반응형