목차
반응형
1. 병렬처리를 하는 이유
- 프로그램의 실행 속도를 향상
- 여러 작업을 동시에 처리할 수 있음
2. 멀티스레딩 (Multi - Threading)
2.1. 쓰레드 설명
- 쓰레드는 프로세스 내에서 실행되는 실행 단위
- 같은 메모리 공간을 공유
- 입출력 작업이 많은 시나리오에서 유리
- 파이썬에서는 `threading` 모듈 사용
- 멀티스레딩은 자원을 공유해야 하거나 I/O 바운드 작업(I/O-intensive tasks)이 중심이 될 때 효과적. (대량의 데이터를 디스크에서 읽거나 쓰거나, 네트워크를 통해 데이터를 송수신하는 작업을 포함합니다.)
- 메모리 공유로 인해 데이터 동기화 문제가 발생할 수 있다.
- 한 스레드의 실패가 전체 프로세스에 영향을 줄 수 있습니다.
2.2. threading 모듈 사용법
- 쓰레드 생성과 실행
- 쓰레드 객체를 생성한다. `threading.Thread`클래스를 상속받는다.
- run() 메서드를 오버라이드하여 각 프로세스의 실행 로직을 정의
- target 인자에는 쓰레드가 실행할 함수를 지정
- args 인자에는 해당 함수에 전달할 인자를 튜플 형태로 지정
- start() 함수로 쓰레드 실행
- 쓰레드 대기 `.join` 메소드 사용
- `join()` 함수를 쓰면, 메인쓰레드가 `join()`함수를 사용한 쓰레드가 완료될 때까지 기다려준다.
import threading
class MyThread(threading.Thread):
def __init__(self, id, name=None):
super().__init__(name=name) # 스레드의 이름을 super 클래스의 생성자에 전달
self.id = id
def run(self):
print(f"Thread {self.name} ({self.id}) is running")
# 스레드 생성 및 실행, 이름 지정
my_thread = MyThread(1, "MyCustomThread")
my_thread.start()
my_thread.join()
print("Main thread has completed")
- 쓰레드의 이름 설정
- name 파라미터 사용
t = threading.Thread(target=worker, name="worker_thread", args=(i,))
- 현재 쓰레드의 이름 불러오기
- log를 남겨 어떤 쓰레드가 실행된 것인지 확인 할 때 사용.
- 쓰레드의 이름 확인
current_thread = threading.current_thread()
print(current_thread.name)
- 락 (Lock)
- 락은 가장 기본적인 동기화 메커니즘으로, 한 번에 하나의 쓰레드만 특정 섹션의 코드(임계 영역)에 접근하도록 허용
- 락을 획득한 쓰레드만이 해당 섹션의 코드를 실행할 수 있고, 다른 쓰레드는 락이 해제될 때까지 대기해야 한다.
import threading
class MyThread(threading.Thread):
def __init__(self, id, lock, name=None):
super().__init__(name=name)
self.id = id
self.lock = lock
def run(self):
# Lock을 사용하여 임계 영역을 보호
with self.lock:
print(f"Thread {self.name} ({self.id}) is running in a critical section")
# Lock 객체 생성
lock = threading.Lock()
# 여러 스레드 생성 및 실행
threads = [MyThread(i, lock, f"MyCustomThread-{i}") for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
print("Main thread has completed")
- 세마포어 (Semaphore)
- 세마포어는 락의 일반화된 형태로, 동시에 여러 쓰레드가 리소스에 접근할 수 있도록 허용합니다. 세마포어는 특정 숫자로 초기화되며, 이 숫자는 동시에 리소스에 접근할 수 있는 최대 쓰레드 수를 의미합니다.
import threading
import time
class MyThread(threading.Thread):
def __init__(self, id, semaphore, name=None):
super().__init__(name=name)
self.id = id
self.semaphore = semaphore
def run(self):
with self.semaphore:
# 세마포어를 사용하여 임계 영역에 대한 접근을 제한
print(f"Thread {self.name} ({self.id}) has entered the critical section.")
time.sleep(2) # 리소스 사용을 시뮬레이션
print(f"Thread {self.name} ({self.id}) is leaving the critical section.")
# 세마포어 객체 생성 (동시에 2개의 스레드만 접근 허용)
semaphore = threading.Semaphore(2)
# 여러 스레드 생성 및 실행
threads = [MyThread(i, semaphore, f"MyCustomThread-{i}") for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print("Main thread has completed")
- 이벤트
- 쓰레드간의 신호를 주고받을 때 사용
- 하나의 쓰레드가 특정 이벤트의 발생을 기다리고, 다른 쓰레드가 이 이벤트를 발생시키면 대기중이던 쓰레드가 계속 진행
import threading
import time
class Worker(threading.Thread):
def __init__(self, name, event):
super().__init__(name=name)
self.event = event # 이벤트 객체를 인스턴스 변수로 받음
def run(self):
print(f"{self.name} is starting work")
time.sleep(3) # 작업 시뮬레이션
print(f"{self.name} has finished work")
self.event.set() # 작업 완료 후 이벤트 설정
class Waiter(threading.Thread):
def __init__(self, name, event):
super().__init__(name=name)
self.event = event # 이벤트 객체를 인스턴스 변수로 받음
def run(self):
print(f"{self.name} is waiting for event")
self.event.wait() # 이벤트가 설정될 때까지 대기
print(f"{self.name} has noticed that the event was set")
if __name__ == "__main__":
event = threading.Event() # 이벤트 객체 생성
worker = Worker("Worker", event) # 이벤트 객체를 스레드에 전달
waiter = Waiter("Waiter", event) # 이벤트 객체를 스레드에 전달
worker.start()
waiter.start()
worker.join()
waiter.join()
print("Main thread has completed")
- 컨디션
- wait() 함수: 실행 시점부터 다른 쓰레드가 notify()함수를 사용할 때 까지 기다리기 시작함.
- notify() 함수: 기다리고 있는 쓰레들이 실행하도록 함.
- 조건의 재확인: 깨어난 후에는 대기를 시작한 원래의 조건이 여전히 유효한지 확인해야 한다. 이는 종종 while 루프 안에서 wait()를 호출하여 처리. 이렇게 하는 이유는 여러 스레드가 동시에 같은 조건을 기다리고 있을 경우, 한 스레드의 notify()로 깨어난 스레드가 실제 필요한 조건을 만족하지 못할 수 있기 때문이다.
import threading
import time
import random
# 버퍼
buffer = []
MAX_ITEMS = 10
class Producer(threading.Thread):
def __init__(self, condition):
super().__init__()
self.condition = condition
def run(self):
global buffer
while True:
item = random.randint(1, 100)
with self.condition:
while len(buffer) == MAX_ITEMS:
self.condition.wait() # 버퍼가 가득 찼으므로 대기
buffer.append(item)
print(f"Produced {item}")
self.condition.notify() # 버퍼에 공간이 있으므로 소비자에게 알림
time.sleep(random.random())
class Consumer(threading.Thread):
def __init__(self, condition):
super().__init__()
self.condition = condition
def run(self):
global buffer
while True:
with self.condition:
while not buffer:
self.condition.wait() # 버퍼가 비어있으므로 대기
item = buffer.pop(0)
print(f"Consumed {item}")
self.condition.notify() # 아이템을 소비했으므로 생산자에게 알림
time.sleep(random.random())
if __name__ == "__main__":
condition = threading.Condition()
producer = Producer(condition)
consumer = Consumer(condition)
producer.start()
consumer.start()
producer.join()
consumer.join()
3. 멀티프로세싱 (Multi-Processing)
3.1. 멀티프로세싱 설명
- 독립된 메모리 공간을 가진 여러 프로세스를 생성
- 한 프로세스의 오류가 다른 프로세스에 영향을 미치지 않는다. 격리된 메모리 공간으로 인해 안정성이 높다.
- 프로세스 간의 통신(IPC, Inter-Process Communication)은 비교적 복잡하며, 파이프, 소켓, 공유 메모리 등을 사용
- CPU 자원을 효율적으로 활용할 수 있으며, 실제 병렬 처리가 가능. 계산 집약적 작업에 적합
- 단점
- 리소스(메모리, CPU 시간) 사용이 많다
- 프로세스 간 통신 비용이 높고 복잡
- 멀티프로세싱은 각 작업이 독립적이며 CPU 사용이 높을 때 적합하고, 오류 격리와 실제 병렬 실행이 필요할 때 유리
3.2. Process 클래스 상속
- Process 클래스: 개별 프로세스를 생성하고 관리할 수 있는 클래스
- Value와 Array는 각각 단일 데이터와 배열 데이터를 프로세스 간에 공유할 때 사용.
- lock도 사용가능
from multiprocessing import Process, Value, Array, Lock
import time
class CustomProcess(Process):
def __init__(self, id, name, shared_value, shared_array, lock):
super().__init__(name=name)
self.id = id
self.shared_value = shared_value
self.shared_array = shared_array
self.lock = lock
def run(self):
with self.lock:
self.shared_value.value += 1
for i in range(len(self.shared_array)):
self.shared_array[i] += 1
print(f"{self.name} (ID: {self.id}) running. Shared Value: {self.shared_value.value}, Shared Array: {list(self.shared_array)}")
time.sleep(1) # Simulate work
if __name__ == "__main__":
# 공유 데이터 설정
shared_value = Value('i', 0) # 'i'는 int형을 의미
shared_array = Array('d', [0.0, 100.0, 200.0]) # 'd'는 double형을 의미
lock = Lock()
processes = []
for i in range(5):
# 프로세스 이름 지정
process_name = f"Process-{i}"
p = CustomProcess(i, process_name, shared_value, shared_array, lock)
processes.append(p)
p.start()
for p in processes:
p.join()
print("Main process has completed")
3.3. Pool 클래스
- 특징
- 자동으로 프로세스를 관리
- 작업을 효율적으로 관리하고, 시스템 리소스를 최적화
- 작은 작업에 대해 Pool을 사용하면 프로세스 간 통신과 관리에 따른 오버헤드가 발생할 수 있다
- 대량의 동일하거나 비슷한 작업을 빠르게 처리하는데 유리함
- 단일 변수를 다루는 경우 - map 메소드 사용
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == "__main__":
with Pool(4) as p:
print(p.map(square, [1, 2, 3, 4, 5])) # [1, 4, 9, 16, 25]
- 다중 변수를 다루는 경우 - starmap 메소드 사용
from multiprocessing import Pool
def add(x, y):
return x + y
if __name__ == "__main__":
with Pool(4) as p:
args = [(1, 2), (2, 3), (3, 4), (4, 5)]
print(p.starmap(add, args)) # [3, 5, 7, 9]
3.4. 큐(Queue) 사용
- 사용시 고려사항
- 다수의 생산자(Producers)와 소비자(Consumers)가 있는 복잡한 데이터 흐름에서 사용
- put 함수로 쿠에 데이터를 넣음
- get 함수로 큐에서 데이터를 가져감
from multiprocessing import Process, Queue
import time
class Producer(Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
"""프로듀서 프로세스: 데이터를 생성하고 큐에 삽입"""
for i in range(5):
print(f'Producing {i} and adding to queue')
self.queue.put(i)
time.sleep(1)
self.queue.put(None) # 종료 신호로 None을 사용
class Consumer(Process):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
"""컨슈머 프로세스: 큐에서 데이터를 꺼내서 처리"""
while True:
item = self.queue.get()
if item is None:
break
print(f'Consuming item {item}')
print('Consumer finished')
if __name__ == "__main__":
queue = Queue() # 프로세스 간 공유할 큐 생성
# 프로듀서와 컨슈머 프로세스 생성
producer = Producer(queue)
consumer = Consumer(queue)
# 프로세스 시작
producer.start()
consumer.start()
# 프로세스가 종료될 때까지 기다림
producer.join()
consumer.join()
print("Main process finished")
3.5. Pipe 사용
- 사용시 고려사항
- 빠른 성능과 간단한 1:1 통신이 필요한 경우 Pipe가 Queue보다 적절하다.
- Pip() 으로 두개의 con을 만든다. 이 객체들은 서로 데이터를 주고받는 데 사용
- send() 함수로 데이터를 전송함
- recv()로 데이터를 받음.
- 사용이 끝난 후에는 close() 메서드를 호출하여 리소스를 정리
from multiprocessing import Process, Pipe
class Sender(Process):
def __init__(self, conn):
super().__init__()
self.conn = conn
def run(self):
""" 데이터를 보내는 프로세스의 메서드 """
messages = ['Hello', 'Dear', 'How are you?', None] # None은 종료 신호
for msg in messages:
print(f"Sending: {msg}")
self.conn.send(msg) # 데이터 전송
if msg is None:
break
self.conn.close() # 사용 완료 후 커넥션 닫기
class Receiver(Process):
def __init__(self, conn):
super().__init__()
self.conn = conn
def run(self):
""" 데이터를 받는 프로세스의 메서드 """
while True:
msg = self.conn.recv() # 데이터 수신
if msg is None:
print("Receiver shutting down.")
break
print(f"Received: {msg}")
self.conn.close() # 사용 완료 후 커넥션 닫기
if __name__ == "__main__":
parent_conn, child_conn = Pipe() # 양방향 통신 파이프 생성
# Sender와 Receiver 클래스의 인스턴스 생성
sender = Sender(parent_conn)
receiver = Receiver(child_conn)
# 프로세스 시작
sender.start()
receiver.start()
# 프로세스가 종료될 때까지 기다림
sender.join()
receiver.join()
print("Main process completed.")
3.6. Manager 사용
- Manager의 특징
- 데이터 공유: Manager는 프로세스 간에 안전하게 데이터를 공유할 수 있는 여러 타입의 데이터 구조를 제공. 이 구조에는 list,dict, Value, Array, Namespace, Queue 등이 포함되며, 이들은 멀티프로세싱 환경에서 자동으로 동기화
- 객체들은 서버 프로세스에 의해 관리되며, 이를 통해 프로세스 간의 안전한 통신과 데이터 공유가 가능
- 주의사항
- 일반적인 데이터 구조에 비해 속도가 느릴 수 있다.
- Manager()를 사용하여 멀티프로세싱 매니저를 생성
- Manager 객체는 시작 시 서버 프로세스를 생성하고, 종료 시 이를 자동으로 정리해야 하기 때문에 with 문을 사용하여 이러한 처리를 간편하게 자동화.
- 공유 딕셔너리와 리스트 생성
from multiprocessing import Manager, Process
def worker(shared_dict, shared_list):
shared_dict["id"] = 1
shared_dict["name"] = "Python"
shared_list.append(25)
if __name__ == "__main__":
with Manager() as manager:
shared_dict = manager.dict()
shared_list = manager.list()
p = Process(target=worker, args=(shared_dict, shared_list))
p.start()
p.join()
print(shared_dict) # 출력: {'id': 1, 'name': 'Python'}
print(shared_list) # 출력: [25]
3.7. 동기화 메커니즘
- multi-threading에서와 마찬가지로 아래 동기화 메커니즘을 지원한다.
- Lock, Event, Condition, Semaphore
- 사용법은 동일하다.
4. cpu 코어 개수에 맞게 활용하기
- os.cpu_count() 메소드를 이용해 cpu의 코어 개수를 알아낼 수 있다.
- 개수에 맞게 잘 써주면 되겠다.
from multiprocessing import Pool
def worker_task(x):
return x * x
if __name__ == "__main__":
with Pool(processes=os.cpu_count()) as pool: # 프로세스 개수를 CPU 코어 수에 맞춤
results = pool.map(worker_task, range(10))
print(results)
작성후기
- 참 쉽죠?
반응형
'파이썬 > 파이썬 코딩' 카테고리의 다른 글
[파이썬] 매직메소드(Magic Methods) (0) | 2024.03.10 |
---|---|
[파이썬] 제너레이터(Generators)와 이터레이터(Iterators) (3) | 2024.03.07 |