인공지능 개발자 수다(유튜브 바로가기) 자세히보기

파이썬/파이썬 코딩

[파이썬] 병렬처리(multi-threading, multi-processing)

Suya_03 2024. 4. 23. 04:28

목차

    반응형

    1. 병렬처리를 하는 이유

    • 프로그램의 실행 속도를 향상
    • 여러 작업을 동시에 처리할 수 있음

    2. 멀티스레딩 (Multi - Threading)

    2.1. 쓰레드 설명

    • 쓰레드는 프로세스 내에서 실행되는 실행 단위
    • 같은 메모리 공간을 공유
    • 입출력 작업이 많은 시나리오에서 유리
    • 파이썬에서는 `threading` 모듈 사용
    • 멀티스레딩은 자원을 공유해야 하거나 I/O 바운드 작업(I/O-intensive tasks)이 중심이 될 때 효과적. (대량의 데이터를 디스크에서 읽거나 쓰거나, 네트워크를 통해 데이터를 송수신하는 작업을 포함합니다.)
    • 메모리 공유로 인해 데이터 동기화 문제가 발생할 수 있다.
    • 한 스레드의 실패가 전체 프로세스에 영향을 줄 수 있습니다.

    2.2. threading 모듈 사용법

    • 쓰레드 생성과 실행
      • 쓰레드 객체를 생성한다. `threading.Thread`클래스를 상속받는다.
      • run() 메서드를 오버라이드하여 각 프로세스의 실행 로직을 정의
      • target 인자에는 쓰레드가 실행할 함수를 지정
      • args 인자에는 해당 함수에 전달할 인자를 튜플 형태로 지정
      • start() 함수로 쓰레드 실행
      • 쓰레드 대기 `.join` 메소드 사용
        • `join()` 함수를 쓰면, 메인쓰레드가 `join()`함수를 사용한 쓰레드가 완료될 때까지 기다려준다.
    import threading
    
    class MyThread(threading.Thread):
        def __init__(self, id, name=None):
            super().__init__(name=name)  # 스레드의 이름을 super 클래스의 생성자에 전달
            self.id = id
        
        def run(self):
            print(f"Thread {self.name} ({self.id}) is running")
    
    # 스레드 생성 및 실행, 이름 지정
    my_thread = MyThread(1, "MyCustomThread")
    my_thread.start()
    my_thread.join()
    
    print("Main thread has completed")

     

    • 쓰레드의 이름 설정
      • name 파라미터 사용
    t = threading.Thread(target=worker, name="worker_thread", args=(i,))

     

    • 현재 쓰레드의 이름 불러오기
      • log를 남겨 어떤 쓰레드가 실행된 것인지 확인 할 때 사용.
      • 쓰레드의 이름 확인
    current_thread = threading.current_thread()
    print(current_thread.name)

     

    • 락 (Lock)
      • 락은 가장 기본적인 동기화 메커니즘으로, 한 번에 하나의 쓰레드만 특정 섹션의 코드(임계 영역)에 접근하도록 허용
      • 락을 획득한 쓰레드만이 해당 섹션의 코드를 실행할 수 있고, 다른 쓰레드는 락이 해제될 때까지 대기해야 한다.
    import threading
    
    class MyThread(threading.Thread):
        def __init__(self, id, lock, name=None):
            super().__init__(name=name)
            self.id = id
            self.lock = lock
        
        def run(self):
            # Lock을 사용하여 임계 영역을 보호
            with self.lock:
                print(f"Thread {self.name} ({self.id}) is running in a critical section")
    
    # Lock 객체 생성
    lock = threading.Lock()
    
    # 여러 스레드 생성 및 실행
    threads = [MyThread(i, lock, f"MyCustomThread-{i}") for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    print("Main thread has completed")

     

    • 세마포어 (Semaphore)
      • 세마포어는 락의 일반화된 형태로, 동시에 여러 쓰레드가 리소스에 접근할 수 있도록 허용합니다. 세마포어는 특정 숫자로 초기화되며, 이 숫자는 동시에 리소스에 접근할 수 있는 최대 쓰레드 수를 의미합니다.
    import threading
    import time
    
    class MyThread(threading.Thread):
        def __init__(self, id, semaphore, name=None):
            super().__init__(name=name)
            self.id = id
            self.semaphore = semaphore
        
        def run(self):
            with self.semaphore:
                # 세마포어를 사용하여 임계 영역에 대한 접근을 제한
                print(f"Thread {self.name} ({self.id}) has entered the critical section.")
                time.sleep(2)  # 리소스 사용을 시뮬레이션
                print(f"Thread {self.name} ({self.id}) is leaving the critical section.")
    
    # 세마포어 객체 생성 (동시에 2개의 스레드만 접근 허용)
    semaphore = threading.Semaphore(2)
    
    # 여러 스레드 생성 및 실행
    threads = [MyThread(i, semaphore, f"MyCustomThread-{i}") for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    print("Main thread has completed")
    • 이벤트
      • 쓰레드간의 신호를 주고받을 때 사용
      • 하나의 쓰레드가 특정 이벤트의 발생을 기다리고, 다른 쓰레드가 이 이벤트를 발생시키면 대기중이던 쓰레드가 계속 진행
    import threading
    import time
    
    class Worker(threading.Thread):
        def __init__(self, name, event):
            super().__init__(name=name)
            self.event = event  # 이벤트 객체를 인스턴스 변수로 받음
        
        def run(self):
            print(f"{self.name} is starting work")
            time.sleep(3)  # 작업 시뮬레이션
            print(f"{self.name} has finished work")
            self.event.set()  # 작업 완료 후 이벤트 설정
    
    class Waiter(threading.Thread):
        def __init__(self, name, event):
            super().__init__(name=name)
            self.event = event  # 이벤트 객체를 인스턴스 변수로 받음
        
        def run(self):
            print(f"{self.name} is waiting for event")
            self.event.wait()  # 이벤트가 설정될 때까지 대기
            print(f"{self.name} has noticed that the event was set")
    
    if __name__ == "__main__":
        event = threading.Event()  # 이벤트 객체 생성
        worker = Worker("Worker", event)  # 이벤트 객체를 스레드에 전달
        waiter = Waiter("Waiter", event)  # 이벤트 객체를 스레드에 전달
    
        worker.start()
        waiter.start()
    
        worker.join()
        waiter.join()
    
        print("Main thread has completed")
    • 컨디션
      • wait() 함수: 실행 시점부터 다른 쓰레드가 notify()함수를 사용할 때 까지 기다리기 시작함.
      • notify() 함수: 기다리고 있는 쓰레들이 실행하도록 함.
      • 조건의 재확인: 깨어난 후에는 대기를 시작한 원래의 조건이 여전히 유효한지 확인해야 한다. 이는 종종 while 루프 안에서 wait()를 호출하여 처리. 이렇게 하는 이유는 여러 스레드가 동시에 같은 조건을 기다리고 있을 경우, 한 스레드의 notify()로 깨어난 스레드가 실제 필요한 조건을 만족하지 못할 수 있기 때문이다.
    import threading
    import time
    import random
    
    # 버퍼
    buffer = []
    MAX_ITEMS = 10
    
    class Producer(threading.Thread):
        def __init__(self, condition):
            super().__init__()
            self.condition = condition
    
        def run(self):
            global buffer
            while True:
                item = random.randint(1, 100)
                with self.condition:
                    while len(buffer) == MAX_ITEMS:
                        self.condition.wait()  # 버퍼가 가득 찼으므로 대기
                    buffer.append(item)
                    print(f"Produced {item}")
                    self.condition.notify()  # 버퍼에 공간이 있으므로 소비자에게 알림
                time.sleep(random.random())
    
    class Consumer(threading.Thread):
        def __init__(self, condition):
            super().__init__()
            self.condition = condition
    
        def run(self):
            global buffer
            while True:
                with self.condition:
                    while not buffer:
                        self.condition.wait()  # 버퍼가 비어있으므로 대기
                    item = buffer.pop(0)
                    print(f"Consumed {item}")
                    self.condition.notify()  # 아이템을 소비했으므로 생산자에게 알림
                time.sleep(random.random())
    
    if __name__ == "__main__":
        condition = threading.Condition()
        producer = Producer(condition)
        consumer = Consumer(condition)
        producer.start()
        consumer.start()
    
        producer.join()
        consumer.join()

     

    3. 멀티프로세싱 (Multi-Processing)

    3.1. 멀티프로세싱 설명

    • 독립된 메모리 공간을 가진 여러 프로세스를 생성
    • 한 프로세스의 오류가 다른 프로세스에 영향을 미치지 않는다. 격리된 메모리 공간으로 인해 안정성이 높다.
    • 프로세스 간의 통신(IPC, Inter-Process Communication)은 비교적 복잡하며, 파이프, 소켓, 공유 메모리 등을 사용
    • CPU 자원을 효율적으로 활용할 수 있으며, 실제 병렬 처리가 가능. 계산 집약적 작업에 적합
    • 단점
      • 리소스(메모리, CPU 시간) 사용이 많다
      • 프로세스 간 통신 비용이 높고 복잡
    • 멀티프로세싱은 각 작업이 독립적이며 CPU 사용이 높을 때 적합하고, 오류 격리와 실제 병렬 실행이 필요할 때 유리

     

    3.2. Process 클래스 상속

    • Process 클래스: 개별 프로세스를 생성하고 관리할 수 있는 클래스
    • Value와 Array는 각각 단일 데이터와 배열 데이터를 프로세스 간에 공유할 때 사용.
    • lock도 사용가능
    from multiprocessing import Process, Value, Array, Lock
    import time
    
    class CustomProcess(Process):
        def __init__(self, id, name, shared_value, shared_array, lock):
            super().__init__(name=name)
            self.id = id
            self.shared_value = shared_value
            self.shared_array = shared_array
            self.lock = lock
    
        def run(self):
            with self.lock:
                self.shared_value.value += 1
                for i in range(len(self.shared_array)):
                    self.shared_array[i] += 1
            print(f"{self.name} (ID: {self.id}) running. Shared Value: {self.shared_value.value}, Shared Array: {list(self.shared_array)}")
            time.sleep(1)  # Simulate work
    
    if __name__ == "__main__":
        # 공유 데이터 설정
        shared_value = Value('i', 0)  # 'i'는 int형을 의미
        shared_array = Array('d', [0.0, 100.0, 200.0])  # 'd'는 double형을 의미
        lock = Lock()
    
        processes = []
        for i in range(5):
            # 프로세스 이름 지정
            process_name = f"Process-{i}"
            p = CustomProcess(i, process_name, shared_value, shared_array, lock)
            processes.append(p)
            p.start()
    
        for p in processes:
            p.join()
    
        print("Main process has completed")

     

    3.3. Pool 클래스

    • 특징
      • 자동으로 프로세스를 관리
      • 작업을 효율적으로 관리하고, 시스템 리소스를 최적화
      • 작은 작업에 대해 Pool을 사용하면 프로세스 간 통신과 관리에 따른 오버헤드가 발생할 수 있다
      • 대량의 동일하거나 비슷한 작업을 빠르게 처리하는데 유리함
    • 단일 변수를 다루는 경우 - map 메소드 사용
    from multiprocessing import Pool
    
    def square(x):
        return x * x
    
    if __name__ == "__main__":
        with Pool(4) as p:
            print(p.map(square, [1, 2, 3, 4, 5]))  # [1, 4, 9, 16, 25]

     

    • 다중 변수를 다루는 경우 - starmap 메소드 사용
    from multiprocessing import Pool
    
    def add(x, y):
        return x + y
    
    if __name__ == "__main__":
        with Pool(4) as p:
            args = [(1, 2), (2, 3), (3, 4), (4, 5)]
            print(p.starmap(add, args))  # [3, 5, 7, 9]

     

    3.4. 큐(Queue) 사용

    • 사용시 고려사항
      • 다수의 생산자(Producers)와 소비자(Consumers)가 있는 복잡한 데이터 흐름에서 사용
    • put 함수로 쿠에 데이터를 넣음
    • get 함수로 큐에서 데이터를 가져감
    from multiprocessing import Process, Queue
    import time
    
    class Producer(Process):
        def __init__(self, queue):
            super().__init__()
            self.queue = queue
        
        def run(self):
            """프로듀서 프로세스: 데이터를 생성하고 큐에 삽입"""
            for i in range(5):
                print(f'Producing {i} and adding to queue')
                self.queue.put(i)
                time.sleep(1)
            self.queue.put(None)  # 종료 신호로 None을 사용
    
    class Consumer(Process):
        def __init__(self, queue):
            super().__init__()
            self.queue = queue
        
        def run(self):
            """컨슈머 프로세스: 큐에서 데이터를 꺼내서 처리"""
            while True:
                item = self.queue.get()
                if item is None:
                    break
                print(f'Consuming item {item}')
            print('Consumer finished')
    
    if __name__ == "__main__":
        queue = Queue()  # 프로세스 간 공유할 큐 생성
    
        # 프로듀서와 컨슈머 프로세스 생성
        producer = Producer(queue)
        consumer = Consumer(queue)
    
        # 프로세스 시작
        producer.start()
        consumer.start()
    
        # 프로세스가 종료될 때까지 기다림
        producer.join()
        consumer.join()
    
        print("Main process finished")

     

    3.5. Pipe 사용

    • 사용시 고려사항
      • 빠른 성능과 간단한 1:1 통신이 필요한 경우 Pipe가 Queue보다 적절하다.
    • Pip() 으로 두개의 con을 만든다.  이 객체들은 서로 데이터를 주고받는 데 사용
    • send() 함수로 데이터를 전송함
    • recv()로 데이터를 받음.
    • 사용이 끝난 후에는 close() 메서드를 호출하여 리소스를 정리
    from multiprocessing import Process, Pipe
    
    class Sender(Process):
        def __init__(self, conn):
            super().__init__()
            self.conn = conn
    
        def run(self):
            """ 데이터를 보내는 프로세스의 메서드 """
            messages = ['Hello', 'Dear', 'How are you?', None]  # None은 종료 신호
            for msg in messages:
                print(f"Sending: {msg}")
                self.conn.send(msg)  # 데이터 전송
                if msg is None:
                    break
            self.conn.close()  # 사용 완료 후 커넥션 닫기
    
    class Receiver(Process):
        def __init__(self, conn):
            super().__init__()
            self.conn = conn
    
        def run(self):
            """ 데이터를 받는 프로세스의 메서드 """
            while True:
                msg = self.conn.recv()  # 데이터 수신
                if msg is None:
                    print("Receiver shutting down.")
                    break
                print(f"Received: {msg}")
            self.conn.close()  # 사용 완료 후 커넥션 닫기
    
    if __name__ == "__main__":
        parent_conn, child_conn = Pipe()  # 양방향 통신 파이프 생성
    
        # Sender와 Receiver 클래스의 인스턴스 생성
        sender = Sender(parent_conn)
        receiver = Receiver(child_conn)
    
        # 프로세스 시작
        sender.start()
        receiver.start()
    
        # 프로세스가 종료될 때까지 기다림
        sender.join()
        receiver.join()
    
        print("Main process completed.")

     

    3.6. Manager 사용

    • Manager의 특징
      • 데이터 공유: Manager는 프로세스 간에 안전하게 데이터를 공유할 수 있는 여러 타입의 데이터 구조를 제공. 이 구조에는 list,dict, Value, Array, Namespace, Queue 등이 포함되며, 이들은 멀티프로세싱 환경에서 자동으로 동기화
      • 객체들은 서버 프로세스에 의해 관리되며, 이를 통해 프로세스 간의 안전한 통신과 데이터 공유가 가능
    • 주의사항
      • 일반적인 데이터 구조에 비해 속도가 느릴 수 있다.
    • Manager()를 사용하여 멀티프로세싱 매니저를 생성
    • Manager 객체는 시작 시 서버 프로세스를 생성하고, 종료 시 이를 자동으로 정리해야 하기 때문에 with 문을 사용하여 이러한 처리를 간편하게 자동화.
    • 공유 딕셔너리와 리스트 생성
    from multiprocessing import Manager, Process
    
    def worker(shared_dict, shared_list):
        shared_dict["id"] = 1
        shared_dict["name"] = "Python"
        shared_list.append(25)
    
    if __name__ == "__main__":
        with Manager() as manager:
            shared_dict = manager.dict()
            shared_list = manager.list()
    
            p = Process(target=worker, args=(shared_dict, shared_list))
            p.start()
            p.join()
    
            print(shared_dict)  # 출력: {'id': 1, 'name': 'Python'}
            print(shared_list)  # 출력: [25]

     

    3.7. 동기화 메커니즘

    • multi-threading에서와 마찬가지로 아래 동기화 메커니즘을 지원한다. 
    • Lock, Event, Condition, Semaphore 
    • 사용법은 동일하다.

     

    4. cpu 코어 개수에 맞게 활용하기

    • os.cpu_count() 메소드를 이용해 cpu의 코어 개수를 알아낼 수 있다.
    • 개수에 맞게 잘 써주면 되겠다.
    from multiprocessing import Pool
    
    def worker_task(x):
        return x * x
    
    if __name__ == "__main__":
        with Pool(processes=os.cpu_count()) as pool:  # 프로세스 개수를 CPU 코어 수에 맞춤
            results = pool.map(worker_task, range(10))
        print(results)

     

    작성후기

    - 참 쉽죠?

    반응형