Bài này chúng ta sẽ tìm hiểu về các hàm như join, stream race, spawn

Tìm hiểu về tạo một thread trong tokio

Chúng ta sử dụng hàm tokio::spwan

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
       let handle: JoinHandle =  tokio::spawn(async move {
            process(socket).await;
        });

        let out =  handle.await.unwrap();
        println!("GOT {}", out);
    }
}

Các điểm lưu ý:

  • Hàm này trả về một JoinHandle<T>
  • Hàm này nhận một async block. Có thể sử dụng tù khoá move để move quyền sở hữu một số biến ngoài vào trong khối lệnh này
  • Giá trị trả về trong khối async block có thể trả về bên ngoài.
  • Lifetime các biến khối async block này phải bao gồm lifetime 'static. Nghĩa là không bao gồm bất kỳ tham chiếu tới dữ liệu bên ngoài task này. Đó là lý do chúng ta hãy sử dụng move để đảm bảo tham chiếu là hợp lệ.
#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    let h = task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
    h.join()

}

Dù có h.join nhưng cũng không hợp lệ. Vì v được capture vào async block là dạng reference. Nhưng nó không biết được v có tồn tại đến hết chương trình không. Nếu nó báo lỗi ngay

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |
  • Một phần của dữ liệu có thể được share giữa nhiều task đồng thời, do đó dùng Arc
  • Task trong tokio::spawn phải thực hiện Send trait. Bởi tokio sẽ move tasks giữa các threads khác nhau trong sử dụng .await. Task là Send khi tất cả các dữ được giữ qua .awaitSend. Khi .await được gọi, task sẽ bị dừng lại và trả về cho scheduler. Nó sẽ scheduler lập lịch cho chạy ở lần tiếp theo, nó lại resume đúng điểm lần cuối nó bị dừng. Để làm điều này, tất cả các state được sử dụng sau .await phải được lưu lại bởi task. Nếu state này là Send thì nó có thể move tới threas khác.

Ví dụ rc không còn được sử dụng nữa. Nên in ra println!("{}", rc); sẽ không compile

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;

        println!("{}", rc);
    });
}


error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

Đồng bộ dữ liệu