[Rust] Concurrency trong Tokio?
Bài này chúng ta sẽ tìm hiểu về các hàm như join
, stream
race
, spawn
Tìm hiểu về tạo một thread trong tokio
Chúng ta sử dụng hàm tokio::spwan
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
let handle: JoinHandle = tokio::spawn(async move {
process(socket).await;
});
let out = handle.await.unwrap();
println!("GOT {}", out);
}
}
Các điểm lưu ý:
- Hàm này trả về một
JoinHandle<T>
- Hàm này nhận một async block. Có thể sử dụng tù khoá move để move quyền sở hữu một số biến ngoài vào trong khối lệnh này
- Giá trị trả về trong khối async block có thể trả về bên ngoài.
- Lifetime các biến khối async block này phải bao gồm lifetime
'static
. Nghĩa là không bao gồm bất kỳ tham chiếu tới dữ liệu bên ngoài task này. Đó là lý do chúng ta hãy sử dụngmove
để đảm bảo tham chiếu là hợp lệ.
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
let h = task::spawn(async {
println!("Here's a vec: {:?}", v);
});
h.join()
}
Dù có h.join
nhưng cũng không hợp lệ. Vì v
được capture vào async block là dạng reference. Nhưng nó không biết được v có tồn tại đến hết chương trình không. Nếu nó báo lỗi ngay
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
- Một phần của dữ liệu có thể được share giữa nhiều task đồng thời, do đó dùng
Arc
- Task trong
tokio::spawn
phải thực hiện Send trait. Bởi tokio sẽ move tasks giữa các threads khác nhau trong sử dụng.await
. Task làSend
khi tất cả các dữ được giữ qua.await
làSend
. Khi .await được gọi, task sẽ bị dừng lại và trả về cho scheduler. Nó sẽ scheduler lập lịch cho chạy ở lần tiếp theo, nó lại resume đúng điểm lần cuối nó bị dừng. Để làm điều này, tất cả các state được sử dụng sau.await
phải được lưu lại bởi task. Nếu state này làSend
thì nó có thể move tới threas khác.
Ví dụ rc
không còn được sử dụng nữa. Nên in ra println!("{}", rc);
sẽ không compile
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
println!("{}", rc);
});
}
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
Đồng bộ dữ liệu
Thanks for reading! Read other posts?