[Python] 파이썬 병렬 프로그래밍 - futures

728x90

1. futures

파이썬에서 futures는 비동기 작업을 실행할 수 있도록 도와주는 모듈입니다. 이 모듈은 비동기 실행을 위한 api를 고수준으로 작성하고 사용하기 쉽도록 개선하기 위해 나온 것입니다.

 

일반적으로 network 및 I/O 관련 작업시 비동기방식의 활용을 권장합니다. 왜냐하면 비동기 작업을 실행하면 지연시간(block) cpu 및 리소스를 낭비하는 것을 방지할 수 있기 때문입니다. 

파이썬의 병렬 프로그래밍은 비동기 작업이 적합한 프로그램일 경우 눈에 띄는 성능향상을 볼 수 있습니다.

 

파이썬 3.2부터 도입된 concurrent.futures는 멀티스레딩 또는 멀티프로세싱 api가 통일되어 매우 사용하기 쉬워졌습니다. 이 모듈은 실행중인 작업취소, 완료여부 체크, 타임아웃옵션, 콜백추가, 동기화코드를 매우 쉽게 작성할 수 있도록 도와줍니다. 

 

그리고 future를 실행하려면 아래 처럼 __name__ 안에서 main함수를 호출해야 런타임 에러가 발생하지 않습니다.

if __name__ == '__main__':
    main()

 

2. GIL

파이썬으로 멀티스레딩을 공부하다보면 GIL이라는 용어를 많이 볼 수 있습니다. GIL은 Global Interpreter Lock의 약자로 두개 이상의 스레드가 동시에 실행될 때 하나의 자원을 엑세스하는 경우 발생하는 문제점을 방지하기 위해 실행되는 기능입니다. 즉 두개이상의 스레드가 실행된다면 리소스 전체에 락을 걸고 하나의 스레드만 접근 가능하도록 합니다. 이러한 이유로 파이썬에서 멀티스레딩은 효율적이지 못합니다. 물론 I/O 작업에는 예외가 있긴합니다.

파이썬에서 GIL을 우회하는 방법으로는 멀티프로세싱을 이용하거나, CPython을 이용하는 방법들이 있습니다.

 

3. futures 사용방법

파이썬에서 멀티프로세싱을 활용하여 얼마나 성능이 향상되는지 확인해보도록 하겠습니다.

work_list = ['덧셈','곱셈','제곱의 합','덧셈','곱셈','제곱의 합','덧셈']

def sum_generator(n):
    return sum(n for n in range(1,n+1))

def mul_generator(n):
    mul_value = 1
    for i in range(1,n):
        mul_value *= i

    return mul_value

def pow_sum(n):

    return sum(n**2 for n in range(1,n+1))

def run(str1):
    if str1 == '덧셈':
        result = sum_generator(10000000)
    elif str1 == '곱셈':
        result = mul_generator(100)
    else:
        result = pow_sum(10000000)
    return result
    
if __name__ == '__main__':
    # 실행함수 작성

위의 코드는 입력값에 따라서 서로 다른 연산을 하는 프로그램입니다.

 

1) 동기방식

def main():
    # 시작시간
    start_time = time.time()

    result = [run(i) for i in work_list]

    # 종료시간
    end_time = time.time()
    # 걸린시간
    finish_time = end_time - start_time

    print(f"동기 작업 결과 : {result}")
    print(f"동기 작업으로 총 걸린시간 : {finish_time}s")
    
#출력:
#동기 작업 결과 : [50000005000000, 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000, 333333383333335000000, 50000005000000, 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000, 333333383333335000000, 50000005000000]
#동기 작업으로 총 걸린시간 : 6.031844615936279s

 

대략 6초가 걸립니다.

 

2) map을 활용한 비동기 방식(멀티 프로세싱)

def main2(): # 동시성 실행

    #시작시간
    start_time = time.time()
    with ProcessPoolExecutor() as executor: 
        # map은 작업순서를 유지하고 즉시 실행
        result = executor.map(run,work_list)

    #종료시간
    end_time = time.time()
    #걸린시간
    finish_time = end_time-start_time

    print(f"concurrent.futures map 작업 결과: {list(result)}")
    print(f"concurrent.futures map으로 총 걸린시간 : {finish_time}s")
    
    
#출력:
# concurrent.futures map 작업 결과: [50000005000000, 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000, 333333383333335000000, 50000005000000, 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000, 333333383333335000000, 50000005000000]
# concurrent.futures map으로 총 걸린시간 : 2.581249952316284s

 

3) submit과 wait를 사용한 비동기방식(멀티 프로세싱)

def main3(): # 동시성 실행
    worker = min(10,len(work_list))

    #시작시간
    start_time = time.time()

    with ProcessPoolExecutor() as executor:
        futures_list = [executor.submit(run,work) for work in work_list]
        
        # print(f"{worker} : {future}")
        result = wait(futures_list,7) # 시간을 제어함 -> timeout 값 안에 처리된 것은 성공 결과로 반환, 처리 안 된 것은 실패로 반환
        #성공
        print(f"성공:{result.done}")
        #실패
        print(f"실패: {result.not_done}")
        #결과값 출력:
        print([future.result() for future in result.done])

    #종료시간
    end_time = time.time()
    #걸린시간
    finish_time = end_time-start_time

    msg = f"concurrent.futures wait으로 작업하는데 총 걸린시간 : {finish_time}s"
    print(msg)

#출력:
#성공:{<Future at 0x1f36880a610 state=finished returned int>, <Future at 0x1f36880ae80 state=finished returned int>, <Future at 0x1f368825280 state=finished returned int>, <Future at 0x1f3687ad8b0 state=finished returned int>, <Future at 0x1f36879fb20 state=finished returned int>, <Future at 0x1f36880a520 state=finished returned int>, <Future at 0x1f36880ab50 state=finished returned int>}
# 실패: set()
# [333333383333335000000, 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000, 50000005000000, 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000, 50000005000000, 333333383333335000000, 50000005000000]
# concurrent.futures wait으로 작업하는데 총 걸린시간 : 2.582122325897217s

 

4) submit과 as_completed을 사용한 비동기방식(멀티프로세싱)

def main4():
    worker = min(10, len(work_list))

    # 시작시간
    start_time = time.time()

    with ProcessPoolExecutor() as executor:
    # with ThreadPoolExecutor() as executor:
        futures_list = [executor.submit(run, work) for work in work_list]

        for future in as_completed(futures_list): #as_completed는 먼저 완료된 것부터 반환한다
            result = future.result()
            done = future.done()
            cancelled = future.cancelled()

            # 성공
            print(f"성공-결과값 :{result} - {done}")
            # 실패
            print(f"실패: {cancelled}")

    # 종료시간
    end_time = time.time()
    # 걸린시간
    finish_time = end_time - start_time

    msg = f"concurrent.futures as_completed으로 작업하는데 총 걸린시간 : {finish_time}s"
    print(msg)

#출력:
# 성공-결과값 :933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000 - True
# 실패: False
# 성공-결과값 :933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000 - True
# 실패: False
# 성공-결과값 :50000005000000 - True
# 실패: False
# 성공-결과값 :50000005000000 - True
# 실패: False
# 성공-결과값 :50000005000000 - True
# 실패: False
# 성공-결과값 :333333383333335000000 - True
# 실패: False
# 성공-결과값 :333333383333335000000 - True
# 실패: False
# concurrent.futures as_completed으로 작업하는데 총 걸린시간 : 2.6429905891418457s

 

5) 멀티프로세싱 vs 멀티스레드 vs 동기방식 속도비교

def main4():
    
    # 시작시간
    start_time = time.time()

    with ProcessPoolExecutor() as executor:
        futures_list = [executor.submit(run, work) for work in work_list]

        for future in as_completed(futures_list): 
            result = future.result()
            
    # 종료시간
    end_time = time.time()
    # 걸린시간
    finish_time = end_time - start_time

    msg = f"멀티프로세싱으로 작업하는데 총 걸린시간 : {finish_time}s"
    print(msg)

def main5():
    
    # 시작시간
    start_time = time.time()

    
    with ThreadPoolExecutor() as executor:
        futures_list = [executor.submit(run, work) for work in work_list]

        for future in as_completed(futures_list): 
            result = future.result()

    # 종료시간
    end_time = time.time()
    # 걸린시간
    finish_time = end_time - start_time

    msg = f"멀티스레드로 작업하는데 총 걸린시간 : {finish_time}s"
    print(msg)

def main6():
    # 시작시간
    start_time = time.time()

    result = [run(i) for i in work_list]

    # 종료시간
    end_time = time.time()
    # 걸린시간
    finish_time = end_time - start_time

    print(f"동기 작업으로 총 걸린시간 : {finish_time}s")
    
#출력:
# 멀티프로세싱으로 작업하는데 총 걸린시간 : 2.568108320236206s
# 멀티스레드로 작업하는데 총 걸린시간 : 6.083815574645996s
# 동기 작업으로 총 걸린시간 : 5.99693751335144s

 

위 3개의 함수는 같은 결과를 반환합니다. 그런데 멀티스레딩은 유독 느린것을 확인할 수 있습니다. 이러한 상황이 발생하는 이유가 GIL 때문입니다.