상세 컨텐츠

본문 제목

15. 멀티스레드, 멀티프로세스

Python

by evaseo 2021. 5. 1. 15:43

본문

1.    동시성과 병렬성

(1)     동시성 (concurrency): 논리적으로 여러 작업이 동시에 실행되는 것처럼 보이는 것
ex) I/O (
파일 및 네트워크 소켓 입력 및 출력) 연산 등은 프로그램의 흐름에 큰 짐이 될 수 있다. 이럴 때 한 작업의 I/O 연산이 완료되기를 기다리는 동안 다른 작업을 수행하여 유휴 시간을 활용하는 것이 동시성이다.

 

(2)     병렬성 (parallelism): 물리적으로 여러 작업이 동시에 처리되는 것

1)       데이터 병렬성

        같은 작업을 병렬 처리하는 것

        하나의 커다란 작업에서 전체 데이터를 쪼갠 후 병렬 처리하면 작업을 빠르게 수행할 수 있다.

2)       작업 병렬성

        서로 다른 작업을 병렬 처리하는 것

        웹 서버에서는 다수의 독립적인 요청을 병렬로 개별적으로 처리할 수 있다.

 

2.    subprocess 모듈

(1)     파이썬 프로그램 내에서 새로운 프로세스를 지원하고 여기에 입출력 파이프를 연결하며 반환 코드를 얻을 수 있도록 하는 모듈

(2)     이를 응용하여 다른 언어로 만들어진 프로그램을 수행할 수 있으며 제어도 가능하게 만든다.

(3)     '부모-자식 (parent-child)' 프로세스 쌍을 생성하는 데 사용

(4)     부모 프로세스는 사용자에 의해 실행

(5)     부모 프로세스는 차례로 다른 일을 처리하는 자식 프로세스의 인스턴스를 실행

(6)     자식 프로세스를 사용함으로써, 멀티 코어의 이점을 최대한 취하고, 동시성 (concurrency) 문제를 운영 체제가 알아서 처리하도록 한다.

ex) from subprocess import *
Popen('calc.exe') ->
계산기 실행
Popen('notepad.exe') ->
메모장 실행

 

3.    threading모듈

(1)     파이썬에서 멀티스레드를 구현할 수 있게 해주는 모듈

(2)     Threading 모듈을 import하고 Thread 클래스를 이용

(3)     Thread클래스 - class threading.Thread

1)       별도의 제어 스레드에서 실행되는 활동을 나타냄

2)       사용방법

        콜러블 객체를 생성자에 전달
ex)
함수를 정의하여 target으로 생성자 Thread에 전달

#스레드(thread): Light Weight process라고도 함. 메인 프로세스와 병렬적으로 수행되는 단위 프로그램. 스레드 단위로 함수나 메소드 수행가능=멀티태스킹 가능
import time

def run(id):
    for i in range(1,11):
        print("id:{} --> {}".format(id, i))
        time.sleep(0.052)

#1. thread를 사용하지 않은 경우 순차적으로 호출하면 순차적으로 수행 - 무조건 run(1)을 다 하고 run(2)가 수행됨
"""
run(1)
run(2)
"""

#2. thread를 사용한 경우  멀티태스킹 가능, 스레드 스케쥴러에 의해 랜덤하게 스레드가 처리됨
import threading

#사용자 정의 thread를 생성
th1 = threading.Thread(target=run, args = ("일"))
th2 = threading.Thread(target=run, args = ("둘"))​

        서브 클래스에서 run() 메서드를 재정의

i.     서브 클래스에서는 다른 메서드(생성자를 제외하고)를 재정의 불가

ii.    오직 이 클래스의 __init__() run() 메서드만 재정의

#클래스로도 반복운용가능
class MyThread(threading.Thread):
    
    def run(self):
        
        for i in range(1,10):
            print('id:{} ==> {}'.format(self.getName(),i))
            time.sleep(0.1)​

3)       메소드

        Thread(): Thread클래스 생성자

        start()

i.     스레드 활동을 시작 메소드

ii.    스레드 객체 당 최대 한 번 호출

iii.   같은 스레드 객체에서 두 번 이상 호출되면, RuntimeError를 발생시킵니다.

        run()

i.     스레드의 활동을 표현하는 메서드

ii.    서브 클래스에서 이 메서드를 재정의 가능

        join([timeout=None])

i.     스레드가 종료할 때까지 기다리는 메소드

ii.    timeout 인자가 없거나 None이면, 스레드가 종료될 때까지 작업이 막음

        name

i.     식별 목적으로만 사용되는 문자열

ii.    여러 스레드에 같은 이름을 지정 가능

iii.   초기 이름은 생성자가 설정

iv.   getName()

v.    setName()

<함수를 정의하여 target으로 생성자 Thread에 전달>

#스레드(thread): Light Weight process라고도 함. 메인 프로세스와 병렬적으로 수행되는 단위 프로그램. 스레드 단위로 함수나 메소드 수행가능=멀티태스킹 가능
import time

def run(id):
    for i in range(1,11):
        print("id:{} --> {}".format(id, i))
        time.sleep(0.052)
#1. thread를 사용하지 않은 경우 순차적으로 호출하면 순차적으로 수행 - 무조건 run(1)을 다 하고 run(2)가 수행됨
"""
run(1)
run(2)
"""
 
import threading
#2. thread를 사용한 경우  멀티태스킹 가능, 스레드 스케쥴러에 의해 랜덤하게 스레드가 처리됨
#사용자 정의 thread를 생성
th1 = threading.Thread(target=run, args = ("일"))
th2 = threading.Thread(target=run, args = ("둘"))

#thread 수행시작
th1.start()
th2.start()
th1.join()#사용자정의 스레드가 끝날 때까지 메인스레드 대기시킴. 메인스레드는 제일 마지막에 실행되게 함
th2.join()

print('\n메인 프로그램 종료') #메인 스레드에 의해 메인모듈이 실행(기본 값)
#사용자정의 스레드 2개, main스레드 1개 => 총 3개의 스레드가 운용, 랜덤하게 스레드가 처리됨
#target이 끝나면 전체 종료​
실행결과

<서브 클래스에서 run() 메서드를 재정의>

import time
import threading

#클래스로도 반복운용가능
class MyThread(threading.Thread):
    
    def run(self):
        
        for i in range(1,10):
            print('id:{} ==> {}'.format(self.getName(),i))
            time.sleep(0.1)
ths = []
for i in range(2):
    th = MyThread()
    th.start()
    ths.append(th)
    
for th in ths:
    th.join()
print('프로그램 종료')​
실행결과

     두 개 이상의 스레드는 전역변수에 대해 공유를 하게 된다. 그러나 경우에 따라서는 스레드 간에 공유자원을 수행하면서 서로 간에 충돌이 발생할 수 있다. 그래서 하나의 스레드가 공유자원(전역변수, DB의 자료, 파일 등)을 사용하고 있을 때 다른 스레드는 잠시 대기함으로 해서 충돌을 막았으면 하는 때도 있을 수 있다. 이럴 경우에는 임의의 스레드가 공유자원을 처리(보통은 수정작업)할 때, 다른 스레드들의 수행에 대해 잠금상태로 만들어 놓고, 공유자원을 처리하고 난 뒤 잠금상태를 해제하면 위와 같은 공유자원의 충돌문제를 해결할 수 있게 된다.

 

(4)     Lock 객체 - class threading.Lock

1)       프리미티브 객체를 구현하는 클래스

     프리미티브 록(primitive lock): 잠겨 있을 때 특정 스레드가 소유하지 않는 동기화 프리미티브

2)       메소드

        Lock() : Lock클래스 생성자

        acquire(blocking=True, timeout=-1) => lock

i.     blocking 인자

a.       True(기본값)로 설정하여 호출하면, 록이 잠금 해제될 때까지 블록 한 다음, 잠금으로 설정하고 True를 반환

b.      False로 설정하여 호출하면, 블록 안함

ii.    timeout 인자

a.       록을 획득할 수 없는 한 최대 timeout에 지정된 초 동안 블록

b.      -1 timeout 인자는 제한 없는 대기를 지정.

c.       blockingFalse일 때 timeout을 지정하는 것은 금지

        release() - unlock

i.     록이 잠금일 때, 잠금 해제로 재설정하고 반환

ii.    잠금 해제된 록에서 호출되면, RuntimeError가 발생

<lock 미설정>

#thread간 공유 자원값 충돌 방지 - 동기화(자바의 synchronized = acquire(), release())

import threading, time

g_count = 0 

def threadCount(id, count):
    global g_count
    
    for i in range(count):
        
        print('id %s ==> count: %s, g_count: %s'%(id, i, g_count))
        g_count += 1
     
for i in range(1, 6):
    threading.Thread(target=threadCount, args = (i,5)).start()

time.sleep(1)    
print('final g_count: ', g_count)
print('finish process')​
실행결과

<lock 설정 - lock = threading.Lock() 설정>

#thread간 공유 자원값 충돌 방지 - 동기화(자바의 synchronized = acquire(), release())

import threading, time

g_count = 0 

lock = threading.Lock()# 전역변수는 자동으로 스레드의 공유자원이 됨

#충돌 발생
#id 4 ==> count: 2, g_count: 17
#id 5 ==> count: 0, g_count: 17
#충돌 방지를 위해 줄세우기 - lock걸기

def threadCount(id, count):
    global g_count
    
    for i in range(count):
        lock.acquire() #두 개 이상의 스레드 공유자원 충돌 방지. lock을 걸어서 다른 스레드 수행 대기
        
        print('id %s ==> count: %s, g_count: %s'%(id, i, g_count))
        g_count += 1
        
        
        lock.release() # lock 해제
        
for i in range(1, 6):
    threading.Thread(target=threadCount, args = (i,5)).start()

time.sleep(1)    
print('final g_count: ', g_count)
print('finish process')​

실행결과

 

(5)     condition객체 - class threading.Condition

1)       다른 스레드의 신호를 기다리는 동기화 프리미티브

스레드 동기화(일명 줄서기): 하나의 스레드를 수행하면서 공유자원을 처리할 때, 다른 스레드들의 수행에 대해 비활성화 상태로 만들어 놓고, 공유자원을 처리하고 난 뒤에 비활성화 상태를 해제

2)       내부에 대기 큐를 가지고 있음

3)       메소드가 호출된 스레드는 이 대기 큐에 들어가서 sleep상태가 되고 notify() notifyall() 메소드에 의해 깨어남.

4)       메소드

        acquire(*args) - lock

        release() - unlock

        wait(timeout=None)

i.     시간제한이 만료될 때까지 기다림

ii.    이 메서드가 호출될 때 호출하는 스레드가 Condition().acquire()가 먼저 정의되어 있지 않으면 RuntimeError가 발생

iii.   timeout: 작업의 시간제한을 초(또는 부분 초)로 지정

        notify(n=1)

i.     대기 중인 하나의 스레드를 깨움

ii.    이 메서드가 호출될 때 호출하는 스레드가 Condition().acquire()가 먼저 정의되어 있지 않으면RuntimeError가 발생

iii.   매개변수 n: 깨울 수 있는 스레드의 최대 개수

iv.   때때로 n 스레드보다 많이 깨울 수 있다.

        notify_all()

i.     대기 중인 모든 스레드를 깨움

ii.    이 메서드가 호출될 때 호출하는 스레드가 Condition().acquire()가 먼저 정의되어 있지 않으면RuntimeError가 발생


4.    GIL(Global Interpreter Lock)


(1)     파이썬 인터프리터가 한 스레드만 하나의 바이트코드를 실행시킬 수 있도록 해주는 Lock

(2)     하나의 스레드에 모든 자원을 허락하고 그 후에는 Lock을 걸어 다른 스레드는 실행할 수 없게 막음

(3)     멀티 스레드로 만들어도 본질적으로 싱글 스레드로만 동작


(4)     이런 이유로 스레드는 구조적으로 충돌이 발생하는 경우가 있다.

(5)     이를 보완하기 위해 멀티프로세싱 모듈을 지원

(6)     I/O 타임에는 CPU idle time이 발생하기 때문에 멀티스레딩 처리방법이 효과적

(7)     하이퍼스레딩같이 CPU 코어 한 개 안에서 여러 개의 프로세스를 처리하는 형태는 멀티스레딩 처리방법이 효과적이지 않음

 

하이퍼 스레딩: 하나의 CPU에서 두 개의 스레드를 동시에 처리하는 것으로, 쉽게 말해 CPU 작업 영역이 2배가 되어 처리 성능을 향상시키는 기술

 

[참고] python의 이해  박영권

 

python GIL. 자바나 C 계열의 언어를 접하다가 파이썬을 하다보면 이해가 안되는 것이… | by hans mj | Medium

 

고성능 파이썬 프로그래밍 3 - 멀티프로세싱과 멀티스레딩 (chacha95.github.io)

 

 

5.    multiprocessing 모듈

(1)     threading API에 기반을 두고 여러 프로세스 간 작업을 나누는 API를 제공

(2)     multiprocessing 모듈은 GIL의 병목현상을 피하기 위해 process 기반으로 병렬처리를 지원하는 모듈

(3)     프로세스간 자원 공유: Queue

1)       파이썬의 데이터 구조로 선입 선출 방식(FIFO)

2)       import Queue로 로딩

        생성자 Queue([n])

i.     객체생성

ii.    n: 생성할 수 있는 최대 개수

        메서드

i.     qsize(): 객체에 등록된 데이터 개수 반환

ii.    put(): 데이터 등록

iii.   get(): 등록된 데이터 반환

from multiprocessing import Process, Queue
 
sentinel = -1
 
def creator(data, q):
    # Creates data to be consumed and waits for the consumer
    # to finish processing
    
    print('Creating data and putting it on the queue')
    for item in data:
        q.put(item)
 

def my_consumer(q):
    # Consumes some data and works on it
    # In this case, all it does is double the input
   
    while True:
        data = q.get()
        print('data found to be processed: {}'.format(data))
        processed = data * 2
        print(processed)
 
        if data is sentinel:
            break
 
if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()
 
    q.close()
    q.join_thread()
 
    process_one.join()
    process_two.join()​

실행결과

 

[참고]

[Python:자료구조] 파이썬 큐(Queue) , 우선순위 큐(PriorityQueue) 사용방법 예제 총정리 - 정보의 공유 사회를 꿈꾼다 (tistory.com)

 

[Python3] multiprocessing | Pool, Process, Queue : 네이버 블로그 (naver.com)


(4)     메인 프로세스

1)       파이썬 코드를 작성한 후 실행시키면 파이썬 인터프리터가 코드를 해석한 후 실행

2)       형식: if __name__ == "__main__":

3)       프로세스 실행은 if문 안에서 구현

4)       함수 import multiprocessing

        multiprocessing.current_process()

i.     현재 실행되는 프로세스에 대한 정보를 담고있는 객체 얻음

ii.    해당 객체의 name pid 속성에 접근하면 프로세스의 이름과 PID(Process ID)를 얻음

a.       multiprocessing.current_process().name

b.      multiprocessing.current_process().pip

 

PID: 운영체제가 각 프로세스에게 부여한 고유 번호로써 프로세스의 우선 순위를 조정하거나 종료하는   등 다양한 용도로 사용
import osos.getpid()함수로 얻을 수도 있다.
os.getpid(): 현재 진행중인  processId 반환
☞ 작업관리자에 세부정보 탭에서 processId = pid를 볼 수 있다.

 

(5)   프로세스 스포닝(spawning)

1)       부모 프로세스(Parent Proecess)가 운영체제에 요청하여 자식 프로세스(Child Process)를 새로 만들어내는 과정

2)       부모 프로세스가 처리할 작업이 많은 경우 자식 프로세스를 새로 만들어 일부 작업을 자식 프로세스에게 위임하여 처리

        부모 프로세스의 불필요한 파일 설명자와 핸들은 미상속

        fork또는 forkserver를 사용하는 것에 비해이 방법을 사용하여 프로세스를 시작하는 것은 다소 느림

 

fork또는 forkserver는 유닉스 프로그램에서만 사용가능

 

3)       multiprocessing 모듈을 이용하면 프로세스 스포닝을 쉽게 수행 가능

        Process 클래스의 인스턴스를 생성한 후 start( ) 메소드를 호출

        Process 클래스의 인스턴스를 생성할 때 생성될 자식 프로세스의 이름과 위임하고자 하는 함수 전달


(6)     process클래스

1)       그저 하나의 프로세스를 하나의 함수에 적당한 인자 값을 할당해 주고(없을 수도 있음) 진행

2)       from multiprocessing import Process 로딩

3)       생성자: Process(target = 실행할 함수명, args = (인자, ))

        프로세스와 연결하여 함수 호출

        args는 반드시 튜플형태

4)       메소드

        run()

i.     프로세스의 활동을 나타내는 메서드

ii.    하위 클래스에서 이 메서드를 재정의 가능

iii.   표준 실행() 메서드는 개체의 생성자에게 전달된 호출 가능한 개체를 대상 인수로 호출

iv.   별도 인자 값이 있는 경우 args kwargs 인수에서 가져온 순차적 및 키워드 인수가 있는 경우 각각 호출

        start()

i.     프로세스의 활동을 시작

ii.    프로세스 개체당 대부분의 한 번 호출

iii.   객체의 실행() 메서드가 별도의 프로세스에서 호출되도록 정렬

        join([타임 아웃])

i.     메서드가 종료라고 하는 프로세스가 될 때까지 차단

ii.    타임아웃

a.       정수

b.      정수만큼 차단

iii.   프로세스를 여러 번 결합가능

        close()

i.     프로세스 개체를 닫고 연결된 모든 리소스를 해제

ii.    기본 프로세스가 계속 실행 중인 경우 ValueError가 발생

# 멀티프로세싱을 위한 Process클래스
# 그저 하나의 프로세스를 하나의 함수에 적당한 인자값을 할당해 주고(없을 수도 있음) 진행

import time
import os
from multiprocessing import Process

def func():
    print('연속적으로 진행하고자 하는 어떤 작업')
    #time.sleep(1)
    
def doubler(number):
    result = number + 10
    func()
    proc = os.getpid()
    print('number: {0}, result: {1}, process id: {2}'.format(number, result, proc))
    
if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    procs = []
    
    for index, number in enumerate(numbers):
        proc = Process(target = doubler, args = (number, )) #프로세스와 연결하여 함수 호출
        procs.append(proc) # process에 join() 추가할 의도
        proc.start()
        
    for proc in procs:
        proc.join() ​
실행결과

(7)     Pool클래스

1)       입력 값에 대해 process들을 건너 건너 분배하여 함수실행을 병렬화 하는 방법, 퐁당퐁당 기법

2)       생성자 Pool(n)

        n: 한번에 처리할 프로세스 개수, Pool 객체: 3~5가 적당

        n의 값이 클수록 속도는 빠르지만 내부적으로 부하 걸림

3)       메서드

        map(함수, parameter)

i.     함수와 인자 값을 매핑하면서 데이터를 분배 처리

ii.    리턴값 = 리스트

        close(): 모든 작업이 완료되면 작업자 프로세스가 종료됩니다.

        terminate(): 계류 중인 작업을 완료하지 않고 즉시 작업자 프로세스를 중지

        join()

i.     작업자 프로세스가 종료될 때까지 대기

ii.    join() 호출 전에 반드시 close() terminate()를 호출

# GIL이란 Global Interpreter Lock의 약자로 파이썬 인터프리터가 한 스레드만 하나의 바이트코드를 실행 시킬 수 있도록 해주는 Lock
# 하나의 스레드에 모든 자원을 허락하고 그 후에는 Lock을 걸어 다른 스레드는 실행할 수 없게 막음
# 이런 이유로 스레드는 구조적으로 출돌이 발생하는 경우가 있다.
# 그래서 멀티프로세싱 모듈을 지원

# Pool: 입력값에 대해 process들을 건너건너 분배하여 함수실행을 병렬화 하는 방법, 퐁당퐁당 기법

from multiprocessing import Pool 
import time
import os

def f(x):
    print('값 ', x, '에 대한 작업 pid', os.getpid()) 
    #작업관리자에 세부정보 탭에서 processId = pid를 볼 수 있다. os.getpid(): 현재 진행중인  processId 반환
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    p = Pool(3) 
    #프로세스 수 = 3, Pool 객체: 3~5가 적당 - 자원의 효율성으로 너무많이 줘도 안좋음 - 빠르긴 하겠지만 내부적으로 부하걸림
    startTime = int(time.time())
    
    """ Pool 안씀  - thread사용  - 10초 걸림
    for i in range(0, 10):
        print(f(i))
    """
    #pool사용 - process 3개씩 처리
    print(p.map(f, range(0,10))) # 함수와 인자값을 매핑하면서 데이터를 분배 처리. 리턴값 = 리스트
    
    endTime = int(time.time())
    
    print('총 작업시간: ', (endTime - startTime))   ​

실행결과

 

※ Process클래스를 사용한 것보다 Pool클래스를 사용했을 때 작업시간이 더 단축된다.

 

프로세싱작업 미적용 vs Pool적용

# 멀티 프로세싱을 통한 웹 크롤링(여러페이지 집어올 땐)  
# 한페이지만 집어올 땐 스크랩핑

#1. 멀티프로세싱 없이 작업
import requests
from bs4 import BeautifulSoup as bs #마커랭기지 ex) html, xml만 지원 
import time
from conda.core import link

def get_links(): # 해당 컨텐츠의 a태그 얻기
    data = requests.get("https://beomi.github.io/beomi.github.io_old/").text
    soup = bs(data, 'html.parser')
    my_titles = soup.select('h3 > a') #h3 > a: <h3>의 자손 a,  h3  a: <h3>의 자식 a
    
    #a태그의 href 속성(Attribute) 얻기
    data = []
    
    for title in my_titles:
        data.append(title.get('href')) #a태그의 href 속성(Attribute) 값 반환
        #print(title.get('href'))
    return data 

def get_connect(link):
    abs_link = "https://beomi.github.io" + link
    req = requests.get(abs_link)
    html = req.text
    soup = bs(html, 'html.parser')
    print(soup.select('h1')[0].text) #h1태그의 첫번 째 문자열만 출력


if __name__ == '__main__':
    #print(get_links())
    #print(len(get_links()))
    
    startTime = time.time()
    for link in get_links():
        get_connect(link)
        
    print('소요시간: %s 초'%(time.time()-startTime))​

실행결과(프로세싱작업 미적용)

 

# 멀티 프로세싱을 통한 웹 크롤링(여러페이지 집어올 땐)  
# 한페이지만 집어올 땐 스크랩핑

#1. 멀티프로세싱으로 작업

import requests
from bs4 import BeautifulSoup as bs #마커랭기지 ex) html, xml만 지원 
import time
from multiprocessing import Pool

def get_links(): # 해당 컨텐츠의 a태그 얻기
    data = requests.get("https://beomi.github.io/beomi.github.io_old/").text
    soup = bs(data, 'html.parser')
    my_titles = soup.select('h3 > a') #h3 > a: <h3>의 자손 a,  h3  a: <h3>의 자식 a
    
    #a태그의 href 속성(Attribute) 얻기
    data = []
    
    for title in my_titles:
        data.append(title.get('href')) #a태그의 href 속성(Attribute) 값 반환
    
    return data 

def get_connect(link):
    abs_link = "https://beomi.github.io" + link
    req = requests.get(abs_link)
    html = req.text
    soup = bs(html, 'html.parser')
    print(soup.select('h1')[0].text) #h1태그의 첫번 째 문자열만 출력

if __name__ == '__main__':
    startTime = time.time()
    
    pool = Pool(processes = 4)
    pool.map(get_connect,get_links())
        
    print('소요시간: %s 초'%(time.time()-startTime))

 

실행결과(프로세싱작업 Pool 적용)

 

 

[참고] multiprocessing — 프로세스 기반 병렬 처리 — Python 3.9.2 문서

 

 

'Python' 카테고리의 다른 글

16. pandas - (2) DataFrame  (0) 2021.05.02
16. pandas - (1) 개요, Series  (0) 2021.05.01
14. 소켓(Socket)  (0) 2021.05.01
13. 원격(remote) db연동 – MariaDB  (0) 2021.04.29
12. GUI(윈도우 프로그래밍)  (0) 2021.04.29

관련글 더보기