Khi viết code Rust với đa luồng bạn sẽ chạm trán 2 kiểu dữ liệu là ArcMutex. Mặc dù Mutex có thể rằng bạn đã qúa quen thuộc với nó bới đây cũng đã là nguyên lý trong nhiều ngôn ngữ khác, thì với Arc, bạn có lẽ chưa bao giờ nghe trước đó. Thêm nữa, bạn có thể vẫn chưa hiểu đầy đủ những nguyên lý này khi mà chưa từng code chúng trong mô hình ownership trong Rust. Bài viết này sẽ làm sáng tỏ hai nguyên lý này.

Bài này dịch lại từ bài viết: https://itsallaboutthebit.com/arc-mutex/ nhưng theo ý hiểu.

Thông thường khi bạn chia sẻ dữ liệu trong môi trường đa luồng, bạn hoặc là chia sẻ bộ nhớ hoặc chuyển dữ liệu như một message. Bạn có thể thường nghe rằng passing message (ví dụ sử dụng 1 channel) là một cách hay để xử lý đa luồng, nhưng trong Rust tôi không nghĩ tính an toàn hoặc tính đúng đăn khác nhiều so với ngôn ngữ khác bởi mô hình ownership trong Rust cũng đã làm điều đó rồi. Thêm nữa, bạn cũng không có data race trong Rust. Đó là lý do tại sao khi tôi chọn giữa passing message hoặc là chia sẻ bộ nhớ trong Rust, tôi thường chọn cách thuận tiện hơn thay vì quan tâm vấn đề không an toàn.

Nếu bạn lựa chọn chia sẻ dữ liệu bởi cách chia sẻ bộ nhớ bạn sẻ nhanh chóng nhận ra rằng, bạn sẽ chẳng làm được gì nếu không sử dụng ArcMutex. Arc là một con trỏ thông minh để cho bạn chia sẻ an toàn giá trị giữa các threads. Mutex là kiểu wrapper qua kiểu khác, mà cho phép sửa đổi giá trị một cách an toàn. Để có thể hiểu đầy đủ những nguyên lý này, cần đi sâu để nghiên cứu nó.

Ownership trong Rust

Nếu bạn đã cố gắng phân giải mô hình ownership trong Rust, có thể bạn đã gặp những điểm sau:

  • Một giá trị có thể chỉ có 1 owner
  • Bạn có thể có nhiều shared immutable reference to một giá trị
  • Bán chỉ có thể có một mutable reference to một giá trị

Xem xét đoạn mã sau để thấy vấn đề. Chúng ta có User struct bao gồm 1 file name kiểu String. Tạo một thread và in nó trong đó.

use std::thread::spawn;

#[derive(Debug)]
struct User {
    name: String
}

fn main() {
    let user = User { name: "drogus".to_string() };

    spawn(move || {
        println!("Hello from the first thread {}", user.name);
    }
}

Chương trình hoạt động tốt và không có vấn để gì cả. Bây giờ tưởng tượng rằng chúng ta sẽ thêm một thread thứ hai cũng truy xuất đến user interface.

fn main() {
    let user = User { name: "drogus".to_string() };

    let t1 = spawn(move || {
        println!("Hello from the first thread {}", user.name);
    });

    let t2 = spawn(move || {
        println!("Hello from the second thread {}", user.name);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Chúng ta sẽ gặp lỗi như sau:

error[E0382]: use of moved value: `user.name`
  --> src/main.rs:15:20
   |
11 |     let t1 = spawn(move || {
   |                    ------- value moved into closure here
12 |         println!("Hello from the first thread {}", user.name);
   |                                                    --------- variable moved due to use in closure
...
15 |     let t2 = spawn(move || {
   |                    ^^^^^^^ value used here after move
16 |         println!("Hello from the second thread {}", user.name);
   |                                                    --------- use occurs due to use in closure
   |
   = note: move occurs because `user.name` has type `String`, which does not implement the `Copy` trait

Compiler muốn gì? Lỗi là "use of moved value user.name". Compiler đủ tốt để chỉ ra chính xác dòng xảy ra điều đó. Cùng giải thích nhé! Đầu tiên, chúng ta move biến user vào trong thread 1 tại dòng 11 và sau đó cố gắng để làm điều tương tự với thread 2 tại dòng 15. Nếu như bạn nhìn vào luật ownership, điều này rõ ràng không được. Một giá trị có thể chỉ có 1 owner. Với phiên bản đầu tiên của code, chúng ta cần move giá trị tới thread 1 nếu chúng ta muốn sử dụng nó, và do đó chúng ta không thể move tới thread khác được nữa. Nhưng chúng ta không muốn thay đổi data cơ mà? Nghĩa là chúng ta có nhiều tham chiếu cùng lúc được. Thử điều này như sau:

fn main() {
    let user = User { name: "drogus".to_string() };

    let t1 = spawn(|| {
        println!("Hello from the first thread {}", &user.name);
    });

    let t2 = spawn(|| {
        println!("Hello from the second thread {}", &user.name);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Chúng ta bỏ từ khoá move đi. Các thread giờ chỉ mượn giá trị user mà không thay đổi gì có. Khá là oki so với luật borrowing. Nhưng hãy khoan, chúng ta gặp lỗi sau

error[E0373]: closure may outlive the current function, but it borrows `user.name`, which is owned by the current function
  --> src/main.rs:15:20
   |
15 |     let t2 = spawn(|| {
   |                    ^^ may outlive borrowed value `user.name`
16 |         println!("Hello from the first thread {}", &user.name);
   |                                                     --------- `user.name` is borrowed here
   |
note: function requires argument type to outlive `'static`
  --> src/main.rs:15:14
   |
15 |       let t2 = spawn(|| {
   |  ______________^
16 | |         println!("Hello from the second thread {}", &user.name);
17 | |     });
   | |______^
help: to force the closure to take ownership of `user.name` (and any other referenced variables), use the `move` keyword
   |
15 |     let t2 = spawn(move || {
   |                    ++++

Lỗi này nói rằng closure có thể outlive khỏi function. Nói một cách khác, Rust compiler không đảm bảo rằng closure trong thread sẽ kết thúc trước khi hàm main kết thúc. Thread thì đang đi mượn thằng useruser thì chỉ có vòng đời đến khi hàm main kết thúc. Một khi main function kết thúc, struct user cũng sẽ out of scopedrop (freed memory). Do đó, nếu nó được cho phép để chia sẻ giá trị trong thread theo cách này, bạn có thể rơi vào trường hợp khi một thread đang cố gắng để đọc một dữ liệu đã được free đi rồi. Đây là hành vi rõ rằng chúng ta không muốn chút nào.

Cảnh báo cũng nói rằng, nó có thể move biến user sang thread để tránh vấn đề borrowing này. Nhưng như thế lại rơi vào trường hợp đầu tiên chúng ta xét. Chúng ta có 2 cách để xử lý với nó

Solution 1: Scoped threads

Scoped thread là một tính năng trong crate crossbeam hoặc trong tính năng thư nghiêm của nighly trong Rust. Chúng ta có thể sử dụng bằng cách thêm crossbeam = "0.8" tới Cargo.toml

use crossbeam::scope;

#[derive(Debug)]
struct User {
    name: String,
}

fn main() {
    let user = User {
        name: "drogus".to_string(),
    };

    scope(|s| {
        s.spawn(|_| {
            println!("Hello from the first thread {}", &user.name);
        });

        s.spawn(|_| {
            println!("Hello from the second thread {}", &user.name);
        });
    })
    .unwrap();
}

Cách để scoped thread hoạt động là:

  • Nó đảm bảo tất cả các thread trong scope này sẽ kết thúc trước khi hàm main kết thúc hay nói một cách khác trước khi scoped closures thoát, threads đã được join và đơi đến khi kết thức. Nhờ đó compiler biết rằng không biến nào mượn sẽ outlive

Một điểm thú vị ở đây là mặc dù chúng ta (human reader) hoàn toàn có thể tự phân định được rằng biến user hoàn toàn valid ở trong cả 2 trường hợp. Trong phiên bản mà Rust từ chối, chúng ta cũng đã join 2 threads trước khi hàm main() kết thúc, do đó nó đáng lẽ là phải an toàn để chia sẻ biến user chứ ? Không may mắn, compiler không đủ thông minh đến mức đó! Viết một compiler mà đủ chấp mọi tất cả các chương trình hợp là điều không thể. Complier có thể làm điều tốt nhất lúc này là: Từ chối tất cả các chương trình invalid không hợp lệ tại một mức kiểm tra nào đó. Scope thread là một future được viết đặc biệt để cho phép chúng ta viết code này trong một cách mà compiler có thể chấp nhận nó.

Khá hữu dụng khi sử dụng tính năng scoped thread, tuy nhiên bạn có không thể lúc nào cũng sử dụng nó. Ví dụ khi viết code async. Cùng đến giải pháp tiếp theo với Arc

Solution 2: Arc

Arc là một con trỏ thông mình cho phép chia sẻ dữ liệu giữa các thread. Như tên gọi của nó, Atomic reference counter, thuật ngữ này cũng xuất hiện nhiều trong ngôn ngữ như C++. Cách Arc hoạt động là wrap một giá trị chúng ta đang chia sẻ và hành động như a con trỏ tới nó. Arc sẽ theo dõi tất cả các bạn copies của contror và ngay sau khi con trỏ cuối cùng bị huỷ (out of scope), nó có thể huỷ dữ liệu chia sẻ.

use std::thread::spawn;
use std::sync::Arc;

#[derive(Debug)]
struct User {
    name: String
}

fn main() {
    let user_original = Arc::new(User { name: "drogus".to_string() });

    let user = user_original.clone();
    let t1 = spawn(move || {
        println!("Hello from the first thread {}", user.name);
    });

    let user = user_original.clone();
    let t2 = spawn(move || {
        println!("Hello from the first thread {}", user.name);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Cùng xem xét từng dòng 1. Đầu tiên, chúng ta tạo một giá trị user nhưng được wrap nó trong Arc. Bêy giờ, giá trị được lưu trũ trong bộ nhớ và Arc chỉ hành động như một con trỏ. Bất kể khi nào chúng ta clone Arc, chúng ta chỉ clone tham chiếu của nó, không phải là bản thân giá trị user. Chúng ta clone Arc và con trỏ của pointer này sẽ được move vào mỗi một thread. Bởi bạn có thể nhìn thấy Arc cho phép chúng ta chia sẻ dữ liệu bất kể vòng lifetime của chúng. Trong ví dụ này, chúng ta sẽ có 3 con trỏ tới cùng một giá trị user. Một cái được tao khi Arc được tạo, một cái được tạo trước thread 1 và một cái được tạo trước thread 2. Miễn những con trỏ này là đang alive, Rust sẽ không free bộ nhớ. Nhưng khi cả threads và main function kết thúc, tất cả Arc pointer sẽ out of scope, bị dropped (huỷ) ngay sau khi cái cuối cùng huỷ, giá trị user bị huỷ theo.

Send và Sync

Bài viết có thể cũng đã ly kỳ. Nhưng cần đi sâu thêm 1 chút về 2 trait SendSync. Trong tài liệu của Arc nó thực hiện 2 trait SendSync nhưng chỉ nếu kiểu được wrapped lại cũng thực hiện kiểu SendSync. Để hiểu rõ đây là gì, tại sao nó được thực hiện cách này bằng cách định nghĩa SendSync.

Trong Rustonomicon định nghĩa như sau:

  • Một kiểu là Sendnếu nó an toàn để send (truyên) nó tới một thread khác.
  • Một kiểu là Sync nếu nó an toàn để chia sẻ giữa các thread. (TSync nếu và chỉ nếu &TSend)

Hơi khó hiểu phải không? Cùng giải thích theo ý hiểu nhé! Thứ nhất 2 kiểu này là 1 dạng marker trait. Chúng không có bất kỳ phương thức thực hiện nào hay yêu cầu chúng ta thực hiện. Gì chúng cho phép là để notify (báo) cho compiler biết về khả năng của kiểu này được shared hay được sent giữa các thread. Với Send, là khá rõ ràng. Bạn không thể truyền (send) một kiểu là !Send (đọc là không Send) tới một thread khác. Ví dụ bạn không thể send nó qua một channel hoặc có thể move nó tới một thread như trong ví dụ sau

#![feature(negative_impls)]

#[derive(Debug)]
struct Foo {}
impl !Send for Foo {}

fn main() {
    let foo = Foo {};
    spawn(move || {
        dbg!(foo);
    });
}

SendSync là tự động sinh ra, nghĩa rằng ví dụ nếu tất cả các thuộc tính (attributes) của một kiểu là Send, kiểu cũng sẽ được đánh dấu là Send. Code trên sử dụng tính năng thưc nghiệm negative_impls, mà nói với compiler rằng: "Tôi muốn đánh đấu chính xác nó !Send, kệ tôi nhé!". Cố gắng để compile đoạn code trên sẽ gặp 1 lỗi:

`Foo` cannot be sent between threads safely

Điều tương tự cũng xảy ra nếu bạn tạo một channel để send foo tới một thread khác. Thế còn sử dụng với Arc thì sao? Bởi như bạn có thể đoán. Nó cũng sẽ không giúp gì bạn, nó cũng sẽ hiển thị lỗi cùng 1 cách thôi.

#![feature(negative_impls)]

#[derive(Debug)]
struct Foo {}
impl !Send for Foo {}

fn main() {
    let foo = Arc::new(Foo {});
    spawn(move || {
        dbg!(foo);
    });
}

Vậy tại sao? Arc đang wrap kiểu của chúng ta và đưa nó nhiều khả năng khác hay sao? Arc không thể magically làm cho kiểu chúng ta an toàn? Phần giải thích sẽ ở sau.

Cho đến nay, những gì chúng ta có thể học là Arc cho phép chúng ta chia sẻ dữ liệu có đặc tính Send + Sync giữa các threads ngoại trừ chúng ta phải lo lắng về lifetime. (Nó không là tham chiếu thông thường, nó là một con trỏ thông minh)

Thay đổi dữ liệu với Mutex

Cùng nói đến Mutex. Mutexes trong nhiều ngôn ngữ đối xử giống như semaphores. Bạn tạo một đối tượng mutex và bạn có bảo về một phần của code với mutex theo một cách rằng chỉ một thread tại một thời điểm có thể truy cập đến vùng code được bảo vệ này. Trong Rust Mutex có lẽ 1 chút khác và giống với wrapper hơn. Nó consume giá trị ở dưới và để cho bạn truy xuất chỉ sau khi locking mutex (mutex được khoá). Thông thường, Mutex là được sử dụng liên kết với Arc để dễ chia sẻ dữ liệu quan nhiều threads.

use std::time::Duration;
use std::{thread, thread::sleep};
use std::sync::{Arc, Mutex};

struct User {
    name: String
}

fn main() {
    let user_original = Arc::new(Mutex::new(User { name: String::from("drogus") }));

    let user = user_original.clone();
    let t1 = thread::spawn(move || {
        let mut locked_user = user.lock().unwrap();
        locked_user.name = String::from("piotr");
        // after locked_user goes out of scope, mutex will be unlocked again,
        // but you can also explicitly unlock it with:
        // drop(locked_user);
    });

    let user = user_original.clone();
    let t2 = thread::spawn(move || {
        sleep(Duration::from_millis(10));

        // it will print: Hello piotr
        println!("Hello {}", user.lock().unwrap().name);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Dòng đầu tiên chúng ta tạo một đối tượng user được wrap bởi một Mutex và tất cả được bọc trong Arc. Với Arc chúng ta có thể dễ dang để clone con trỏ và do đó chia sẻ Mutex giữa các thread. Mutex là được lock sử dụng lời gọi hàm lock() và sau thời điểm này dữ liệu phía dưới có thể được sử dụng bởi thread này. Sau đó, chúng ta thay đổi giá trị của value trong dòng tiếp theo. Mutex là được unlock một khi locked guard out of scope hoặc chúng ta có thể thủ công xoá bằng cách gọi hàm drop().

Trong thread 2 chúng ta đợi 10ms và in ra tên và kỳ vọng là tên được cập nhật trong thread 1. Thread 1 nhả lock và thread 2 chiếm được lock và in ra tên

Một điểm đáng đề cập ở đây là unwrap() chúng ta gọi sao lock(). Mutex từ thư viện chuẩn có khái niệm poisoning (Xem xét tại: https://doc.rust-lang.org/std/sync/struct.Mutex.html#poisoning). Nếu một thread panic trong khi mutex đang được lock, chúng ta không thể chắc chắn nếu giá trong Mutex là vẫn hợp lý và do đó hành vi mặc đúng là trở về một lỗi thay vì một guard. Thông thường, bỏ unwrap() trong code production là không được khuyến khích, nhưng trong case Mutex này nó có thể là chiến thuật thích đáng - Nếu một Mutex đã bị nhiễm độc (poisioned), chúng ta có thể quết định trạng thái chương trình là không hợp lệ và cần crash chương trình

Một điểm thú vị nữa là, miễn kiểu trong MutexSend, Mutex sẽ luôn là Sync. Điều này là bởi vì Mutex đảm bảo chi có 1 thread có thể truy xuất tại tới dữ liệu ở dưới, và do đó nó an toàn để chia sẽ Mutex giữa các threads.

Mutex: Thêm Sync tới một kiểu Send

Như bạn có thể nhớ từ đầu, Arc cần dữ liệu ở dưới là Send + Sync để cho Arc cũng là Send + Sync. Mutex chỉ yêu cầu dữ liệu ở dưới là Send để cho MutexSend + Sync

Mutex ngoài Arc?

Câu hỏi là, có thể nào sử dụng Mutex ngoài Arc.

Câu trả lời là có thể dùng scoped threads

use crossbeam::scope;
use std::{sync::Mutex, thread::sleep, time::Duration};

#[derive(Debug)]
struct User {
    name: String,
}

fn main() {
    let user = Mutex::new(User {
        name: "drogus".to_string(),
    });

    scope(|s| {
        s.spawn(|_| {
            user.lock().unwrap().name = String::from("piotr");
        });

        s.spawn(|_| {
            sleep(Duration::from_millis(10));

            // should print: Hello piotr
            println!("Hello {}", user.lock().unwrap().name);
        });
    })
    .unwrap();
}

Bonus: Tại sao Arc cần kiểu là Sync ?

use std::cell::Cell;

struct User {
    age: Cell<usize>
}

fn main() {
    let user = User { age: Cell::new(30) };

    user.age.set(36);

    // will print: Age: 36
    println!("Age: {}", user.age.get());
}

Cell không thread safe hay nói cách khác nó !Sync. Nếu bạn chia sẻ một gía trị wrapped trong một Cell giữa nhiều thread, bạn có thể cần thay đổi dữ liệu dữ liệu từ 2 threads

// this example will not compile, `Cell` is `!Sync` and thus
// `Arc` will be `!Sync` and `!Send`
use std::cell::Cell;

struct User {
    age: Cell<usize>
}

fn main() {
    let user_original = Arc::new(User { age: Cell::new(30) });

    let user = user_original.clone();
    std::thread::spawn(move || {
        user.age.set(2);
    });

    let user = user_original.clone();
    std::thread::spawn(move || {
        user.age.set(3);
    });
}

Nếu đoạn code trên hoạt động, nó có thể kết quả của hành vị không xác định. Đó là lý do Arc không làm việc bất kỳ kiểu không Send hoặc Sync. Tại điểm Cell là Send nghĩa là bạn có thể send giữa các threads được. Tại sao?, Sending nó sẽ không làm một giá trị có thể truy xuất từ nhiều hơn thread. Nó sẽ chỉ luôn có 1 thread. Một khi bạn move nó tới thread khác, thread trước không sở hữu giá trị đó nữa.

Bonus: Tại Arc không cung cấp Send cho kiểu !Send ?

Kiểu điển hình !SendRc . Rc là họ hàng chú dì với Arc chỉ là nó không atomic. Nó không chỉ không được share giữa các thread mà còn không thể được move giữa các thread.

// this code won't compile, Rc is !Send and !Sync
use std::rc::Rc;

fn main() {
    let foo = Rc::new(1);

    let foo_clone = foo.clone();
    std::thread::spawn(move || {
        dbg!(foo_clone);
    });

    let foo_clone = foo.clone();
    std::thread::spawn(move || {
        dbg!(foo_clone);
    });
}

Ví dụ trên không compile. Bởi vì Rc!Sync + !Send. Bộ đếm bên trong là không atomic và do đó chia sẻ nó giữa các thread có thể kết quả trong việc đếm không chính xác tham chiếu.

Tượng tượng, nếu Arc có thể làm việc với một kiểu !Send, chuyện gì sẽ xảy ra:

use std::rc::Rc;
use std::sync::Arc;

#[derive(Debug)]
struct User {
    name: Rc<String>,
}
unsafe impl Send for User {}
unsafe impl Sync for User {}

fn main() {
    let foo = Arc::new(User {
        name: Rc::new(String::from("drogus")),
    });

    let foo_clone = foo.clone();
    std::thread::spawn(move || {
        let name = foo_clone.name.clone();
    });

    let foo_clone = foo.clone();
    std::thread::spawn(move || {
        let name = foo_clone.name.clone();
    });
}

Ví dụ sẽ compile, nhưng lỗi, đừng làm điều này trong code. Ở đây định nghĩa User struct, mà giữ Rc bên trong. Bởi vì SendSync tự động sinh, và Rc!Send + !Sync, User struct có thể !Send + !Sync, Tất nhiên ban có thể bảo với compile là cứ cho nó Send + Sync đi .

Một số kiểu với trait Send + Sync

Send + !Sync

  • mpsc::Receiver
  • Cell
  • RefCell

!Send + Sync

  • MutexGuard<T: Sync>

!Send + !Sync

  • Rc
  • *const T, *mut T

Cập nhật

  • Scope thread đã stable từ rust version 1.63