Как несколько потоков могут совместно использовать итератор?

Вопрос:

Я работал над функцией, которая копирует кучу файлов из источника в пункт назначения, используя Rust и threads. У меня возникают проблемы с тем, что потоки разделяют итератор. Я еще не привык к системе заимствований:

extern crate libc;
extern crate num_cpus;

use libc::{c_char, size_t};
use std::thread;
use std::fs::copy;

fn python_str_array_2_str_vec<T, U, V>(_: T, _: U) -> V {
unimplemented!()
}

#[no_mangle]
pub extern "C" fn copyFiles(
sources: *const *const c_char,
destinies: *const *const c_char,
array_len: size_t,
) {
let src: Vec<&str> = python_str_array_2_str_vec(sources, array_len);
let dst: Vec<&str> = python_str_array_2_str_vec(destinies, array_len);
let mut iter = src.iter().zip(dst);
let num_threads = num_cpus::get();
let threads = (0..num_threads).map(|_| {
thread::spawn(|| while let Some((s, d)) = iter.next() {
copy(s, d);
})
});
for t in threads {
t.join();
}
}

fn main() {}

Я получаю эту ошибку компиляции, которую я не смог решить:

error[E0597]: 'src' does not live long enough
--> src/main.rs:20:20
|
20 |     let mut iter = src.iter().zip(dst);
|                    ^^^ does not live long enough
...
30 | }
| - borrowed value only lives until here
|
= note: borrowed value must be valid for the static lifetime...

error[E0373]: closure may outlive the current function, but it borrows '**iter', which is owned by the current function
--> src/main.rs:23:23
|
23 |         thread::spawn(|| while let Some((s, d)) = iter.next() {
|                       ^^                          ---- '**iter' is borrowed here
|                       |
|                       may outlive borrowed value '**iter'
|
help: to force the closure to take ownership of '**iter' (and any other referenced variables), use the 'move' keyword, as shown:
|         thread::spawn(move || while let Some((s, d)) = iter.next() {

Я уже видел следующие вопросы:

Значение не достаточно долгое время, когда я использую несколько потоков. Я не использую chunks, я хотел бы попытаться использовать итератор через потоки, хотя создание кусков для передачи их в потоки будет классическим решением.

Не удалось отправить строку & str между потоками, потому что она не достаточно долгое время жизни. Я видел некоторые ответы на использование каналов для связи с потоками, но я не совсем уверен в их использовании. Должен быть более простой способ совместного использования только одного объекта через потоки.

Почему локальная переменная не живет достаточно долго для потока :: scoped. Это привлекло мое внимание. Предполагается, что scoped исправлена с моей ошибкой, но поскольку она находится в нестабильном канале, я хотел бы увидеть, есть ли другой способ сделать это просто используя spawn.

Может кто-нибудь объяснить, как мне исправить сроки жизни, чтобы итератор мог получить доступ из потоков?

Лучший ответ:

Вот MCVE вашей проблемы:

use std::thread;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];
    let mut iter = src.iter().zip(dst);
    thread::spawn(|| {
        while let Some((s, d)) = iter.next() {
            println!("{} -> {}", s, d);
        }
    });
}

Существует несколько связанных проблем:

  1. Итератор живет в стеке, и закрытие потока ссылается на него.
  2. Закрытие принимает изменчивую ссылку на итератор.
  3. Сам итератор имеет ссылку на Vec который живет в стеке.
  4. Сам Vec имеет ссылки на строковые фрагменты, которые, вероятно, живут в стеке, но не гарантированно живут дольше, чем поток в любом случае.

С другой стороны, компилятор Rust остановил вас от выполнения четырех отдельных элементов памяти без обеспечения безопасности.

Главное, чтобы признать, что любой поток, который вы создаете, может пережить место, где вы его породили. Даже если вы вызываете join сразу, компилятор не может статический убедиться в том, что произойдет, поэтому он должен принять консервативный путь. Это точка охваченных областей — они гарантируют, что поток выйдет до начала стека, в который они были запущены.

Кроме того, вы пытаетесь использовать изменяемую ссылку в нескольких параллельных потоках. Там zero гарантируют, что итератор (или любой из итераторов, на котором он был построен) можно смело вызывать параллельно. Это вполне возможно, что два поток вызывает next точно то же самое время. Две части кода запускаются параллельно и записываются в один и тот же адрес памяти. Один поток записывает половину данных, а другой поток записывает другую половину, и теперь ваша программа сбой в какой-то произвольной точке в будущем.

Используя инструмент, подобный перекладине, ваш код будет выглядеть примерно так:

extern crate crossbeam;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];

    let mut iter = src.iter().zip(dst);
    while let Some((s, d)) = iter.next() {
        crossbeam::scope(|scope| {
            scope.spawn(|| {
                println!("{} -> {}", s, d);
            });
        });
    }
}

Как уже упоминалось, это будет порождать только один поток за раз, ожидая его завершения. Альтернативой получению большего параллелизма (обычной точки этого упражнения) является обмен вызовами на next и spawn. Это требует передачи права собственности на s и d на поток через ключевое слово move:

extern crate crossbeam;

fn main() {
    let src = vec!["one", "alpha"];
    let dst = vec!["two", "beta"];

    let mut iter = src.iter().zip(dst);
    crossbeam::scope(|scope| {
        while let Some((s, d)) = iter.next() {
            scope.spawn(move || {
                println!("{} -> {}", s, d);
            });
        }
    });
}

Если вы добавите вызов сна в spawn, вы можете увидеть, что потоки выполняются параллельно.

Я бы написал это, используя цикл for:

let iter = src.iter().zip(dst);
crossbeam::scope(|scope| {
    for (s, d) in iter {
        scope.spawn(move || {
            println!("{} -> {}", s, d);
        });
    }
});

В конце концов, итератор выполняется в текущем потоке, и каждое значение, возвращаемое из итератора, затем передается в новый поток. Новые потоки гарантированно выходят до захваченных ссылок.

Вы можете быть заинтересованы в Rayon, ящике, который позволяет легко распараллелить определенные типы итераторов.

Смотрите также:

Оцените статью
TechArks.Ru
Добавить комментарий