Как добавлять задачи в цикл обработки событий Tokio, который работает в другом потоке?

Вопрос задан: 9 месяцев назад Последняя активность: 9 месяцев назад
up 2 down

я бы хотел раскрутить цикл событий Tokio вместе с сервером Rocket, а затем добавлять события в этом цикле позже. Я читаю Есть ли способ запустить Tokio :: Задержка в новом потоке, чтобы позволить основной цикл продолжать?, Но это все еще не ясно мне, как достичь своей цели.

1 ответ

Возможно, для Вашего проекта будут необходимы бесплатные векторные карты. На нашем сайте представлены карты для всех стран.

Реклама

up 2 down accepted

Как документаций состояния:

Пока current_thread::Runtime не реализует Send и не могут быть безопасно перемещены в другие потоки, это обеспечивает Handle которые могут быть отправлены в другие потоки и позволяет породить новые задачи оттуда.

Ниже приведен пример того, раскручивается цикл обработки событий в одном потоке и имеющий вторую задачу резьбы икры на нем. Задачи все начинаются в шахматном порядке с интервалом, но закончить в то же время:

use std::{
    thread,
    time::{Duration, Instant},
};
use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15

fn main() {
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let (handle_tx, handle_rx) = std::sync::mpsc::channel();

    let tokio_thread = thread::spawn(move || {
        let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");

        eprintln!("Runtime created");

        // Give a handle to the runtime to another thread.
        handle_tx
            .send(runtime.handle())
            .expect("Unable to give runtime handle to another thread");

        // Continue running until notified to shutdown
        runtime
            .spawn({ shutdown_rx.map_err(|e| panic!("Error on the shutdown channel: {:?}", e)) });

        // Finish all pending tasks
        runtime.run().expect("Unable to run the runtime");

        eprintln!("Runtime finished");
    });

    let another_thread = thread::spawn(move || {
        let handle = handle_rx
            .recv()
            .expect("Could not get a handle to the other thread's runtime");

        eprintln!("Another thread created");
        let two_seconds_after_creation = Instant::now() + Duration::from_secs(2);

        for value in 0..10 {
            // Run this future in the other thread's runtime
            handle
                .spawn(future::lazy(move || {
                    eprintln!("Starting task for value {}", value);

                    Delay::new(two_seconds_after_creation)
                        .inspect(move |_| eprintln!("Finishing task for value {}", value))
                        .map(drop)
                        .map_err(drop)
                }))
                .expect("Unable to spawn a new task on the runtime");

            thread::sleep(Duration::from_millis(100));
        }

        eprintln!("Another thread finished");
    });

    another_thread.join().expect("Another thread panicked");

    shutdown_tx
        .send(())
        .expect("Unable to shutdown runtime thread");

    tokio_thread.join().expect("Tokio thread panicked");
}
Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Another thread finished
Finishing task for value 0
Finishing task for value 1
Finishing task for value 2
Finishing task for value 3
Finishing task for value 4
Finishing task for value 5
Finishing task for value 6
Finishing task for value 7
Finishing task for value 8
Finishing task for value 9
Runtime finished