
Integrating Polars with Datafusion #4843

Open
mwiewior opened this issue Jan 8, 2025 · 0 comments
mwiewior commented Jan 8, 2025

Hey, I'm working on implementing streaming functionality for Polars on top of some extensions to DataFusion.
For this purpose I added streaming support for AnonymousScan in Polars:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use polars::prelude::*;
use tokio::runtime::Runtime;

pub struct RangeOperationScan {
    pub(crate) df_iter: Arc<Mutex<SendableRecordBatchStream>>,
}

impl AnonymousScan for RangeOperationScan {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn scan(&self, _scan_opts: AnonymousScanArgs) -> PolarsResult<DataFrame> {
        todo!("Only streaming is supported")
    }

    fn next_batch(
        &self,
        _scan_opts: AnonymousScanArgs,
    ) -> PolarsResult<Option<DataFrame>> {
        let mutex_stream = Arc::clone(&self.df_iter);
        thread::spawn(move || {
            let rt = Runtime::new().unwrap();
            let result = rt.block_on(mutex_stream.lock().unwrap().next()); // <-- I think here is the problem
            match result {
                Some(batch) => {
                    let rb = batch.unwrap();
                    let schema_polars = convert_arrow_rb_schema_to_polars_df_schema(&rb.schema())?;
                    let df = convert_arrow_rb_to_polars_df(&rb, &schema_polars)?;
                    Ok(Some(df))
                }
                None => Ok(None),
            }
        })
        .join()
        .unwrap()
    }
}
```
        

and then:

```rust
fn lazy_range_operation_scan(
    py: Python<'_>,
    py_ctx: &PyBioSessionContext,
    df_path1: String,
    df_path2: String,
    range_options: RangeOptions,
) -> PyResult<PyLazyFrame> {
    py.allow_threads(|| {
        // some code removed

        let rt = Runtime::new().unwrap();
        let ctx = &py_ctx.ctx;

        let args = ScanArgsAnonymous {
            schema: Some(schema), // schema construction removed
            ..ScanArgsAnonymous::default()
        };

        // some code removed
        let stream = rt.block_on(df?.execute_stream())?;
        let scan = RangeOperationScan {
            df_iter: Arc::new(Mutex::new(stream)),
        };
        let function = Arc::new(scan);
        let lf = LazyFrame::anonymous_scan(function, args).unwrap();
        Ok(lf.into())
    })
}
```

Everything works like a charm when DataFusion runs single-threaded (or multi-threaded via the Polars Rust API). If I try to run in multi-threaded mode from Python, I suspect the main thread quits, causing the DataFusion tasks to be cancelled:

```
thread '<unnamed>' panicked at src/scan.rs:97:36:
called `Result::unwrap()` on an `Err` value: External(Internal("Non Panic Task error: task 43 was cancelled"))
stack backtrace:
   0:        0x34c592498 - <unknown>
   1:        0x34c5b4834 - <unknown>
```

I'm not sure if it's possible to prevent that, given that the next_batch method is called externally by the Polars engine.
