pub fn into_stream_with_locals_v2<'p>(
    locals: TaskLocals,
    gen: &'p PyAny
) -> PyResult<impl Stream<Item = PyObject> + 'static>
Expand description

Available on crate feature `unstable-streams` only.

Converts an async generator into a stream.

This API is marked as unstable and is only available when the `unstable-streams` crate feature is enabled. It comes with no stability guarantees and could be changed or removed at any time.

Arguments

  • `locals` - The current task locals
  • `gen` - The Python async generator to be converted

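Examples

The snippet below is the body of an async function that returns `PyResult<()>` (note the `?` operators, the `.await`, and the trailing `Ok(())`); the enclosing function is not shown.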

use pyo3::prelude::*;
use futures::{StreamExt, TryStreamExt};

const TEST_MOD: &str = r#"
import asyncio

async def gen():
    for i in range(10):
        await asyncio.sleep(0.1)
        yield i
"#;

let stream = Python::with_gil(|py| {
    let test_mod = PyModule::from_code(
        py,
        TEST_MOD,
        "test_rust_coroutine/test_mod.py",
        "test_mod",
    )?;

    pyo3_asyncio::tokio::into_stream_with_locals_v2(
        pyo3_asyncio::tokio::get_current_locals(py)?,
        test_mod.call_method0("gen")?
    )
})?;

let vals = stream
    .map(|item| Python::with_gil(|py| -> PyResult<i32> { Ok(item.as_ref(py).extract()?) }))
    .try_collect::<Vec<i32>>()
    .await?;

assert_eq!((0..10).collect::<Vec<i32>>(), vals);

Ok(())