카테고리 없음

스파르타 AI-8기 TIL(10/10) -> 동시성

kimjunki-8 2024. 10. 10. 20:53

 

목차
1. 동시성
동시성
파이썬에서 동시성이란, 한번에 두개 이상의 작업을 수행하도록 도와주는 코드입니다.
그렇기 위해서는 저희는 오늘 3개의 주제를 배워보도록 하겠습니다!
1. 스레드(threading)
2. 프로세스(multiprocessing)
3. 비동기 I/O (asyncio)

 

1. 스레드(Threading)
-> 스레드는 한번에 여러 작업 실행할 수 있게 해주는 하나의 모듈입니다.
하지만 그러기 위해서는 여러개의 스레스를 써야합니다.
스레드 구조:
import threading
threading.Thread(target = )
Thread.start
Thread.join

threading -> 스레드의 모듈입니다.
Thread -> 스레드 모듈에 포함되어 있는 클래스입니다. (여기서 Thread의 T는 항상 대문자이여 합니다.)
target -> 실행시킬 작업(함수 이름 등등)의 이름.
.start() -> 스레드를 실행시키는 Thread 클래스의 메서드 입니다.
.join() -> 스레드가 종료될 때까지 기다리는 Thread 클래스의 메서드 입니다.(.join은 살짝 어려운 개념이다!)

예시:

import threading
import time

def print_numbers():
    for i in range(1, 6):
        print(f"Thread 1: {i}")
        time.sleep(1)

def print_letters():
    for letter in ['A', 'B', 'C', 'D', 'E']:
        print(f"Thread 2: {letter}")
        time.sleep(1.3)

# 두 개의 스레드 생성
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

# 스레드 시작
thread1.start()
thread2.start()

# 두 스레드가 종료될 때까지 기다림
thread1.join()
thread2.join()

print("Both threads have finished")
먼저 실행시킬 함수들을 만듭니다. 먼저,
def print_number() -> 숫자 1부터 5까지 f"Thread 1: {i}" 형태로 출력이 됩니다.
하지만 저 time.sleep(1)이란건 뭐죠? 일단은 여기서 출력되는 시간을 1초 늘려준다는 개념만 잡고 있으면 됩니다.

마찬가지로 def print_letters도 만찬가지로 작동하게 됩니다.
그렇게 각각 스레드를 사용해 target(실행시킬 작업)을 각각 함수의 이름을 넣어, .start()를 이용해  각 함수를 동시에 실행을 시킵니다. 그렇게 나온 출력은 다음과 같습니다.
Thread 1: 1
Thread 2: A
Thread 1: 2
Thread 2: B
Thread 1: 3
Thread 2: C
Thread 1: 4
Thread 2: D
Thread 1: 5
Thread 2: E
Both threads have finished
하지만, 주의해야 할 점은 .sleep()를 통해 시간을 지연시킨다면, 완벽히 동시에 실행이 되기 때문에 출력을 할 때, 보기 안 좋게 나올 수 있으니 주의해야 합니다!

여기서 잠깐!

여기서 한번 다른 주제를 다뤄보도록 하죠.

첫번째로 아까 나왔던 time 모듈과, thread의 추가 속성에 대해 자세히 알아봅시다

파이썬에서는 정말, 정말 수 많은 모듈들이 있습니다. 진짜 너무 많습니다. 그것을 다 기억하기에는 솔직히 무리가 있죠. 그래서 솔직히 외운다기 보다는, 필요할때만 찾아서 쓰시는 것을 추천드립니다.
일단 time모듈에는 여러가지 클래스들이 있습니다.
time 모듈: 
sleep(seconds): 지정한 시간(초) 동안 프로그램을 멈춤.
time(): 현재 시간을 초 단위로 반환.
gmtime(): 현재 시간을 UTC 기준으로 구조체로 반환.
strftime(format): 날짜와 시간을 문자열 형식으로 반환.
대체적으로 이렇게 있습니다.

제가 썼던것은 바로 sleep(seconds)입니다. 
솔직히 그 밖에이도 자주 쓰이는 것들은
1. itertools 모듈
2. math 모듈
3. functools 모듈
4. random 모듈
5. statistics 모듈
6. collections 모듈
7. os 모듈
8. re 모듈
9. json 모듈
있습니다. 이것들 하나하나 다루기는 주제 밖이니 다음에 미루도록 합시다...

 

근데 왜 저 함수에는 return을 안 넣을까?

◈함수에 return을 주는것과 return을 주지 않는 것에는 큰 차이가 있습니다.

예시로 보여드리겠습니다.

def say_hello():
    print('Hi world!')
say_hello()

def add(a, b):
    return a + b
add1 = add(1,2)
print(add)

를 자세히 보면 무엇인가 다릅니다. 기억해야 할 점은 바로, 

값을 조절할 수 있다.

일 것 같습니다.

return을 이용하면 그 말대로 제가 가지고 놀 수 있는 장난감이 되는 겁니다. 마음대로 값을 추가하거나, 설정하거나, 지울 수 있게 되는것입니다.
하지만 그와 반대로 return이 없는것은 가게나 상점에 가면 볼 수 있는 '간판' 같은 것입니다. 딱 보여주기 용도로만 쓰이죠. 물론 값을 추가하거나, 바꿀 수 있습니다. 그렇게 된다면 저는 간판도 똑같이 바꿔야 하겠죠. 

제가 위에서 쓴 함수도 return을 안 준 이유가 바로, 값을 안에서 설정 후, 그대로 보여주기 때문입니다. 따로 넣어서 쓰임이 아니라, 딱 안에서 계산 후, 보여주기만 하는 식으로 작동하게 만들었기 때문입니다.

즉, return을 쓰임과 안 쓰임의 차이는 '함수의 목적에 따라 달린다'라고 할 수 있습니다.
그래도 모르니, return을 쓰지 않는 기준 4가지를 적겠습니다.
1. 값을 계산하거나 처리한 결과를 다른 곳에서 사용할 필요가 없을 때
-함수가 제 역할을 끝내고 그 결과를 다시 사용할 필요가 없을 때 반환할 값도 필요가 없습니다. 예를 들어 그저 메시지만 전달하는 함수면, 그 결과물을 다시 쓸 필요가 없기 때문에 return을 쓰지 않습니다.
2. 함수가 상태를 변경하는 경우
-스스로 파일을 업데이트 시키거나, 상태를 변경하는 행동을 할 때면, 안에 함수만 작동 시키고 별 다른 값을 반환하지 않을 때 return을 쓰지 않습니다.
3. 절차적 작업을 수행할 때
-스스로 절차적으로 업데이트나 작업을 수행하고 종료하면서, 그 결과값이 필요가 없을 때, return을 쓰지 않습니다. 예를 들어 프레임 업데이트를 하는데, 실질적으로 프레임 업데이트 값이 필요 없는것과 같습니다.
4.  메시지 전달만 할 때
-간단히 말해서 함수를 호출하면 메세지만 전달할 때, return을 쓰지 않아도 괜찮습니다. 위에 say_hello 함수 처럼 말입니다.

스레드의 추가 속성
아까 .join()은 약간 어려운 개념이라고 했는데, 지금 설명하겠습니다
사실 .join()은 안 써도 상관이 없습니다. .start()만 써도 잘 작동이 됩니다. 하지만 코드가 어려워지면 .join()을 써야합니다. 
대표적으로 '멀티스레딩'을 쓸 때 사용이 됩니다.
중요한점 ->
멀티스레딩은 그냥 스레드가 동시에 여러개 사용되는 것이 멀티스레딩이라 부릅니다.

게다가 스레딩에서 쓸 매개변수는 target만 있는것이 아닙니다. 주요 매개변수로는 5개가 있습니다.

 

1. target
기존에 쓰는 target입니다. 실행시킬 작업의 이름입니다.

2. args: target
함수에 받을 값을 정해놓고, 스레드 함수 안에 args를 튜플  형태로 전달합니다. args = (5,)(튜플 형태이기 때문에 끝에 ','추가)로 적으면 함수에 매개변수 a가 5를 인자로 받아 0~4의 숫자가 출력됩니다.
import threading

def print_numbers(a):  # 인자를 받아 숫자를 출력하는 함수 정의
    for b in range(a):  # n까지의 숫자를 출력
        print(b)

# 스레드에서 실행할 함수와 그 함수에 전달할 인자를 지정
thread = threading.Thread(target=print_numbers, args=(5,))  # args에 인자를 튜플 형태로 전달

thread.start()  # 스레드를 시작하여 print_numbers(5) 함수를 실행합니다.
thread.join()  # 메인 스레드가 print_numbers() 함수가 끝날 때까지 기다립니다.

print("스레드 종료")  # 메인 스레드가 종료되었음을 출력합니다.

3. kwargs: target
args:target과 비슷하게 작동하지만, args와 다르게 딕셔너리 형태로 전달해야 합니다. {} 그렇게 하면 'Kevin씨 반갑습니다! 어서오세요!'라고 출력이 됩니다.
참고로 args와 kwargs는 같이 인자로 받을 수 있습니다
예: def num_str(*args, **kwargs)

import threading  # threading 모듈을 가져옵니다.

def hi(name, greeting):  # 이름과 인사말을 받아 출력하는 함수 정의
    print(f'{name}씨 반갑습니다! {greeting}') # 인사 메시지를 출력

# 스레드에서 실행할 함수와 키워드 인자를 지정
thread = threading.Thread(target = hi, kwargs = {'name' : 'Kevin', 'greeting' : '어서오세요!'})#키워드 인자로 전달

thread.start()  # 스레드를 시작하여 greeting() 함수를 실행합니다.
thread.join()  # 메인 스레드가 greeting() 함수가 끝날 때까지 기다립니다.

print("스레드 종료")  # 메인 스레드가 종료되었음을 출력합니다.

4. name:
간단합니다! 그냥 스레드의 이름을 지정할 수 있습니다.
하지만!
threading.current_thread().name ->
threading -> 스레드 모듈
current_thread() -> 스레드 모듈 함수(인자로 받은 값을 스레드의 이름으로 반환합니다)
.name -> 반환된 스레드 이름을 나타내줍니다.

이렇게해서 받은 값은 current_thread()에서 받고, name으로 넘겨진다음, '난스레드'라는 이름으로 쓰이게 됩니다.
import threading

def thread_name():
    print(f"스레드 이름: {threading.current_thread().name}")

# 스레드의 이름을 지정
thread = threading.Thread(target=thread_name, name="난스레드")

thread.start()
thread.join()

print("스래드 종료")

5. daemon: 
데몬 스레드는 간단히 말해, 메인 스레드가 계속 실행되는 동안, 계속 실행되게 하는 스레드입니다. 즉, 메인 스레드가 멈추지 않으면 데몬 스레드도 멈추지 않습니다. 
메인 스레드의 카운터를 5초로 넣고,
daemon_thread.daemon = True를 하면 daemon_thread 스레드는 데몬으로 설정이 되면서,
메인 스레드가 5초 동안 동작할 때까지 멈추지 않습니다. 그렇기에
데몬 스레드 작동중1
데몬 스레드 작동중2
데몬 스레드 작동중3
데몬 스레드 작동중4
데몬 스레드 작동중5
메인 스레드가 종료됩니다.
라며 스레드가 종료됩니다.
import threading
import time

def daemon():
    while True:
        for i in range(1,6):
            print(f"데몬 스레드 작동중{i}")
            time.sleep(1)

# 데몬 스레드 생성
daemon_thread = threading.Thread(target=daemon)
daemon_thread.daemon = True  # 이 스레드를 데몬으로 설정

daemon_thread.start()  # 데몬 스레드 시작

# 메인 스레드에서 5초 동안 대기
time.sleep(5)
print("메인 스레드가 종료됩니다.")

스레드의 부모와 자식
아까 .join()은 약간 어려운 개념이라고 했는데, 그 이유가 여기 있습니다.
스레드에는 부모와 자식이 있습니다. 그리고 .join()은 그 자식 스레드가 끝날때 까지 '기다려주는' 역할을 합니다. 
예시를 통해 확인하겠습니다.
import threading

def child(*args, **kwargs):
    print(f'child 스레드 시작: 나이 : {args[0]}, 이름 = {kwargs['이름']}')

if __name__ == "__main__":
    print('부모 스레드 시작')
    parent = threading.Thread(target = child, args = (5,), kwargs = {'이름' : 'kevin'})
    parent.start()
    parent.join()
    print("부모 스레드 끝")
자 여기서 부모 스레드를 구별하는 방법은 바로 if __name__ == "__main__"입니다. 
지금은 편하게 부모를 알아보게 하는 코드라고 생각합시다. (참고로 if에 속한 저 코드는 고정이며, 수정될 수 없습니다)
그렇게 .join()을 사용하면 위에 자식 스레드가 실행이 종료될때까지 기다리는 역할을 합니다.

자...그러면

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

프로세스(Multiprocessing)
2. 멀티프로세싱(multiprocessing)
멀티프로세싱도 스레드와 비슷하게 작동합니다. 즉, 여러 개의 프로세스를 동시에 실행하여 작업을 병렬로 처리하는 기법입니다. 하지만. 중요한 부분이 있습니다. (이 개념은 이해가 매우 중요!)

스레드 -> 하나의 메모리 공간을 여러 스레드가 공유하면서 실행됩니다. 그렇기 때문에 입출력(I/O) 작업에 매우 효율이 좋습니다(예를 들어 파일 읽기/쓰기, 네트워크 요청 처리 같은 경우 여러 스레드로 나누어 처리하면 효율적입니다) 하지만 CPU를 많이 사용하는 작업에서는 성능 향상이 제한적입니다.
프로세스 -> CPU 코어를 병렬로 활용할 수 있어서, CPU 집약적인 작업에 적합합니다. 즉, 수학적 계산, 데이터 분석, 이미지 처리 같은 작업에서 성능 향상을 얻을 수 있습니다.

스레드 -> GIL(Global Interpreter Lock)이란 개념이 파이썬에 있어서 실질적으로 어쩔 수 없이 스레드는 한 번에 하나의 스레드만 실행이 됩니다.
프로세스 -> GIL(Global Interpreter Lock)의 제약이 없기 때문에 진정한 병렬 처리가 가능합니다.

추가로, 프로세스에서 쓸 수 있는 메서드가 2개 더 있습니다:

 

프로세스의 구조는 스레딩과 매우 흡사하며, args, kwargs, name등등 여러 매개변수도 똑같이 받을 수 있습니다.
하지만 프로세스은 스레드에서 부모를 알리는 if __name__ == "__main__":이 항상 붙습니다. 그 이유로는,

 

프로세스 무한생성 방지
-> main코드가 계속 실행이 되면서 무한 루프처럼 계속 될 수 있는데, 이것을 방지하고자 쓰입니다.
import multiprocessing

def 작업함수(이름):
    print(f"{이름} 작업 시작")

if __name__ == "__main__":
    # 프로세스 생성
    프로세스1 = multiprocessing.Process(target=작업함수, args=("프로세스1",))

    # 프로세스 시작
    프로세스1.start()

    # 프로세스가 끝날 때까지 대기
    프로세스1.join()

    print("모든 작업 완료")

프로세스의 기본 구조입니다. 스레드와 매우 비슷하지만 코드가 복잡해지면 질수록, 달라집니다.


멀티프로세싱의 클래스들
1. process 클래스
간단한 thread와 비슷하다고 생각하시면 됩니다. 기본 구조와 비슷한 형태를 띕니다.
참고로 process 클래스에서 쓸 수 있는 함수가 2개 더 있습니다.
is_alive(): 프로세스가 아직 실행 중인지 확인합니다.
terminate(): 프로세스를 강제로 종료합니다.

 

from multiprocessing import Process
def working():
    print('working')

if __name__ == '__main__':
    processing = Process(target = working)
    processing.start()
    processing.join()

2. Pool 클래스
Pool 클래스는 약간 어려울 수 있습니다. Pool은 with as 구문을 통해 실행시킬 수 있는 데이터를 정할 수 있습니다.

받은 인자를 제곱을 하는 함수를 만들었습니다. 그리고 with Pool(6) as p:가 있는데, 저기 저 6은 바로 한번에 실행시킬 코드의 갯수를 말합니다.

밑에 map함수를 사용해 각 리스트에 있는 값들을 전부 제곱이라는 함수에 적용을 시키는데, 그 6개를 전부 한번에 돌리라는 뜻이 되겠습니다.

하지만!!!! 사실 저기 Pool()에서 4를 넣어도 작동은 됩니다. 
프로세스는 6개인데, 4개를 어떻게 돌리냐면, 먼저 숫자 1,2,3,4의 프로세스를 한번에 돌린 다음, 다시 나머지 2개를 한번에 돌립니다. 즉, 먼저 4개를 돌리고 2개를 돌려 총 2번을 돌린다는 뜻입니다(여기에는 다음에 배울 Queue 클래스와 관련이 있습니다!).

참고로 Pool 클래스에서 apply()함수를 쓸 수 있습니다!
from multiprocessing import Pool

def 제곱(a):
    return a * a

if __name__ == "__main__":
    with Pool(6) as p:
        결과 = p.map(제곱, [1, 2, 3, 4, 5, 6])
    print(결과)

3. Queue 클래스
Queue 클래스는 프로세스 간에 데이터를 주고받을 때 사용하는 FIFO(선입선출) 큐입니다.

Queue 클래스에서 쓰이는 함수는 3가지가 있습니다
put(item): 큐에 데이터를 넣습니다.
get(): 큐에서 데이터를 꺼냅니다.
empty(): 큐가 비어 있는지 확인합니다.

방금 Pool 클래스에서 Queue 클래스와 관련이 있다고 했는데, 그 이유는 바로 Queue는 멀티프로세싱간에 자동으로 관섭을 주어서 Pool 클래스에서 쓰이는 데이터가 비었는지 확인하고, 자동으로 다시 실행시켜 주는 역할을 하기 때문입니다.

즉, Queue 클래스를 쓰지 않았어도 Pool 함수에서 자동으로 4개의 프로세스를 끝내고 Queue가 데이터 2개를 남은것을 확인, 및 Pool 클래스를 다시 가동시킨 것이라 생각하면 됩니다!

여기서 .put은 특정 클래스만 사용할 수 있는(예: Queue) 메서드로, 값을 추가한다는 뜻을 가지고 있습니다. 클래스의 append라 생각하면 됩니다.

밑에 코드를 따라가면, Queue 클래스를 담은 큐가 그대로 args에 전달되면서, 그 전달 값이 작업 함수에 들어가면서,
Queue.put('작업 완료') 코드가 완성됩니다. 그러면 '작업 완료' 텍스트가 Queue로 들어가면서, 자연스럽게 '작업 완료'라고 출력이 됩니다.

 

from multiprocessing import Process, Queue

def 작업(queue):
    queue.put("작업 완료")

if __name__ == "__main__":
    큐 = Queue()
    프로세스 = Process(target=작업, args=(큐,))
    프로세스.start()
    프로세스.join()

    print(큐.get())  # 큐에서 데이터를 가져옵니다
근데 솔직히 이번 Queue의 예시는 Queue 자체를 설명하지 않는 것 같아 새롭게 가져왔습니다.
from multiprocessing import Process, Queue
import time
import random

# 생산자 함수
def 생산자(큐):
    for i in range(1, 5):  # 5개의 아이템 생산
        아이템 = f"아이템 {i}"
        print(f"생산자: {아이템} 생성")
        큐.put(아이템)  # 큐에 아이템 추가
        time.sleep(random.random())  # 랜덤 시간 대기

# 소비자 함수
def 소비자(큐):
    while True:
        try:
            아이템 = 큐.get(timeout=1)  # 큐에서 아이템 가져오기
            print(f"소비자: {아이템} 소비")
        except Exception:
            break  # 큐가 비어있으면 종료

if __name__ == "__main__":
    큐 = Queue()  # 큐 생성

    # 생산자 프로세스 생성
    생산자_프로세스 = Process(target=생산자, args=(큐,))
    # 소비자 프로세스 생성
    소비자_프로세스 = Process(target=소비자, args=(큐,))

    생산자_프로세스.start()  # 생산자 프로세스 시작
    소비자_프로세스.start()  # 소비자 프로세스 시작

    생산자_프로세스.join()  # 생산자 프로세스가 끝날 때까지 기다림
    소비자_프로세스.join()  # 소비자 프로세스가 끝날 때까지 기다림

 

자, 여기서는 조금 복잡해졌지만, Queue 클래스의 사용법을 바로 익히실 수 있습니다.
먼저 생산자란 함수를 만들고, 큐라는 매개변수를 만듭니다.
그리고 안에는 for문을 써서 range (1,5) 즉, 1부터 4까지 숫자를 받고, 그 숫자 그대로 아이템{i}를 출력하게 만듭니다. 그리고 그 아이템{i}를 다시 큐라는 매개변수에 넣어줍니다.

다시 print를 써서 아이템에 맞게 출력하는 코드를 만들고 time.sleep를 써서 한 문장 띄울때마다 대기 시간을 넣어줍니다(이건 자유)
자, 여기서 이제 Queue 클래스의 역할 확인할 수 있습니다. 바로 아이탬{i}가 들어있는 매개변수 그대로 
FIFO(선입선출)하여 바로 다음 생산자 클래스에 넣어주는 겁니다.

여기서 주의할 점은 바로 '선입선출'이라는 개념입니다. 원래라면 생산자를 다 돌리고 난 후, 소비자가 돌아가야 하는데, 아이템1이라는 값이 만들어 지자마자, 바로 소비자 함수로 들어가는 것입니다. 선입(아이템1) 선출(아이템1) 한다는 뜻입니다.

그렇게하면,
생산자: 아이템 1 생성
생산자: 아이템 2 생성
생산자: 아이템 3 생성
생산자: 아이템 4 생성
소비자: 아이템 1 소비
소비자: 아이템 2 소비
소비자: 아이템 3 소비
소비자: 아이템 4 소비
가 아니라,
생산자: 아이템 1 생성
소비자: 아이템 1 소비
생산자: 아이템 2 생성
소비자: 아이템 2 소비
생산자: 아이템 3 생성
소비자: 아이템 3 소비
생산자: 아이템 4 생성
소비자: 아이템 4 소비
이런식으로 번갈아가면서 값을 출력하는 겁니다.
주의할 점은 소비자는 언제나 값을 받을 준비가 되어있어야 하기 때문에, while True를 써서 무한적으로 실행시키도록 되어있습니다.
이제 감이 좀 오시나요?

아쉽게도 Pipe 클래스, Lock 클래스, Value 클래스, Array 클래는 내일 계속 이어가도록 하겠습니다.......(시간 문제!)