[Rust] Bài tóm Producer/Consumer và cách tạo một queue an toàn (không phải not async)
Bài toán producer và consumer là một bài toán quen thuộc và thường được sử dụng trong nhiều chương trình. Trong Rust khi nghĩ đến điều này khái niệm channel trong thư viện tokio và sử dụng tokio::mspc để thực hiện như một channel queue. Producer sử dụng hàm send để gửi một giá trị vào channel này và consumer dùng hàm recv để lấy ra.
Nhưng bây giờ, giả sử vì lý do gì đó chúng ta không thể sử dụng mspc này và phải tự viết một đoạn mã với cơ chế giống như thế. Nghĩa là chúng ta sẽ phải đẩy dữ liệu vào queue từ producer và lấy ra bởi consumer. Liệu cách làm nên là như nào ?
Bạn sẽ nghĩ rằng việc này có gì đâu ? Cứ đẩy vào rồi lấy rồi check choắc kích thước queue thôi mà ? Chúng ta sẽ cùng phân tích
Đây là một trong năm bài bài toán mình làm khi đầu vào Elcom ngày xưa nhưng viết trên C++.Khi học sang Rust, mình đoán nó sẽ cơ chế tương tự.
Phân tích mpsc
Nó được gọi là multiple producer single consumer nghĩa là nhiều producer và chỉ một consumer. Đoạn mã thông thường của mpsc
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
- Tạo một kênh nhờ hàm
mpsc::channel() - Producer gọi hàm
tx.send()để gửi dữ liệu vàochannel - Consumer gọi hàm
rx.recv()để lấy dữ liệu ra. Trong trường hợp không có dữ liệu nào trong channel, hàm này sẽ blocking. Nhưng làm sao nó có thểblocknhư vậy được? Phải chăng một vòngloopvô hạn trong đó để kiểm traqueuecó dữ liệu không?
Mô tả bài toán?
- Kích thước lớn nhất của
queuelà1000messages. Nghĩa là khi queue đang chứa1000messages, chúng ta sẽ không thể đẩy thêm một message nào được nữa và phải chờ cho consumer tiêu thụ nó. Khi queue là999, chúng ta lại có thể đẩy dữ liệu vàoqueueđược. - Producer gọi hàm
enqueueđể gửi dữ liệu vào channel - Consumer gọi hàm
dequeueđể lấy dữ liệu ra. Trong trường hợp không có dữ liệu nào trongqueue, hàm này sẽ blocking và chờ cho đến khi có dữ liệu đẩy vào. Trong trường hợp ngược lại, khi queue đãđầychúng ta phải chờ để consumer tiêu thụ mới có thể đẩy vào tiếp - Chương trình là sync (không sử dụng async), không sử dụng
mspcvàruntime tokiohoặc runtime nào khác. Queuephải an toàn và không có tranh chấp.- Có thể nhiều
producerscùng đẩy vào và nhiềuconsumerscùng lấy ra đồng thời.
Hướng tiếp cận 1
Việc đẩy dữ liệu vào một channel có thể ko phải là vấn đề, nhưng việc lấy dữ liệu ra không phải lúc nào cũng là điều khiển thành công. Ý tưởng đầu tiên để làm là thực hiện một vòng lặp trong Consumer để kiểm tra len.
Hướng đầu tiên này chúng ta sử dụng 2 tasks. Producer task sẽ tìm cách đẩy dữ liệu vào một VecDeque<u32>. Ở đầu kia Consumer task sẽ là một vòng lặp vô hạn để lấy dữ liệu ra.
Nó sẽ kiểm tra VecDeque<u32> có dữ liệu hay không và tiến hành lấy ra. Trước khi kiểm tra, nó nên phải acquire (lấy được ) một lock để tránh xung đột.
- Khởi tạo một
native producer. Nó nên chứa mộtqueueđược chia sẻ chung ở tất cả cácproducerscũng nhưconsumers
struct NativeProducer {
idx: u64,
queue: Arc<Mutex<VecDeque<u64>>>,
}
- Tạo một
native consumer
struct NativeConsumer {
idx: u64,
queue: Arc<Mutex<VecDeque<u64>>>,
}
- Tạo hàm enqueue trong
producer
const QUEUE_MAX_SIZE = 1000;
impl NativeProducer {
fn new(queue: Arc<Mutex<VecDeque<u64>>>, idx: u64) -> Self {
Self {
queue: queue,
idx: idx,
}
}
pub fn enqueue(&mut self, new_item: u64) {
let mut guard = self.queue.lock().unwrap();
if guard.len() < QUEUE_MAX_SIZE {
guard.push_back(new_item)
}
}
}
Hàm enqueue sẽ gọi hàm lock() và sau khi chiếm được một lock nó sẽ có thể gọi hàm để đẩy một số u64 vào queue. Trước đó chúng ta
cần kiểm tra điều kiện độ dài của queue đã đạt giá trị max phần tử hay chưa
- Tạo hàm
dequeuechoconsumer
impl NativeConsumer {
fn new(queue: Arc<Mutex<VecDeque<u64>>>, idx: u64) -> Self {
Self {
queue: queue,
idx: idx,
}
}
fn dequeue(&self) -> Option<u64> {
let mut guard = self.queue.lock().unwrap();
match guard.len() {
0 => None,
_ => guard.pop_front(),
}
}
}
- Sử dụng nó
pub async fn run_native_producer_consumer() {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let mut tasks = vec![];
for i in 0..10 {
let consumer = NativeConsumer::new(queue.clone(), i);
let c = std::thread::spawn(move || {
loop {
let item = consumer.dequeue();
if let Some(value) = item {
println!("Consumer {i} consume value:{}", value);
}
// println!("Nothing");
}
});
tasks.push(c);
}
for i in 0..10 {
let mut p = NativeProducer::new(queue.clone(), i);
let t = std::thread::spawn(move || loop {
let item = rand::random::<u64>();
println!("Producer {i} produce value:{}", item);
p.enqueue(item);
std::thread::sleep(time::Duration::from_millis(2000));
});
tasks.push(t);
}
for t in tasks {
t.join().unwrap();
}
}
Kết quả chương trinh
Consumer 3 consume value:2505253166158001706
Producer 2 produce value:2962396143706016556
Consumer 1 consume value:2962396143706016556
Producer 9 produce value:9214376316464147140
Consumer 0 consume value:9214376316464147140
Producer 1 produce value:18436924389842035605
Consumer 2 consume value:18436924389842035605
Producer 7 produce value:16348692822225013178
Consumer 0 consume value:16348692822225013178
Producer 3 produce value:2950566369127932473
Consumer 8 consume value:2950566369127932473
Producer 4 produce value:11596635837023427331
Consumer 9 consume value:11596635837023427331
Producer 0 produce value:8424785294514102202
Consumer 8 consume value:8424785294514102202
Producer 5 produce value:8509136791119345892
Producer 6 produce value:15839379854569099
Consumer 0 consume value:15839379854569099
...
Phân tích
consumerbị động hơn so vớiproducerbởi chúng sẽ không biết lúc nào phải lấy dữ liệu. Trong khi điều khiển đẩy dữ liệu vào khá chủ động và lúc nào có thể thành công.- Điểm yếu của hướng tiếp cận này là vòng lặp vô hạn trong
consumersẽ ảnh hưởng đếnCPUrất lớn. Khi không có gì trongqueuechúng ta vẫn đang phải tốn rất nhiều công để phải đi kiểm tra xem trongqueuecó gì không để mà đi lấy. Nếu bán thử thêm dòngprintln!("None");vào cuối cuả vònglooptrongconsumerbạn sẽ nhận thấy nó in ra liên tục vào điều này. - Một cách
workaroundđể giảm CPU là bạn chỉ cần cho cho sleepstd::thread::sleep(time::Duration::from_millis(100)); cũng sẽ giảmCPUkhá nhiều. Nhưng tự nhiên cách làm này có lẽ không phải là cách làm đúng.
Hướng tiếp cận 2: Khuyến cáo
Bạn sẽ có ý tưởng như sau:
- Có cách nào khi mà
producerđẩy thêm 1 dữ liệu vàoqueuechúng nên báo choconsumerbiết và nó nên lấy ra ngay. Hoặc khi màqueueđãđầyproducercũng nên được nghỉ ngơi và khi có mộtconsumernào đó tiêu thụ message, nó nên được báo từconsumerđể tiếp tục đẩy vàoqueue - Có cách nào để
đợimà không tốnCPUkhông.
Sử dụng Condvar trong thư viện std::sync của Rust
Đây là giới thiệu về nó
Condition variables represent the ability to block a thread such that it consumes no CPU time while waiting for an event to occur. Condition variables are typically associated with a boolean predicate (a condition) and a mutex. The predicate is always verified inside of the mutex before determining that a thread must block.
Ví dụ
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
Đặc tích
-
Nó sử dụng hàm
waitđể đợi. Nó luôn liên kết với một biến điều kiện boolean và mộtMutex.Chú ý luôn luôn nhé. nguyên tắc này mình học từC++. -
Điều kiện này sẽ được
wake uptrong trường hợp có một nơi nào đó gọinotify_one()hoặcnotify_all()
Áp dụng
- Tạo một
SafeQueue
struct SafeQueue {
items: Mutex<VecDeque<u32>>,
not_full: Condvar,
not_empty: Condvar,
}
Chúng ta khởi tạo 2 biến not_full và not_empty. not_full nghĩa là khi. một consumer vừa consume xong thì chắc chắn trong queue sẽ là not_full và tương tự với not_empty
- Tạo một hàm
enqueue
fn enqueue(queue: Arc<SafeQueue>, thread_id: u64) {
let mut guard = queue.items.lock().unwrap();
while guard.len() == MAX {
// Sau khi chiếm được quay lại kiểm tra queue thấy đầy thì phải chờ
// Đợi cho queue khoong còn bị đầy nữa
// Hàm này sẽ phải truyền vào một mutex, khi hàm này block nó sẽ tự động nhả lock
guard = queue.not_full.wait(guard).unwrap();
}
let num = rand::thread_rng().gen_range(0..100);
println!("Producer {thread_id} push: {num}");
guard.push_back(num);
queue.not_empty.notify_all();
}
Hàm này nhận vào 2 tham số là queue và thread_id. Nó cũng sẽ gọi hàm lock() trước khi vào phần critical section.
Đoạn logic quan trọng ở đây là cần kiểm queue có bị đầy hay không. Nếu như đang full chúng ta sẽ đợi cho nó not full.
Bằng cách sử dụng guard = queue.not_full.wait(guard).unwrap();. Bạn sẽ thắc mắc là nếu việt như này thì đoạn nó nhả lock ra ở đâu. Condvar có cơ chế để nó nhả lock ra khi điều kiện chưa thỏa mãn. Trong trường hợp đã nhận điều kiên not_full và kiểm tra lại một lần nữa là queue là không đầy chúng sẽ được quyền đẩy message vào queue thông qua guard.push(num). Sau khi đẩy vào, chúng ta nên báo cho consumer là hãy tiêu thụ đi. notify_all() để báo cho tất cả consumer thay vì chỉ một.
- Tạo một hàm
dequeue
fn dequeue(queue: Arc<SafeQueue>, thread_id: u64) {
loop {
let mut guard = queue.items.lock().unwrap();
while guard.len() == 0_usize {
// Chúng ta đang không có gì trong queue, nên phải chờ
//
guard = queue.not_empty.wait(guard).unwrap();
}
let value = guard.pop_front().unwrap();
println!("Consumer {thread_id} consumes value:{value}");
queue.not_full.notify_all();
}
}
Cơ chế cũng tương tự như enqueue. Chúng ta lại đợi điều kiện not_empty và khi nhận được thông tin rằng queue là có một message được đẩy vào rồi và không còn rỗng nữa, chúng ta sẽ gọi hàm pop để lấy ra. Sau khi lấy ra thì queue sẽ chắc chắn không đầy nữa nên chúng ta có thể notify là not_full
- Sử dụng
pub fn run_with_cond_var() {
let queue = Arc::new(SafeQueue::new());
let mut tasks = vec![];
for i in 0..4 {
let queue2 = queue.clone();
let c = thread::spawn(move || dequeue(queue2, i));
tasks.push(c);
}
for i in 0..20 {
let queue1 = queue.clone();
let p = thread::spawn(move || enqueue(queue1, i));
tasks.push(p);
}
for t in tasks {
t.join().unwrap()
}
}
Kết quả
Producer 0 push: 43
Consumer 0 consumes value:43
Producer 1 push: 41
Consumer 0 consumes value:41
Producer 2 push: 95
Consumer 0 consumes value:95
Producer 3 push: 89
Consumer 0 consumes value:89
Producer 5 push: 29
Consumer 1 consumes value:29
Producer 6 push: 44
Consumer 1 consumes value:44
Producer 10 push: 67
Consumer 1 consumes value:67
Producer 8 push: 18
Consumer 1 consumes value:18
Producer 9 push: 22
Consumer 1 consumes value:22
Producer 7 push: 65
Consumer 1 consumes value:65
Producer 11 push: 0
Producer 16 push: 23
Producer 13 push: 74
Producer 18 push: 4
Producer 14 push: 61
Consumer 2 consumes value:0
Consumer 2 consumes value:23
Consumer 0 consumes value:74
Producer 12 push: 43
Consumer 1 consumes value:4
Consumer 1 consumes value:61
Consumer 1 consumes value:43
Producer 17 push: 57
Consumer 1 consumes value:57
Producer 19 push: 41
Consumer 1 consumes value:41
Producer 15 push: 78
Consumer 1 consumes value:78
Producer 4 push: 92
Consumer 1 consumes value:92
Tổng kết
- Sử dụng
condvarđể có thể lắng nghe theo một điều kiện nào đó condvarluôn luônliên kết với mộtmutex- Gọi hàm
waitđể đợi vào hàmnotify_one()hoặcnotify_all()để làm điều kiệnwalke_up - Trong khi đợi nó không
consume CPU