[Rust] Bài tóm Producer/Consumer và cách tạo một queue an toàn (không phải not async)
Bài toán producer
và consumer
là một bài toán quen thuộc và thường được sử dụng trong nhiều chương trình. Trong Rust khi nghĩ đến điều này khái niệm channel trong thư viện tokio
và sử dụng tokio::mspc
để thực hiện như một channel
queue
. Producer
sử dụng hàm send
để gửi một giá trị vào channel
này và consumer dùng hàm recv
để lấy ra.
Nhưng bây giờ, giả sử vì lý do gì đó chúng ta không thể sử dụng mspc
này và phải tự viết một đoạn mã với cơ chế giống như thế. Nghĩa là chúng ta sẽ phải đẩy dữ liệu vào queue từ producer
và lấy ra bởi consumer
. Liệu cách làm nên là như nào ?
Bạn sẽ nghĩ rằng việc này có gì đâu ? Cứ đẩy vào rồi lấy rồi check choắc kích thước queue thôi mà ? Chúng ta sẽ cùng phân tích
Đây là một trong năm bài bài toán mình làm khi đầu vào Elcom
ngày xưa nhưng viết trên C++.Khi học sang Rust, mình đoán nó sẽ cơ chế tương tự.
Phân tích mpsc
Nó được gọi là multiple producer single consumer
nghĩa là nhiều producer và chỉ một consumer. Đoạn mã thông thường của mpsc
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
- Tạo một kênh nhờ hàm
mpsc::channel()
- Producer gọi hàm
tx.send()
để gửi dữ liệu vàochannel
- Consumer gọi hàm
rx.recv()
để lấy dữ liệu ra. Trong trường hợp không có dữ liệu nào trong channel, hàm này sẽ blocking. Nhưng làm sao nó có thểblock
như vậy được? Phải chăng một vòngloop
vô hạn trong đó để kiểm traqueue
có dữ liệu không?
Mô tả bài toán?
- Kích thước lớn nhất của
queue
là1000
messages. Nghĩa là khi queue đang chứa1000
messages, chúng ta sẽ không thể đẩy thêm một message nào được nữa và phải chờ cho consumer tiêu thụ nó. Khi queue là999
, chúng ta lại có thể đẩy dữ liệu vàoqueue
được. - Producer gọi hàm
enqueue
để gửi dữ liệu vào channel - Consumer gọi hàm
dequeue
để lấy dữ liệu ra. Trong trường hợp không có dữ liệu nào trongqueue
, hàm này sẽ blocking và chờ cho đến khi có dữ liệu đẩy vào. Trong trường hợp ngược lại, khi queue đãđầy
chúng ta phải chờ để consumer tiêu thụ mới có thể đẩy vào tiếp - Chương trình là sync (không sử dụng async), không sử dụng
mspc
vàruntime tokio
hoặc runtime nào khác. Queue
phải an toàn và không có tranh chấp.- Có thể nhiều
producers
cùng đẩy vào và nhiềuconsumers
cùng lấy ra đồng thời.
Hướng tiếp cận 1
Việc đẩy dữ liệu vào một channel
có thể ko phải là vấn đề, nhưng việc lấy dữ liệu ra không phải lúc nào cũng là điều khiển thành công. Ý tưởng đầu tiên để làm là thực hiện một vòng lặp trong Consumer
để kiểm tra len
.
Hướng đầu tiên này chúng ta sử dụng 2 tasks
. Producer task
sẽ tìm cách đẩy dữ liệu vào một VecDeque<u32>
. Ở đầu kia Consumer
task sẽ là một vòng lặp vô hạn để lấy dữ liệu ra.
Nó sẽ kiểm tra VecDeque<u32>
có dữ liệu hay không và tiến hành lấy ra. Trước khi kiểm tra, nó nên phải acquire
(lấy được ) một lock
để tránh xung đột.
- Khởi tạo một
native producer
. Nó nên chứa mộtqueue
được chia sẻ chung ở tất cả cácproducers
cũng nhưconsumers
struct NativeProducer {
idx: u64,
queue: Arc<Mutex<VecDeque<u64>>>,
}
- Tạo một
native consumer
struct NativeConsumer {
idx: u64,
queue: Arc<Mutex<VecDeque<u64>>>,
}
- Tạo hàm enqueue trong
producer
const QUEUE_MAX_SIZE = 1000;
impl NativeProducer {
fn new(queue: Arc<Mutex<VecDeque<u64>>>, idx: u64) -> Self {
Self {
queue: queue,
idx: idx,
}
}
pub fn enqueue(&mut self, new_item: u64) {
let mut guard = self.queue.lock().unwrap();
if guard.len() < QUEUE_MAX_SIZE {
guard.push_back(new_item)
}
}
}
Hàm enqueue
sẽ gọi hàm lock()
và sau khi chiếm được một lock
nó sẽ có thể gọi hàm để đẩy một số u64
vào queue
. Trước đó chúng ta
cần kiểm tra điều kiện độ dài của queue
đã đạt giá trị max
phần tử hay chưa
- Tạo hàm
dequeue
choconsumer
impl NativeConsumer {
fn new(queue: Arc<Mutex<VecDeque<u64>>>, idx: u64) -> Self {
Self {
queue: queue,
idx: idx,
}
}
fn dequeue(&self) -> Option<u64> {
let mut guard = self.queue.lock().unwrap();
match guard.len() {
0 => None,
_ => guard.pop_front(),
}
}
}
- Sử dụng nó
pub async fn run_native_producer_consumer() {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let mut tasks = vec![];
for i in 0..10 {
let consumer = NativeConsumer::new(queue.clone(), i);
let c = std::thread::spawn(move || {
loop {
let item = consumer.dequeue();
if let Some(value) = item {
println!("Consumer {i} consume value:{}", value);
}
// println!("Nothing");
}
});
tasks.push(c);
}
for i in 0..10 {
let mut p = NativeProducer::new(queue.clone(), i);
let t = std::thread::spawn(move || loop {
let item = rand::random::<u64>();
println!("Producer {i} produce value:{}", item);
p.enqueue(item);
std::thread::sleep(time::Duration::from_millis(2000));
});
tasks.push(t);
}
for t in tasks {
t.join().unwrap();
}
}
Kết quả chương trinh
Consumer 3 consume value:2505253166158001706
Producer 2 produce value:2962396143706016556
Consumer 1 consume value:2962396143706016556
Producer 9 produce value:9214376316464147140
Consumer 0 consume value:9214376316464147140
Producer 1 produce value:18436924389842035605
Consumer 2 consume value:18436924389842035605
Producer 7 produce value:16348692822225013178
Consumer 0 consume value:16348692822225013178
Producer 3 produce value:2950566369127932473
Consumer 8 consume value:2950566369127932473
Producer 4 produce value:11596635837023427331
Consumer 9 consume value:11596635837023427331
Producer 0 produce value:8424785294514102202
Consumer 8 consume value:8424785294514102202
Producer 5 produce value:8509136791119345892
Producer 6 produce value:15839379854569099
Consumer 0 consume value:15839379854569099
...
Phân tích
consumer
bị động hơn so vớiproducer
bởi chúng sẽ không biết lúc nào phải lấy dữ liệu. Trong khi điều khiển đẩy dữ liệu vào khá chủ động và lúc nào có thể thành công.- Điểm yếu của hướng tiếp cận này là vòng lặp vô hạn trong
consumer
sẽ ảnh hưởng đếnCPU
rất lớn. Khi không có gì trongqueue
chúng ta vẫn đang phải tốn rất nhiều công để phải đi kiểm tra xem trongqueue
có gì không để mà đi lấy. Nếu bán thử thêm dòngprintln!("None");
vào cuối cuả vòngloop
trongconsumer
bạn sẽ nhận thấy nó in ra liên tục vào điều này. - Một cách
workaround
để giảm CPU là bạn chỉ cần cho cho sleepstd::thread::sleep(time::Duration::from_millis(100))
; cũng sẽ giảmCPU
khá nhiều. Nhưng tự nhiên cách làm này có lẽ không phải là cách làm đúng.
Hướng tiếp cận 2: Khuyến cáo
Bạn sẽ có ý tưởng như sau:
- Có cách nào khi mà
producer
đẩy thêm 1 dữ liệu vàoqueue
chúng nên báo choconsumer
biết và nó nên lấy ra ngay. Hoặc khi màqueue
đãđầy
producer
cũng nên được nghỉ ngơi và khi có mộtconsumer
nào đó tiêu thụ message, nó nên được báo từconsumer
để tiếp tục đẩy vàoqueue
- Có cách nào để
đợi
mà không tốnCPU
không.
Sử dụng Condvar
trong thư viện std::sync
của Rust
Đây là giới thiệu về nó
Condition variables represent the ability to block a thread such that it consumes no CPU time while waiting for an event to occur. Condition variables are typically associated with a boolean predicate (a condition) and a mutex. The predicate is always verified inside of the mutex before determining that a thread must block.
Ví dụ
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
Đặc tích
-
Nó sử dụng hàm
wait
để đợi. Nó luôn liên kết với một biến điều kiện boolean và mộtMutex
.Chú ý luôn luôn nhé. nguyên tắc này mình học từC++
. -
Điều kiện này sẽ được
wake up
trong trường hợp có một nơi nào đó gọinotify_one()
hoặcnotify_all()
Áp dụng
- Tạo một
SafeQueue
struct SafeQueue {
items: Mutex<VecDeque<u32>>,
not_full: Condvar,
not_empty: Condvar,
}
Chúng ta khởi tạo 2 biến not_full
và not_empty
. not_full
nghĩa là khi. một consumer
vừa consume
xong thì chắc chắn trong queue
sẽ là not_full
và tương tự với not_empty
- Tạo một hàm
enqueue
fn enqueue(queue: Arc<SafeQueue>, thread_id: u64) {
let mut guard = queue.items.lock().unwrap();
while guard.len() == MAX {
// Sau khi chiếm được quay lại kiểm tra queue thấy đầy thì phải chờ
// Đợi cho queue khoong còn bị đầy nữa
// Hàm này sẽ phải truyền vào một mutex, khi hàm này block nó sẽ tự động nhả lock
guard = queue.not_full.wait(guard).unwrap();
}
let num = rand::thread_rng().gen_range(0..100);
println!("Producer {thread_id} push: {num}");
guard.push_back(num);
queue.not_empty.notify_all();
}
Hàm này nhận vào 2 tham số là queue
và thread_id
. Nó cũng sẽ gọi hàm lock()
trước khi vào phần critical section
.
Đoạn logic quan trọng ở đây là cần kiểm queue
có bị đầy hay không. Nếu như đang full
chúng ta sẽ đợi cho nó not full
.
Bằng cách sử dụng guard = queue.not_full.wait(guard).unwrap();
. Bạn sẽ thắc mắc là nếu việt như này thì đoạn nó nhả lock ra ở đâu. Condvar
có cơ chế để nó nhả lock
ra khi điều kiện chưa thỏa mãn. Trong trường hợp đã nhận điều kiên not_full
và kiểm tra lại một lần nữa là queue
là không đầy
chúng sẽ được quyền đẩy message vào queue thông qua guard.push(num)
. Sau khi đẩy vào, chúng ta nên báo cho consumer
là hãy tiêu thụ đi. notify_all()
để báo cho tất cả consumer
thay vì chỉ một.
- Tạo một hàm
dequeue
fn dequeue(queue: Arc<SafeQueue>, thread_id: u64) {
loop {
let mut guard = queue.items.lock().unwrap();
while guard.len() == 0_usize {
// Chúng ta đang không có gì trong queue, nên phải chờ
//
guard = queue.not_empty.wait(guard).unwrap();
}
let value = guard.pop_front().unwrap();
println!("Consumer {thread_id} consumes value:{value}");
queue.not_full.notify_all();
}
}
Cơ chế cũng tương tự như enqueue
. Chúng ta lại đợi điều kiện not_empty
và khi nhận được thông tin rằng queue là có một message được đẩy vào rồi và không còn rỗng nữa, chúng ta sẽ gọi hàm pop
để lấy ra. Sau khi lấy ra thì queue
sẽ chắc chắn không đầy nữa nên chúng ta có thể notify
là not_full
- Sử dụng
pub fn run_with_cond_var() {
let queue = Arc::new(SafeQueue::new());
let mut tasks = vec![];
for i in 0..4 {
let queue2 = queue.clone();
let c = thread::spawn(move || dequeue(queue2, i));
tasks.push(c);
}
for i in 0..20 {
let queue1 = queue.clone();
let p = thread::spawn(move || enqueue(queue1, i));
tasks.push(p);
}
for t in tasks {
t.join().unwrap()
}
}
Kết quả
Producer 0 push: 43
Consumer 0 consumes value:43
Producer 1 push: 41
Consumer 0 consumes value:41
Producer 2 push: 95
Consumer 0 consumes value:95
Producer 3 push: 89
Consumer 0 consumes value:89
Producer 5 push: 29
Consumer 1 consumes value:29
Producer 6 push: 44
Consumer 1 consumes value:44
Producer 10 push: 67
Consumer 1 consumes value:67
Producer 8 push: 18
Consumer 1 consumes value:18
Producer 9 push: 22
Consumer 1 consumes value:22
Producer 7 push: 65
Consumer 1 consumes value:65
Producer 11 push: 0
Producer 16 push: 23
Producer 13 push: 74
Producer 18 push: 4
Producer 14 push: 61
Consumer 2 consumes value:0
Consumer 2 consumes value:23
Consumer 0 consumes value:74
Producer 12 push: 43
Consumer 1 consumes value:4
Consumer 1 consumes value:61
Consumer 1 consumes value:43
Producer 17 push: 57
Consumer 1 consumes value:57
Producer 19 push: 41
Consumer 1 consumes value:41
Producer 15 push: 78
Consumer 1 consumes value:78
Producer 4 push: 92
Consumer 1 consumes value:92
Tổng kết
- Sử dụng
condvar
để có thể lắng nghe theo một điều kiện nào đó condvar
luôn luôn
liên kết với mộtmutex
- Gọi hàm
wait
để đợi vào hàmnotify_one()
hoặcnotify_all()
để làm điều kiệnwalke_up
- Trong khi đợi nó không
consume CPU