Bài toán producerconsumer là một bài toán quen thuộc và thường được sử dụng trong nhiều chương trình. Trong Rust khi nghĩ đến điều này khái niệm channel trong thư viện tokio và sử dụng tokio::mspc để thực hiện như một channel queue. Producer sử dụng hàm send để gửi một giá trị vào channel này và consumer dùng hàm recv để lấy ra.

Nhưng bây giờ, giả sử vì lý do gì đó chúng ta không thể sử dụng mspc này và phải tự viết một đoạn mã với cơ chế giống như thế. Nghĩa là chúng ta sẽ phải đẩy dữ liệu vào queue từ producer và lấy ra bởi consumer. Liệu cách làm nên là như nào ?

Bạn sẽ nghĩ rằng việc này có gì đâu ? Cứ đẩy vào rồi lấy rồi check choắc kích thước queue thôi mà ? Chúng ta sẽ cùng phân tích

Đây là một trong năm bài bài toán mình làm khi đầu vào Elcom ngày xưa nhưng viết trên C++.Khi học sang Rust, mình đoán nó sẽ cơ chế tương tự.

Phân tích mpsc

Nó được gọi là multiple producer single consumer nghĩa là nhiều producer và chỉ một consumer. Đoạn mã thông thường của mpsc

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
  • Tạo một kênh nhờ hàm mpsc::channel()
  • Producer gọi hàm tx.send() để gửi dữ liệu vào channel
  • Consumer gọi hàm rx.recv() để lấy dữ liệu ra. Trong trường hợp không có dữ liệu nào trong channel, hàm này sẽ blocking. Nhưng làm sao nó có thể block như vậy được? Phải chăng một vòng loop vô hạn trong đó để kiểm tra queue có dữ liệu không?

Mô tả bài toán?

  • Kích thước lớn nhất của queue1000 messages. Nghĩa là khi queue đang chứa 1000 messages, chúng ta sẽ không thể đẩy thêm một message nào được nữa và phải chờ cho consumer tiêu thụ nó. Khi queue là 999, chúng ta lại có thể đẩy dữ liệu vào queue được.
  • Producer gọi hàm enqueue để gửi dữ liệu vào channel
  • Consumer gọi hàm dequeue để lấy dữ liệu ra. Trong trường hợp không có dữ liệu nào trong queue, hàm này sẽ blocking và chờ cho đến khi có dữ liệu đẩy vào. Trong trường hợp ngược lại, khi queue đã đầy chúng ta phải chờ để consumer tiêu thụ mới có thể đẩy vào tiếp
  • Chương trình là sync (không sử dụng async), không sử dụng mspcruntime tokio hoặc runtime nào khác.
  • Queue phải an toàn và không có tranh chấp.
  • Có thể nhiều producers cùng đẩy vào và nhiều consumers cùng lấy ra đồng thời.

Hướng tiếp cận 1

Việc đẩy dữ liệu vào một channel có thể ko phải là vấn đề, nhưng việc lấy dữ liệu ra không phải lúc nào cũng là điều khiển thành công. Ý tưởng đầu tiên để làm là thực hiện một vòng lặp trong Consumer để kiểm tra len.

Hướng đầu tiên này chúng ta sử dụng 2 tasks. Producer task sẽ tìm cách đẩy dữ liệu vào một VecDeque<u32>. Ở đầu kia Consumer task sẽ là một vòng lặp vô hạn để lấy dữ liệu ra. Nó sẽ kiểm tra VecDeque<u32> có dữ liệu hay không và tiến hành lấy ra. Trước khi kiểm tra, nó nên phải acquire (lấy được ) một lock để tránh xung đột.

  • Khởi tạo một native producer. Nó nên chứa một queue được chia sẻ chung ở tất cả các producers cũng như consumers
struct NativeProducer {
    idx: u64,
    queue: Arc<Mutex<VecDeque<u64>>>,
}
  • Tạo một native consumer
struct NativeConsumer {
    idx: u64,
    queue: Arc<Mutex<VecDeque<u64>>>,
}

  • Tạo hàm enqueue trong producer

const QUEUE_MAX_SIZE  = 1000;
impl NativeProducer {
    fn new(queue: Arc<Mutex<VecDeque<u64>>>, idx: u64) -> Self {
        Self {
            queue: queue,
            idx: idx,
        }
    }
    pub fn enqueue(&mut self, new_item: u64) {
        let mut guard = self.queue.lock().unwrap();
        if guard.len() < QUEUE_MAX_SIZE {
            guard.push_back(new_item)
        }
    }
}

Hàm enqueue sẽ gọi hàm lock() và sau khi chiếm được một lock nó sẽ có thể gọi hàm để đẩy một số u64 vào queue. Trước đó chúng ta cần kiểm tra điều kiện độ dài của queue đã đạt giá trị max phần tử hay chưa

  • Tạo hàm dequeue cho consumer
impl NativeConsumer {
    fn new(queue: Arc<Mutex<VecDeque<u64>>>, idx: u64) -> Self {
        Self {
            queue: queue,
            idx: idx,
        }
    }
    fn dequeue(&self) -> Option<u64> {
        let mut guard = self.queue.lock().unwrap();
        match guard.len() {
            0 => None,
            _ => guard.pop_front(),
        }
    }
}
  • Sử dụng nó
pub async fn run_native_producer_consumer() {
     let queue = Arc::new(Mutex::new(VecDeque::new()));
    let mut tasks = vec![];
    for i in 0..10 {
        let consumer = NativeConsumer::new(queue.clone(), i);
        let c = std::thread::spawn(move || {
            loop {
                let item = consumer.dequeue();
                if let Some(value) = item {
                    println!("Consumer {i} consume value:{}", value);
                }
                // println!("Nothing");
            }
        });
        tasks.push(c);
    }
    for i in 0..10 {
        let mut p = NativeProducer::new(queue.clone(), i);
        let t = std::thread::spawn(move || loop {
            let item = rand::random::<u64>();
            println!("Producer {i} produce value:{}", item);
            p.enqueue(item);
            std::thread::sleep(time::Duration::from_millis(2000));
        });
        tasks.push(t);
    }
    for t in tasks {
        t.join().unwrap();
    }
}

Kết quả chương trinh

Consumer 3 consume value:2505253166158001706
Producer 2 produce value:2962396143706016556
Consumer 1 consume value:2962396143706016556
Producer 9 produce value:9214376316464147140
Consumer 0 consume value:9214376316464147140
Producer 1 produce value:18436924389842035605
Consumer 2 consume value:18436924389842035605
Producer 7 produce value:16348692822225013178
Consumer 0 consume value:16348692822225013178
Producer 3 produce value:2950566369127932473
Consumer 8 consume value:2950566369127932473
Producer 4 produce value:11596635837023427331
Consumer 9 consume value:11596635837023427331
Producer 0 produce value:8424785294514102202
Consumer 8 consume value:8424785294514102202
Producer 5 produce value:8509136791119345892
Producer 6 produce value:15839379854569099
Consumer 0 consume value:15839379854569099
...

Phân tích

  • consumer bị động hơn so với producer bởi chúng sẽ không biết lúc nào phải lấy dữ liệu. Trong khi điều khiển đẩy dữ liệu vào khá chủ động và lúc nào có thể thành công.
  • Điểm yếu của hướng tiếp cận này là vòng lặp vô hạn trong consumer sẽ ảnh hưởng đến CPU rất lớn. Khi không có gì trong queue chúng ta vẫn đang phải tốn rất nhiều công để phải đi kiểm tra xem trong queue có gì không để mà đi lấy. Nếu bán thử thêm dòng println!("None"); vào cuối cuả vòng loop trong consumer bạn sẽ nhận thấy nó in ra liên tục vào điều này.
  • Một cách workaround để giảm CPU là bạn chỉ cần cho cho sleep std::thread::sleep(time::Duration::from_millis(100)); cũng sẽ giảm CPU khá nhiều. Nhưng tự nhiên cách làm này có lẽ không phải là cách làm đúng.

Hướng tiếp cận 2: Khuyến cáo

Bạn sẽ có ý tưởng như sau:

  • Có cách nào khi mà producer đẩy thêm 1 dữ liệu vào queue chúng nên báo cho consumer biết và nó nên lấy ra ngay. Hoặc khi mà queue đã đầy producer cũng nên được nghỉ ngơi và khi có một consumer nào đó tiêu thụ message, nó nên được báo từ consumer để tiếp tục đẩy vào queue
  • Có cách nào để đợi mà không tốn CPU không.

Sử dụng Condvar trong thư viện std::sync của Rust

Đây là giới thiệu về nó

Condition variables represent the ability to block a thread such that it consumes no CPU time while waiting for an event to occur. Condition variables are typically associated with a boolean predicate (a condition) and a mutex. The predicate is always verified inside of the mutex before determining that a thread must block.

Ví dụ

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);

// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
    let (lock, cvar) = &*pair2;
    let mut started = lock.lock().unwrap();
    *started = true;
    // We notify the condvar that the value has changed.
    cvar.notify_one();
});

// Wait for the thread to start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
    started = cvar.wait(started).unwrap();
}

Đặc tích

  • Nó sử dụng hàm wait để đợi. Nó luôn liên kết với một biến điều kiện boolean và một Mutex.Chú ý luôn luôn nhé. nguyên tắc này mình học từ C++.

  • Điều kiện này sẽ được wake up trong trường hợp có một nơi nào đó gọi notify_one() hoặc notify_all()

Áp dụng

  • Tạo một SafeQueue
struct SafeQueue {
    items: Mutex<VecDeque<u32>>,
    not_full: Condvar,
    not_empty: Condvar,
}

Chúng ta khởi tạo 2 biến not_fullnot_empty. not_full nghĩa là khi. một consumer vừa consume xong thì chắc chắn trong queue sẽ là not_full và tương tự với not_empty

  • Tạo một hàm enqueue
fn enqueue(queue: Arc<SafeQueue>, thread_id: u64) {
    let mut guard = queue.items.lock().unwrap();
    while guard.len() == MAX {
        // Sau khi chiếm được quay lại kiểm tra queue thấy đầy thì phải chờ
        // Đợi cho queue khoong còn bị đầy nữa
        // Hàm này sẽ phải truyền vào một mutex, khi hàm này block nó sẽ tự động nhả lock
        guard = queue.not_full.wait(guard).unwrap();
    }

    let num = rand::thread_rng().gen_range(0..100);
    println!("Producer {thread_id} push: {num}");
    guard.push_back(num);
    queue.not_empty.notify_all();
}

Hàm này nhận vào 2 tham số là queuethread_id. Nó cũng sẽ gọi hàm lock() trước khi vào phần critical section.

Đoạn logic quan trọng ở đây là cần kiểm queue có bị đầy hay không. Nếu như đang full chúng ta sẽ đợi cho nó not full. Bằng cách sử dụng guard = queue.not_full.wait(guard).unwrap();. Bạn sẽ thắc mắc là nếu việt như này thì đoạn nó nhả lock ra ở đâu. Condvar có cơ chế để nó nhả lock ra khi điều kiện chưa thỏa mãn. Trong trường hợp đã nhận điều kiên not_full và kiểm tra lại một lần nữa là queuekhông đầy chúng sẽ được quyền đẩy message vào queue thông qua guard.push(num). Sau khi đẩy vào, chúng ta nên báo cho consumer là hãy tiêu thụ đi. notify_all() để báo cho tất cả consumer thay vì chỉ một.

  • Tạo một hàm dequeue
fn dequeue(queue: Arc<SafeQueue>, thread_id: u64) {
    loop {
        let mut guard = queue.items.lock().unwrap();
        while guard.len() == 0_usize {
            // Chúng ta đang không có gì trong queue, nên phải chờ
            //
            guard = queue.not_empty.wait(guard).unwrap();
        }

        let value = guard.pop_front().unwrap();
        println!("Consumer {thread_id} consumes value:{value}");
        queue.not_full.notify_all();
    }
}

Cơ chế cũng tương tự như enqueue. Chúng ta lại đợi điều kiện not_empty và khi nhận được thông tin rằng queue là có một message được đẩy vào rồi và không còn rỗng nữa, chúng ta sẽ gọi hàm pop để lấy ra. Sau khi lấy ra thì queue sẽ chắc chắn không đầy nữa nên chúng ta có thể notifynot_full

  • Sử dụng
pub fn run_with_cond_var() {
    let queue = Arc::new(SafeQueue::new());
    let mut tasks = vec![];
    for i in 0..4 {
        let queue2 = queue.clone();
        let c = thread::spawn(move || dequeue(queue2, i));
        tasks.push(c);
    }
    for i in 0..20 {
        let queue1 = queue.clone();
        let p = thread::spawn(move || enqueue(queue1, i));
        tasks.push(p);
    }

    for t in tasks {
        t.join().unwrap()
    }
}

Kết quả

Producer 0 push: 43
Consumer 0 consumes value:43
Producer 1 push: 41
Consumer 0 consumes value:41
Producer 2 push: 95
Consumer 0 consumes value:95
Producer 3 push: 89
Consumer 0 consumes value:89
Producer 5 push: 29
Consumer 1 consumes value:29
Producer 6 push: 44
Consumer 1 consumes value:44
Producer 10 push: 67
Consumer 1 consumes value:67
Producer 8 push: 18
Consumer 1 consumes value:18
Producer 9 push: 22
Consumer 1 consumes value:22
Producer 7 push: 65
Consumer 1 consumes value:65
Producer 11 push: 0
Producer 16 push: 23
Producer 13 push: 74
Producer 18 push: 4
Producer 14 push: 61
Consumer 2 consumes value:0
Consumer 2 consumes value:23
Consumer 0 consumes value:74
Producer 12 push: 43
Consumer 1 consumes value:4
Consumer 1 consumes value:61
Consumer 1 consumes value:43
Producer 17 push: 57
Consumer 1 consumes value:57
Producer 19 push: 41
Consumer 1 consumes value:41
Producer 15 push: 78
Consumer 1 consumes value:78
Producer 4 push: 92
Consumer 1 consumes value:92

Tổng kết

  • Sử dụng condvar để có thể lắng nghe theo một điều kiện nào đó
  • condvar luôn luôn liên kết với một mutex
  • Gọi hàm wait để đợi vào hàm notify_one() hoặc notify_all() để làm điều kiện walke_up
  • Trong khi đợi nó không consume CPU