Today I learned
Key Features of this PR
- 멀티 클라이언트 처리: 여러 클라이언트를 동시에 처리하는 서버 구현
- PING/PONG 응답: 클라이언트의 PING 명령에 대한 PONG 응답 처리
궁금한 부분 정리
- Tokio를 사용하여 Redis 비동기 프로그래밍 및 스레드 관리 →
spawn이 뭐지? - TCP listener, stream 차이
let mut buffer = [0; 1024];- 이 코드는 정확히 무슨 의미인지?
- 어떤 기능을 수행하는 것인지?
- mut 이라 선언한 이유?
주요 개념
- 비동기 프로그래밍:
async와await키워드를 사용하여 비동기 작업을 정의하고 실행 - Tokio 런타임: 비동기 코드를 실행하기 위한 런타임 환경을 제공
- TCP 통신:
TcpListener와TcpStream을 사용하여 TCP 연결을 수신하고 처리 - 멀티태스킹:
tokio::spawn을 사용하여 각 클라이언트 연결을 별도의 비동기 작업으로 처리
1. What is Tokio?
Tokio 개요
- Tokio: Rust 언어를 위한 비동기 런타임
- 주요 구성 요소:
- 멀티스레드 런타임: 비동기 코드를 실행하기 위한 멀티스레드 환경 제공하여 여러 코어를 활용할 수 있음
- 비동기 표준 라이브러리: Rust 표준 라이브러리의 비동기 버전 제공
- 다양한 라이브러리 생태계: 비동기 프로그래밍을 쉽게 하기 위한 다양한 유틸리티 제공
NOTE: I/O 바운드 vs CPU 바운드 작업:
- Tokio는 주로 I/O 바운드 애플리케이션에 적합
- CPU 바운드 작업의 병렬 처리에는 Rayon 라이브러리가 더 적합
2. 비동기 프로그래밍의 기본 개념
- async/await: 비동기 함수 정의 및 호출을 위한 키워드
- Future: 비동기 작업의 결과를 나타내는 타입
- Runtime: 비동기 코드를 실행하는 환경
- 동시성 (Concurrency):
- Tokio를 사용하면 많은 연결을 동시에 처리할 수 있음
- 스케일링 (Scaling):
- 비동기/await 기능을 통해 동시 작업의 수를 늘리는 것이 효율적임
3. Tokio의 주요 구성 요소
- Spawning: 새로운 비동기 작업을 생성하여 실행
- Channels: 비동기 작업 간의 통신을 위한 채널
- I/O: 비동기 입출력 처리
- Framing: 데이터 스트림을 프레임으로 나누어 처리
- Streams: 비동기 데이터 스트림 처리를 지원
- 이벤트 루프 (Event Loop): 명시적으로 언급되지는 않았지만, 비동기 런타임의 핵심 개념
4. Rust와 Tokio를 사용한 비동기 서버
- TcpListener: TCP 연결을 수신하는 리스너
- TcpStream: TCP 연결 스트림
- tokio::spawn: 새로운 비동기 작업을 생성하여 실행
5. TCP listener 와 TCP stream 의 차이
- TcpListener:
- 서버가 시작될 때 특정 포트에 바인딩하여 생성
- 클라이언트의 연결 요청을 수신하고 수락하는 역할
- 특정 IP와 포트에 바인딩되어 클라이언트의 연결을 대기
- TcpStream:
- TcpListener가 클라이언트의 연결을 수락할 때 생성
- 클라이언트와 서버 간의 데이터 통신을 처리하는 역할
- 클라이언트와 서버 간의 데이터를 읽고 쓰는 작업 수행
Code
use std::{
io::{Read, Write},
net::{TcpListener, TcpStream},
};
#[tokio::main]
async fn main() {
println!("Logs from your program will appear here!");
let listener = TcpListener::bind("127.0.0.1:6379").expect("Failed to bind to port 6379");
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().unwrap();
// A new task is spawned for each inbound socket.
// The socket is moved to the new task and processed there.
tokio::spawn(async move { handle_connection_process(socket).await });
}
}
pub async fn handle_connection_process(stream: TcpStream) {
println!("accepted new connection");
let mut tcp_stream = stream;
let response = "+PONG\r\n";
let mut buffer = [0; 1024];
loop {
match tcp_stream.read(&mut buffer) {
Ok(0) => break,
Ok(_) => {
tcp_stream
.write_all(response.as_bytes())
.expect("Failed to write to stream");
}
Err(e) => {
println!("multiple Ping response error: {}", e);
break;
}
}
}
println!("read from stream: {}", String::from_utf8_lossy(&buffer));
}
코드 설명
Imports:
std::io::{Read, Write}: TCP 스트림에서 읽고 쓰기 위한 표준 라이브러리std::net::{TcpListener, TcpStream}: TCP 연결을 수신하고 처리하기 위한 표준 라이브러리
main 함수:
#[tokio::main]: 이 속성은 main 함수를 비동기 함수로 만듬. Tokio 런타임이 이 함수를 실행TcpListener::bind("127.0.0.1:6379"): 로컬 호스트의 6379 포트에 바인딩하여 TCP 연결을 수신loop: 무한 루프를 통해 계속해서 새로운 연결을 수신함listener.accept(): 새로운 TCP 연결을 수락tokio::spawn(async move { ... }): 새로운 비동기 작업을 생성하여 각 연결 처리
handle_connection_process 함수:
TcpStream을 인자로 받아 클라이언트와의 통신을 처리let response = "+PONG\r\n";: 클라이언트에 보낼 PONG 응답을 정의let mut buffer = [0; 1024];: 데이터를 읽기 위한 버퍼를 정의loop: 무한 루프를 통해 클라이언트로부터 데이터를 읽고 응답을 보냄tcp_stream.read(&mut buffer): 클라이언트로부터 데이터를 읽음tcp_stream.write_all(response.as_bytes()): 클라이언트에 PONG 응답을 보냄Err(e): 읽기 또는 쓰기 오류가 발생하면 로그에 출력하고 루프를 종료
stream mut를 명시적으로 선언한 이유 (24 line)
- TcpStream 객체를 변경할 수 있도록 하기 위함.
- Rust에서는 기본적으로 변수는 불변(immutable) 임. 따라서 변수의 값을 변경하려면 mut 키워드를 사용하여 가변(mutable) 변수로 선언해야 함
- stream을 가변 변수로 선언하지 않으면, read와 write_all 메서드를 호출할 때 컴파일 오류가 발생함
- Rust의 엄격한 불변성 규칙 덕분에 데이터 경합(race condition)과 같은 문제를 방지할 수 있음
NOTE: mut 키워드의 필요성
- 변경 가능한 스트림:
- TcpStream 객체는 클라이언트와의 통신을 위해 데이터를 읽고 쓰는 작업을 수행
- 이러한 작업은 스트림의 내부 상태를 변경할 수 있음
→ 예를 들어, read 메서드는 스트림의 현재 읽기 위치를 변경하고, write_all 메서드는 스트림에 데이터를 작성
- 가변 변수 선언: mut 키워드를 사용하여 stream을 가변 변수로 선언함으로써, 이 스트림 객체를 통해 데이터를 읽고 쓸 수 있다.
stream을 가변 변수로 선언하지 않으면, read와 write_all 메서드를 호출할 때 컴파일 오류가 발생
Rust의 엄격한 불변성 규칙 덕분에 데이터 경합(race condition)과 같은 문제를 방지할 수 있음
Buffer 의 역할?
- 버퍼(Buffer):
- 데이터를 임시로 저장하는 메모리 공간
- 네트워크 통신에서 데이터를 읽거나 쓸 때, 데이터를 일시적으로 저장하는 데 사용됨
버퍼의 활용
- 데이터 읽기: stream.read(&mut buffer): 클라이언트로부터 데이터를 읽어와 buffer에 저장
- 데이터 처리: 읽어온 데이터를 buffer에서 꺼내어 처리합니다. 예를 들어, PING 명령을 확인하고 PONG 응답을 보냄
- 데이터 쓰기: stream.write_all(response.as_bytes()): 처리된 데이터를 클라이언트로 다시 보냄
스레드 관리의 주요 개념
- 스레드(Thread):
- 정의: 프로세스 내에서 실행되는 가장 작은 단위. 하나의 프로세스는 여러 스레드를 가질 수 있다.
- 특징: 같은 프로세스 내의 스레드는 메모리와 자원을 공유함.
- 프로세스와 스레드는 메모리 공유 영역에서 보자면 스레드는 각자 스택 영역을 따로 갖고 있지만, 코드 영역과 힙 영역은 공유한다.
- 프로세스는 모두 따로 가지고 있음
- 이런 차이점이 있어서 다른 프로세스 간 context switching time은 스레드끼리의 context switching time 보다 더 걸림.
스레드에서 스택 영역을 제외한 메모리를 공유한다는 특성 때문에, 동시성 이슈를 주의해야함.
예를 들어 1번 스레드가 malloc한 메모리 영역을 2번 스레드가 비정상적으로 read 또는 write 할 수 있음.
멀티스레딩(Multithreading):
- 정의: 하나의 프로세스 내에서 여러 스레드를 동시에 실행하는 기술
- 장점: 응답성 향상, 자원 공유, 병렬 처리 가능
- 단점: 동기화 문제, 데드락, 레이스 컨디션 등
스레드 풀(Thread Pool):
- 정의: 미리 생성된 스레드의 집합. 작업이 들어올 때마다 스레드를 할당하여 작업을 처리
- 장점: 스레드 생성/소멸의 오버헤드 감소, 자원 효율적 사용
- 구성 요소: 작업 큐, 스레드 관리기
동기화(Synchronization):
- 정의: 여러 스레드가 공유 자원에 접근할 때 일관성을 유지하기 위한 방법
- 기법: 뮤텍스(Mutex), 세마포어(Semaphore), 모니터(Monitor), 조건 변수(Condition Variable)
데드락(Deadlock):
- 정의: 두 개 이상의 스레드가 서로 자원을 기다리며 무한 대기 상태에 빠지는 현상
- 해결 방법: 자원 할당 순서 지정, 타임아웃 설정, 교착 상태 회피 알고리즘
레이스 컨디션(Race Condition):
- 정의: 두 개 이상의 스레드가 공유 자원에 동시에 접근할 때 발생하는 예기치 않은 결과
- 해결 방법: 적절한 동기화 기법 사용
스레드 관리의 중요한 부분
스레드 생성과 소멸:
- 스레드 생성은 비용이 많이 들기 때문에, 스레드 풀을 사용하여 효율적으로 관리
- 스레드 소멸 시 자원 해제와 정리 작업 필요
작업 분배:
- 작업 큐를 사용하여 스레드에 작업을 분배
- 작업의 우선순위와 스케줄링 고려
동기화와 상호 배제:
- 공유 자원에 대한 접근을 동기화하여 데이터 일관성 유지
- 뮤텍스와 세마포어를 사용하여 상호 배제 구현
- Mutex와 semaphore의 차이
Mutex는 스레드 간 단 하나의 스레드가 특정 메모리 영역을 소유해 read/write를 해야 할 경우에 사용됨semaphore는 특정 개수의 스레드가 메모리에 접근해도 허용됨
semaphore의 count가 1이라고 해도 세부 동작에선 차이가 있는데, semaphore의 경우 다른 스레드에서 접근 요청을 할 경우에도 가능하다는 차이점이 있음
에러 처리:
- 스레드 내에서 발생하는 예외와 오류를 적절히 처리
- 스레드 풀에서 스레드가 비정상 종료될 경우 재생성
성능 최적화:
- 스레드 수와 작업 큐의 크기를 조절하여 성능 최적화
- 병목 현상과 과도한 컨텍스트 스위칭 방지
Async/Sync & Blocking/non-Blocking
I/O 바운드 작업(파일 읽기/쓰기, 네트워크 통신 등)에서 중요!
CPU 바운드 작업의 경우 비동기 처리의 이점이 크지 않을 수 있음 (CPU 바운드 작업 개념 관련해서 나중에 찾아 보기)
- Synchronous(동기) vs Asynchronous(비동기)
- Synchronous(동기): 작업이 순차적으로 실행되며, 한 작업이 완료될 때까지 다음 작업이 기다림
- Asynchronous(비동기): 여러 작업이 동시에 실행될 수 있으며, 한 작업의 완료를 기다리지 않고 다른 작업도 동시에 실행됨
- Blocking vs Non-blocking
- Blocking: 호출된 함수가 자신의 작업을 모두 마칠 때까지 제어권을 가지고 있어, 호출한 함수가 대기
- Non-blocking: 호출된 함수가 바로 리턴하여 호출한 함수에게 제어권을 넘겨주고, 호출한 함수가 다른 일을 할 수 있게 함
이 개념들을 조합하면 다음과 같은 4가지 경우가 나올 수 있음:
- Synchronous Blocking (동기 블로킹):
파일을 읽고 그 내용을 화면에 출력하는 프로그램이 대표적인 예시
파일을 다 읽을 때까지 프로그램은 다른 작업을 하지 않고 대기
content = file.read() # 파일을 다 읽을 때까지 기다림
print(content) # 읽기가 완료된 후 출력
- Synchronous Non-blocking (동기 논블로킹):
이 경우는 실제로 흔하지 않음
파일을 읽는 작업을 시작하고 즉시 반환하지만, 계속해서 완료 여부를 확인하는 경우로 보면 됨
while True:
try:
content = file.read(1024) # 일부만 읽고 즉시 반환
if not content:
break
print(content)
except WouldBlockError:
continue # 읽을 수 없으면 다시 시도
- Asynchronous Blocking (비동기 블로킹):
여러 작업을 동시에 시작할 수 있지만, 각 작업이 완료될 때까지 블로킹됨
멀티스레딩 환경에서 볼 수 있음
def read_file(file):
content = file.read() # 이 작업은 블로킹됨
print(content)
thread1 = Thread(target=read_file, args=(file1,))
thread2 = Thread(target=read_file, args=(file2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
- Asynchronous Non-blocking (비동기 논블로킹):
가장 효율적인 방식으로, 여러 작업을 동시에 시작하고 각 작업이 완료될 때 콜백을 통해 결과를 처리함
Node.js나 Python의 asyncio가 이 방식을 사용함
async def read_file(file):
content = await file.read() # 비동기적으로 파일 읽기
print(content)
async def main():
task1 = asyncio.create_task(read_file(file1))
task2 = asyncio.create_task(read_file(file2))
await asyncio.gather(task1, task2)
asyncio.run(main())
Plus: Node.js 는 싱글 스레드인가 멀티 스레드인가?
nodeJS는 비동기(Asynchronous)실행 기반의 특수한 멀티 스레드(Multi-thread)모델
Node.js 내부에서의 비동기 실행 구현 방법 중 한 가지(File I/O 파일 입출력 작업에 관해서)
1) 메인 스레드는 빠르게 처리할 수 있는 작업들을 집중해서 ‘혼자' 처리하고,
2) 파일 읽기와 같이 시간이 오래 걸리는 작업은 다른 스레드에 맡긴다
- 자바스크립트는 단일 스레드 언어로 분류된다
- 자바스크립트는 여러 개의 태스크 큐를 갖고 있다
- 자바스크립트는 한 번에 하나의 태스크만 실행한다
- 자바스크립트는 내부적으로 태스크의 우선 순위를 설정한다
- 자바스크립트는 단일 스레드를 갖고 있어 한 번에 하나의 작업만 실행할 수 있는 언어이고 내부적으로는 3–4 개의(혹은 조금 더 많을 수는 있다) 태스크 큐를 갖고 있다.
프로미스를 제대로 알고 사용하기. 많은 개발자가 프로미스를 왜 사용해야하는지조차 모르고 사용하고 있다 | by Moon | 오늘의 프로그래밍 | Medium
References
write tcp 127.0.0.1:48008->127.0.0.1:6379: write: broken pipehttps://dev.to/mstryoda/golang-what-is-broken-pipe-error-tcp-http-connections-and-pools-4699
rust Arc
Related to #3 issue section
오개념이 있는지 추후 재검토 예정, last updated at 2024/07/26
'재밌는 개발글이지만 아직 미분류' 카테고리의 다른 글
| [2024-06-12] chapter 1.0 ~ 2.4 / keyword: integer type, NO Truthy/Falsy concept in Rust, panic! & Error with ROP (2) | 2025.06.12 |
|---|---|
| [Staging] expiry (4) (0) | 2025.06.12 |
| [Staging] Respond to multiple PINGs (2) (0) | 2025.06.12 |
| [Staging] Bind to port 6379 and respond to "PING" with "PONG" by RESP (1) (2) | 2025.06.12 |
| 주니어 개발자 3년차의 2025 회고 - 불확실함 속에서 찾아가는 나만의 길 (1) | 2025.05.25 |