Bài này chúng ta sẽ tìm hiểu về Async trong Rust

Outline

  • Chapter 0: Async là gì?
  • Chapter 1: How to use async
  • Chapter 2: Practice: Write a middleware in actix library

References:

Chapter 0: Async là gì?

Các mô hình song song hiện tại

  • OS thread
  • Event-driven Programming
  • Coroutine
  • The actor model

Lợi ich của async trong Rút

  • Inert
  • Zerocost
  • No built-in runtime
  • Single or multithreads

Định nghĩa:

Async provides significantly reduced CPU and memory overhead, especially for workloads with a large amount of IO-bound tasks, such as servers and databases. All else equal, you can have orders of magnitude more tasks than OS threads, because an async runtime uses a small amount of (expensive) threads to handle a large amount of (cheap) tasks.

Nó sử dụng một số nhỏ của threads để handle số lượng lớn các tasks. Tưởng tượng rằng runtime async sẽ tối ưu việc chuyển các tasks cho các OS thread một cách tối ưu nhất.

Trạng thái hiện tại

  • Hỗ trợ cú pháp async/await trong Rust compiler.
  • Sử dụng trait Future. Hoặc sử dụng crate future (https://docs.rs/futures/latest/futures/)
  • Tokio

Sử dụng

[dependencies] futures = "0.3"

Nhìn về khía cạnh lập trình thì Future là một điều khiển mà vẫn chưa được hoàn thành. Điều khiển có thể trả về trong tương lại và có

Câu hỏi: Khi nào nó được trả về? Do cách chúng ta quy định, hoặc phụ thuộc vào các điều khiển khác chúng ta sử dụng

Các thành phần của Async

  • Runtime Executor: Nới mà chúng lập lịch cho tất các Future để run.
  • Tasks: Mỗi lần thực hiện lời hàm poll chúng ta có thể gọi nó là 1 tasks
  • Future: Một đối tượng có đặc tính trait
  • Waker: Một đối tượng liên kết với Future mà executor sử dụng để đánh thức Future đó

Chapter 1: How to use Async?

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Chúng ta có một Future bao gồm type Output. Đây chính là kiểu mà chúng ta hứa sẽ trả về kết quả. và một hàm poll để điều khiển khi nào chúng ta sẽ trả về.

Nó chứa một hàm wake() dùng để đánh thức Future này khi có một vài xử kiện xảy ra làm cho Future “có tiến triển” hoặc “make a progress”

Chúng ta sẽ cần tìm hiểu kỹ nó hơn trong phần sau. Giờ tìm hiểu cách sử dụng

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

Giả sử chúng ta có struct SocketRead. Chúng ta sẽ muốn làm nếu SocketRead này nhận dữ liệu chúng ta sẽ trả về. Khi mà I/O thực sự có Rõ ràng socket là đối tượng Socket bên dưới sẽ biết dữ liệu có hay không mỗi khi gọi hàm has_data_to_read()

Cách nó thực hiện như sau:

Executor lập lịch cho Future này, Future sẽ đăng ký hàm poll với Executor để nó gọi mỗi khi có sự kiện xảy ra trong hệ thống. Sử kiện đó có thể là 1 số bytes nhận được từ card mạng, file được mở, một connection nào bị đóng… Nó sẽ gọi hàm poll này mỗi lần như vậy. Nếu chúng ta muốn trả về kết quả chỉ cần trả về Pool::Ready(T) với T là kiểu dữ liệu trả về. Nếu chúng ta chưa muốn trả về trả về Poll::Pending. Nó sẽ yield điều khiển ở đây và lưu trữ trạng thái hiện tại. Ngoài ra chúng ta cần liên kết Future này với waker. Lần poll này có thể nằm trên 1 OS thread nào đó, còn lần gọi poll thứ 2, 3 có thể lại nằm trên một OS thread khác.

Có một số vấn đề làm cho này có thể khó

  • Chính vì lời gọi hàm poll có thể xảy ra trên nhiều thread khác nhau, dữ liệu sẽ phải lưu lại và move tới thread khác như naò?
  • Các biến được move giữa các thread, việc các biến tham chiếu nằm trong hàm poll có cách nào để keep track được. Đó là nới Pin được sử dụng
  • Lifetimes

Các hình thức sử dụng

  • Sử dụng block async
  • Sử dụng đối tượng Future (thực hiện trait Future trait)

Lifetimes

  • Hàm async yêu cầu các đối tượng có ‘static. Nghĩa là nó cần được move hoàn toàn vào trong hàm
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

Việc đặt let x = 5; vào trong async block. Chúng ta đã mở rộng vòng đời lên tới khi kết thúc 1 Future

Hoặc có thể đặt bên ngoài nhưng nên cần move quyền vào trong khối async move {}

Đợi trên nhiều Future

  • Sử dụng join!
  • Sử dụng select!
  • Thread::spawn

Async ecosystem

  • Tokio: A popular async ecosystem with HTTP, gRPC, and tracing frameworks.
  • async-std: A crate that provides asynchronous counterparts to standard library components.
  • smol: A small, simplified async runtime. Provides the Async trait that can be used to wrap structs like UnixStream or TcpListener.
  • fuchsia-async: An executor for use in the Fuchsia OS.

Chapater 2: Practice: Write a middleware in actix library

Xem xét đoạn code sau

use std::future::{ready, Ready};

use actix_web::{
    dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
    Error,
};
use futures_util::future::LocalBoxFuture;

// There are two steps in middleware processing.
// 1. Middleware initialization, middleware factory gets called with
//    next service in chain as parameter.
// 2. Middleware's call method gets called with normal request.
pub struct SayHi;

// Middleware factory is `Transform` trait
// `S` - type of the next service
// `B` - type of response's body
impl<S, B> Transform<S, ServiceRequest> for SayHi
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type InitError = ();
    type Transform = SayHiMiddleware<S>;
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        ready(Ok(SayHiMiddleware { service }))
    }
}

pub struct SayHiMiddleware<S> {
    service: S,
}

impl<S, B> Service<ServiceRequest> for SayHiMiddleware<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

    forward_ready!(service);

    fn call(&self, req: ServiceRequest) -> Self::Future {
        println!("Hi from start. You requested: {}", req.path());

        let fut = self.service.call(req);

        Box::pin(async move {
            let res = fut.await?;

            println!("Hi from response");
            Ok(res)
        })
    }
}


Định nghĩa:

  • Cần thực hiện 2 trait như sau: TransformService<ServiceRequest>

Transform:

  • Future = Ready<Result<Self::Transform, Self::InitError>>

E

Example 2

Middleware authentication


use actix_web::HttpResponse;
use actix_web::{
    body::EitherBody,
    dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
    http::header::{HeaderName, HeaderValue},
    Error,
};
use futures_util::future::LocalBoxFuture;
use log::debug;
use log::error;
use std::future::{ready, Ready};

use std::sync::{Arc, Mutex};

use crate::utils::jwt::{decode_jwt, Claim, TOKEN_CLAIMS_HEADER};
pub struct Authentication {}

impl<S, B> Transform<S, ServiceRequest> for Authentication
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<EitherBody<B>>;
    type Error = Error;
    type InitError = ();
    type Transform = AuthenticationMiddleware<S>;
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    fn new_transform(&self, service: S) -> Self::Future {
        ready(Ok(AuthenticationMiddleware { service }))
    }
}

pub struct AuthenticationMiddleware<S> {
    service: S,
}

impl<S, B> Service<ServiceRequest> for AuthenticationMiddleware<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<EitherBody<B>>;
    type Error = Error;
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

    forward_ready!(service);

    fn call(&self, mut req: ServiceRequest) -> Self::Future {
        debug!("Request: {:?}", req.path());
        let (need_auth) = match req.path() {
            "/login" | "/register" | "/swagger-ui" => false,
            "/refresh" => true,
            _ => true,
        };

        if need_auth {
            let auth_header = req.headers().get("Authorization");
            if let Some(token) = auth_header {
                let auth_token = token.to_str().unwrap();
                let Ok(claims) = decode_jwt(auth_token) else {
                    error!("Error verifying token");
                    return Box::pin(
                        async move { Ok(unauthorized_resp(req).map_into_right_body()) },
                    );
                };
                create_claim_header(&mut req, claims);
            } else {
                return Box::pin(async move { Ok(unauthorized_resp(req).map_into_right_body()) });
            }
        }

        let fut = self.service.call(req);
        Box::pin(async move {
            let res = fut.await?;
            Ok(res.map_into_left_body())
        })
    }
}

fn create_claim_header(req: &mut ServiceRequest, claims: Claim) {
    let Ok(claims_str) = serde_json::to_string(&claims) else {
        error!("Error serializing data");
        return;
    };
    let Ok(header_value) = HeaderValue::from_str(&claims_str) else {
        error!("Error creating header");
        return;
    };
    req.headers_mut()
        .insert(HeaderName::from_static(TOKEN_CLAIMS_HEADER), header_value);
}

fn unauthorized_resp(req: ServiceRequest) -> ServiceResponse {
    let http_res = HttpResponse::Unauthorized().finish();
    let (http_req, _) = req.into_parts();
    ServiceResponse::new(http_req, http_res)
}

Hàm decode and encode jwt


pub const TOKEN_CLAIMS_HEADER: &str = "token_claims";
pub fn encode_jwt(email: String) -> Result<String, StatusCode> {
    let now = Utc::now();
    let expire = Duration::hours(24);
    let claim = Claim {
        iat: now.timestamp(),
        exp: (now + expire).timestamp() as i64,
        email,
    };
    let secret = std::env::var("SECRET_KEY").unwrap();
    encode(
        &Header::default(),
        &claim,
        &EncodingKey::from_secret(secret.as_ref()),
    )
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

pub fn decode_jwt(token: &str) -> Result<Claim, StatusCode> {
    let secret = std::env::var("SECRET_KEY").unwrap();
    jsonwebtoken::decode::<Claim>(
        token,
        &DecodingKey::from_secret(secret.as_ref()),
        &Validation::default(),
    )
    .map(|data| data.claims)
    .map_err(|_| StatusCode::UNAUTHORIZED)
}

Cách sử dụng:

 HttpServer::new(move || {
        actix_web::App::new()
            .app_data(app_data.clone())
            .wrap(Logger::default())
            .configure(auth_routes::routes)
            .wrap(middlewares::authn::Authentication {})
            .wrap(from_fn(middlewares::auth_middleware::auth_middleware))