diff --git a/Cargo.lock b/Cargo.lock index 6582e837..64f31798 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5398,7 +5398,7 @@ dependencies = [ [[package]] name = "polars_bio" -version = "0.7.4" +version = "0.8.0" dependencies = [ "arrow", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index ef2d71d3..2baea9d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "polars_bio" -version = "0.7.4" +version = "0.8.0" edition = "2021" [lib] diff --git a/README.md b/README.md index 948a1989..26543074 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,8 @@ It provides a DataFrame API for genomics data and is designed to be blazing fast ![count-overlaps-single.png](docs/assets/count-overlaps-single.png) +![coverage-single.png](docs/assets/coverage-single.png) + ## Parallel performance πŸƒβ€πŸƒβ€ ![overlap-parallel.png](docs/assets/overlap-parallel.png) @@ -38,6 +40,8 @@ It provides a DataFrame API for genomics data and is designed to be blazing fast ![count-overlaps-parallel.png](docs/assets/count-overlaps-parallel.png) +![coverage-parallel.png](docs/assets/coverage-parallel.png) + Read the [documentation](https://biodatageeks.github.io/polars-bio/) \ No newline at end of file diff --git a/docs/assets/coverage-parallel.png b/docs/assets/coverage-parallel.png new file mode 100644 index 00000000..5f7f0163 Binary files /dev/null and b/docs/assets/coverage-parallel.png differ diff --git a/docs/assets/coverage-single.png b/docs/assets/coverage-single.png new file mode 100644 index 00000000..5f58e701 Binary files /dev/null and b/docs/assets/coverage-single.png differ diff --git a/docs/features.md b/docs/features.md index 10c16d26..d69f2499 100644 --- a/docs/features.md +++ b/docs/features.md @@ -10,7 +10,7 @@ | cluster | :white_check_mark: | | :white_check_mark: | :white_check_mark: | | | | [merge](api.md#polars_bio.merge) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | | :white_check_mark: | | complement | :white_check_mark: | :construction: | | :white_check_mark: | :white_check_mark: | | -| coverage | :white_check_mark: | | :white_check_mark: | :white_check_mark: | | :white_check_mark: | +| [coverage](api.md#polars_bio.coverage) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | | :white_check_mark: | | [expand](api.md#polars_bio.LazyFrame.expand) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | | :white_check_mark: | | [sort](api.md#polars_bio.LazyFrame.sort_bedframe) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | | :white_check_mark: | | [read_table](api.md#polars_bio.read_table) | :white_check_mark: | :white_check_mark: | :white_check_mark: | :white_check_mark: | | :white_check_mark: | diff --git a/docs/notebooks/cookbook.ipynb b/docs/notebooks/cookbook.ipynb index 9a7a41c9..007cf002 100644 --- a/docs/notebooks/cookbook.ipynb +++ b/docs/notebooks/cookbook.ipynb @@ -21,24 +21,16 @@ "id": "62a7b57c30bf54e2", "metadata": { "ExecuteTime": { - "end_time": "2025-03-05T16:41:54.268168Z", - "start_time": "2025-03-05T16:41:53.664194Z" + "end_time": "2025-03-07T10:02:23.527490Z", + "start_time": "2025-03-07T10:02:23.525921Z" } }, "source": [ "import polars_bio as pb\n", "import polars as pl" ], - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO:polars_bio:Creating BioSessionContext\n" - ] - } - ], - "execution_count": 2 + "outputs": [], + "execution_count": 8 }, { "cell_type": "code", @@ -646,34 +638,34 @@ { "metadata": { "ExecuteTime": { - "end_time": "2025-02-28T11:52:12.169029Z", - "start_time": "2025-02-28T11:52:12.167384Z" + "end_time": "2025-03-07T10:02:16.509828Z", + "start_time": "2025-03-07T10:02:16.507744Z" } }, "cell_type": "code", "source": "gcs_vcf_path = \"gs://genomics-public-data/platinum-genomes/vcf/NA12878_S1.genome.vcf\"", "id": "31f0f3d0974245bd", "outputs": [], - "execution_count": 16 + "execution_count": 5 }, { "metadata": { "ExecuteTime": { - "end_time": "2025-02-28T11:52:13.441345Z", - "start_time": "2025-02-28T11:52:13.439461Z" + "end_time": "2025-03-07T10:02:19.374881Z", + "start_time": "2025-03-07T10:02:19.372753Z" } }, "cell_type": "code", "source": "info_fields=[\"AC\", \"AF\"]", "id": "816c419b3b45ee44", "outputs": [], - "execution_count": 17 + "execution_count": 6 }, { "metadata": { "ExecuteTime": { - "end_time": "2025-02-28T11:52:17.666747Z", - "start_time": "2025-02-28T11:52:16.365292Z" + "end_time": "2025-03-07T10:02:52.904221Z", + "start_time": "2025-03-07T10:02:41.364349Z" } }, "cell_type": "code", @@ -713,12 +705,12 @@ "shape: (3, 10)
chromstartendidrefaltqualfilteracaf
stru32u32strstrstrf64strlist[i32]list[f32]
"chrM"11"""G"""0.0"PASS"nullnull
"chrM"272"""A"""0.0"PASS"nullnull
"chrM"7373"""G""A"8752.780273"TruthSensitivityTranche99.90to…[2][1.0]
" ] }, - "execution_count": 18, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], - "execution_count": 18 + "execution_count": 9 }, { "metadata": {}, diff --git a/docs/performance.md b/docs/performance.md index a8e86743..50dde7a9 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -7,12 +7,16 @@ ![count-overlaps-single.png](assets/count-overlaps-single.png) +![coverage-single.png](assets/coverage-single.png) + ## Parallel performance πŸƒβ€πŸƒβ€ ![overlap-parallel.png](assets/overlap-parallel.png) ![overlap-parallel.png](assets/nearest-parallel.png) ![count-overlaps-parallel.png](assets/count-overlaps-parallel.png) + +![coverage-parallel.png](assets/coverage-parallel.png) ## Benchmarks πŸ§ͺ ### Detailed results shortcuts πŸ‘¨β€πŸ”¬ - [Binary operations](#binary-operations) diff --git a/polars_bio/__init__.py b/polars_bio/__init__.py index 4b172b3a..01d66d58 100644 --- a/polars_bio/__init__.py +++ b/polars_bio/__init__.py @@ -14,7 +14,7 @@ sql, ) from .polars_ext import PolarsRangesOperations as LazyFrame -from .range_op import FilterOp, count_overlaps, merge, nearest, overlap +from .range_op import FilterOp, count_overlaps, coverage, merge, nearest, overlap from .range_viz import visualize_intervals POLARS_BIO_MAX_THREADS = "datafusion.execution.target_partitions" @@ -26,6 +26,7 @@ "nearest", "merge", "count_overlaps", + "coverage", "ctx", "FilterOp", "visualize_intervals", diff --git a/polars_bio/polars_ext.py b/polars_bio/polars_ext.py index 9d8f11ad..2c4f29ff 100644 --- a/polars_bio/polars_ext.py +++ b/polars_bio/polars_ext.py @@ -222,3 +222,18 @@ def expand( if midsk in schema: df = df.drop(midsk) return df + + def coverage( + self, + other_df: pl.LazyFrame, + cols1=["chrom", "start", "end"], + cols2=["chrom", "start", "end"], + suffixes: tuple[str, str] = ("_1", "_2"), + ) -> pl.LazyFrame: + """ + !!! note + Alias for [coverage](api.md#polars_bio.coverage) + """ + return pb.coverage( + self._ldf, other_df, cols1=cols1, cols2=cols2, suffixes=suffixes + ) diff --git a/polars_bio/range_op.py b/polars_bio/range_op.py index 48d123d4..49969954 100644 --- a/polars_bio/range_op.py +++ b/polars_bio/range_op.py @@ -127,7 +127,7 @@ def nearest( read_options: Union[ReadOptions, None] = None, ) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame]: """ - Find pairs of overlapping genomic intervals. + Find pairs of closest genomic intervals. Bioframe inspired API. Parameters: @@ -173,6 +173,65 @@ def nearest( return range_operation(df1, df2, range_options, output_type, ctx, read_options) +def coverage( + df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + overlap_filter: FilterOp = FilterOp.Strict, + suffixes: tuple[str, str] = ("_1", "_2"), + on_cols: Union[list[str], None] = None, + cols1: Union[list[str], None] = ["chrom", "start", "end"], + cols2: Union[list[str], None] = ["chrom", "start", "end"], + output_type: str = "polars.LazyFrame", + streaming: bool = False, + read_options: Union[ReadOptions, None] = None, +) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame]: + """ + Calculate intervals coverage. + Bioframe inspired API. + + Parameters: + df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. + df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. + overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). + cols1: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + cols2: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + suffixes: Suffixes for the columns of the two overlapped sets. + on_cols: List of additional column names to join on. default is None. + output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" are also supported. + streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming-out-of-core-processing) engine. + read_options: Additional options for reading the input files. + + + Returns: + **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. + + Note: + The default output format, i.e. [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. + This enables efficient processing of large datasets without loading the entire output dataset into memory. + + Example: + + Todo: + Support for on_cols. + """ + + _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") + + cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 + cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 + range_options = RangeOptions( + range_op=RangeOp.Coverage, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=cols1, + columns_2=cols2, + streaming=streaming, + ) + return range_operation(df2, df1, range_options, output_type, ctx, read_options) + + def count_overlaps( df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], diff --git a/polars_bio/range_op_helpers.py b/polars_bio/range_op_helpers.py index a59d899e..7f0261d9 100644 --- a/polars_bio/range_op_helpers.py +++ b/polars_bio/range_op_helpers.py @@ -58,6 +58,10 @@ def range_operation( merged_schema = pl.Schema( {**_get_schema(df1, ctx, None, read_options1), **{"count": pl.Int32}} ) + elif range_options.range_op == RangeOp.Coverage: + merged_schema = pl.Schema( + {**_get_schema(df1, ctx, None, read_options1), **{"coverage": pl.Int32}} + ) else: df_schema1 = _get_schema(df1, ctx, range_options.suffixes[0], read_options1) df_schema2 = _get_schema(df2, ctx, range_options.suffixes[1], read_options2) diff --git a/pyproject.toml b/pyproject.toml index d680f57d..b44e1b11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "polars-bio" -version = "0.7.4" +version = "0.8.0" description = "Blazing fast genomic operations on large Python dataframes" authors = [] requires-python = ">=3.9" diff --git a/src/operation.rs b/src/operation.rs index 70190a78..13ff545f 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -80,11 +80,19 @@ pub(crate) fn do_range_operation( left_table, right_table, )), - RangeOp::CountOverlapsNaive => rt.block_on(do_count_overlaps_naive( + RangeOp::CountOverlapsNaive => rt.block_on(do_count_overlaps_coverage_naive( ctx, range_options, left_table, right_table, + false, + )), + RangeOp::Coverage => rt.block_on(do_count_overlaps_coverage_naive( + ctx, + range_options, + left_table, + right_table, + true, )), _ => panic!("Unsupported operation"), @@ -145,11 +153,12 @@ async fn do_count_overlaps( ctx.sql(&query).await.unwrap() } -async fn do_count_overlaps_naive( +async fn do_count_overlaps_coverage_naive( ctx: &ExonSession, range_opts: RangeOptions, left_table: String, right_table: String, + coverage: bool, ) -> datafusion::dataframe::DataFrame { let columns_1 = range_opts.columns_1.unwrap(); let columns_2 = range_opts.columns_2.unwrap(); @@ -170,13 +179,14 @@ async fn do_count_overlaps_naive( columns_1, columns_2, range_opts.filter_op.unwrap(), - false, + coverage, ); - session.deregister_table("count_overlaps").unwrap(); + let table_name = "count_overlaps_coverage".to_string(); + session.deregister_table(table_name.clone()).unwrap(); session - .register_table("count_overlaps", Arc::new(count_overlaps_provider)) + .register_table(table_name.clone(), Arc::new(count_overlaps_provider)) .unwrap(); - let query = "SELECT * FROM count_overlaps"; + let query = format!("SELECT * FROM {}", table_name); debug!("Query: {}", query); ctx.sql(&query).await.unwrap() } diff --git a/src/udtf.rs b/src/udtf.rs index badacff6..dd627421 100644 --- a/src/udtf.rs +++ b/src/udtf.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::cmp::{max, min}; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -53,7 +54,8 @@ impl CountOverlapsProvider { right_table, schema: { let mut fields = right_table_schema.fields().to_vec(); - let new_field = Field::new("count", DataType::Int64, false); + let name = if coverage { "coverage" } else { "count" }; + let new_field = Field::new(name, DataType::Int64, false); fields.push(FieldRef::new(new_field)); let new_schema = Arc::new(Schema::new(fields).clone()); SchemaRef::from(new_schema.clone()) @@ -117,6 +119,7 @@ impl TableProvider for CountOverlapsProvider { let trees = Arc::new(build_coitree_from_batches( left_table, self.columns_1.clone(), + self.coverage, )); Ok(Arc::new(CountOverlapsExec { schema: self.schema().clone(), @@ -212,9 +215,40 @@ impl ExecutionPlan for CountOverlapsExec { type IntervalHashMap = FnvHashMap>>; +fn merge_intervals(mut intervals: Vec>) -> Vec> { + // Return early if there are no intervals. + if intervals.is_empty() { + return vec![]; + } + + // Sort intervals by their start time. + intervals.sort_by(|a, b| a.first.cmp(&b.first)); + + // Initialize merged intervals with the first interval. + let mut merged = Vec::new(); + let mut current = intervals[0]; + + // Iterate over the rest of the intervals. + for interval in intervals.into_iter().skip(1) { + if interval.first <= current.last { + // Overlapping intervals; merge them by extending the current interval. + current.last = current.last.max(interval.last); + } else { + // No overlap: push the current interval and update it. + merged.push(current); + current = interval; + } + } + // Push the last interval. + merged.push(current); + + merged +} + fn build_coitree_from_batches( batches: Vec, columns: (String, String, String), + coverage: bool, ) -> FnvHashMap> { let mut nodes = IntervalHashMap::default(); @@ -235,7 +269,11 @@ fn build_coitree_from_batches( } let mut trees = FnvHashMap::>::default(); for (seqname, seqname_nodes) in nodes { - trees.insert(seqname, COITree::new(&seqname_nodes)); + if !coverage { + trees.insert(seqname, COITree::new(&seqname_nodes)); + } else { + trees.insert(seqname, COITree::new(&merge_intervals(seqname_nodes))); + } } trees } @@ -349,51 +387,17 @@ fn get_join_col_arrays( _ => todo!(), }; - //Utf8View -> StringViewArray - //LargeUtf8 -> LargeStringArray - // println!("{:?}", batch.schema()); - // let contig_arr = batch - // .column_by_name(&columns.0) - // .unwrap() - // .as_any() - // .downcast_ref::>() - // .unwrap(); - // let start_arr = batch - // .column_by_name(&columns.1) - // .unwrap() - // .as_any() - // .downcast_ref::() - // .unwrap(); - // let end_arr = batch - // .column_by_name(&columns.2) - // .unwrap() - // .as_any() - // .downcast_ref::() - // .unwrap(); (contig_arr, start_arr, end_arr) } -// fn get_coverage(tree: &COITree<(), u32>, start: i32, end: i32) -> i64 { -// let mut max_coverage = 0; -// let start = start + 1; -// let end = end - 1; -// // tree.query(start, end, |node| -// // { -// // if end < node.last() { -// // let coverage = node.last() - start; -// // if coverage > max_coverage { -// // max_coverage = coverage; -// // } -// // } -// // else { -// // let coverage = end - start; -// // if coverage > max_coverage { -// // max_coverage = coverage; -// // } -// // } -// // }); -// max_coverage -// } +fn get_coverage(tree: &COITree<(), u32>, start: i32, end: i32) -> i32 { + let mut coverage = 0; + tree.query(start, end, |node| { + let overlap = max(1, min(end + 1, node.last) - max(start - 1, node.first)); + coverage += overlap; + }); + coverage +} async fn get_stream( session: Arc, @@ -432,12 +436,18 @@ async fn get_stream( continue; } let count = match coverage { - true => todo!("coverage"), + true => { + if filter_op == FilterOp::Strict { + get_coverage(tree.unwrap(), pos_start + 1, pos_end - 1) + } else { + get_coverage(tree.unwrap(), pos_start, pos_end) + } + }, false => { if filter_op == FilterOp::Strict { - tree.unwrap().query_count(pos_start + 1, pos_end - 1) + tree.unwrap().query_count(pos_start + 1, pos_end - 1) as i32 } else { - tree.unwrap().query_count(pos_start, pos_end) + tree.unwrap().query_count(pos_start, pos_end) as i32 } }, }; diff --git a/tests/test_bioframe.py b/tests/test_bioframe.py index 757cab96..7b791b5d 100644 --- a/tests/test_bioframe.py +++ b/tests/test_bioframe.py @@ -170,3 +170,45 @@ def test_merge_schema_rows(self): by=list(self.result_merge.columns) ).reset_index(drop=True) pd.testing.assert_frame_equal(result, expected) + + def test_coverage_count(self): + result = pb.coverage( + BIO_PD_DF1, + BIO_PD_DF2, + cols1=("contig", "pos_start", "pos_end"), + cols2=("contig", "pos_start", "pos_end"), + output_type="pandas.DataFrame", + overlap_filter=FilterOp.Strict, + ) + result_bio = bf.coverage( + BIO_PD_DF1, + BIO_PD_DF2, + cols1=("contig", "pos_start", "pos_end"), + cols2=("contig", "pos_start", "pos_end"), + suffixes=("_1", "_2"), + ) + assert len(result) == len(result_bio) + + def test_coverage_schema_rows(self): + result = pb.coverage( + BIO_PD_DF1, + BIO_PD_DF2, + cols1=("contig", "pos_start", "pos_end"), + cols2=("contig", "pos_start", "pos_end"), + output_type="pandas.DataFrame", + overlap_filter=FilterOp.Strict, + ) + result_bio = bf.coverage( + BIO_PD_DF1, + BIO_PD_DF2, + cols1=("contig", "pos_start", "pos_end"), + cols2=("contig", "pos_start", "pos_end"), + suffixes=("_1", "_2"), + ) + expected = ( + result_bio.sort_values(by=list(result.columns)) + .reset_index(drop=True) + .astype({"coverage": "int64"}) + ) + result = result.sort_values(by=list(result.columns)).reset_index(drop=True) + pd.testing.assert_frame_equal(result, expected) diff --git a/tests/test_native.py b/tests/test_native.py index dadcddb8..d4b1c397 100644 --- a/tests/test_native.py +++ b/tests/test_native.py @@ -1,5 +1,10 @@ +import bioframe as bf import pandas as pd from _expected import ( + BIO_DF_PATH1, + BIO_DF_PATH2, + BIO_PD_DF1, + BIO_PD_DF2, DF_COUNT_OVERLAPS_PATH1, DF_COUNT_OVERLAPS_PATH2, DF_MERGE_PATH, @@ -101,3 +106,32 @@ def test_merge_schema_rows(self): ) expected = PD_DF_MERGE pd.testing.assert_frame_equal(result, expected) + + +class TestCoverageNative: + result = pb.coverage( + BIO_DF_PATH1, + BIO_DF_PATH2, + cols1=("contig", "pos_start", "pos_end"), + cols2=("contig", "pos_start", "pos_end"), + output_type="pandas.DataFrame", + overlap_filter=FilterOp.Strict, + ) + result_bio = bf.coverage( + BIO_PD_DF1, + BIO_PD_DF2, + cols1=("contig", "pos_start", "pos_end"), + cols2=("contig", "pos_start", "pos_end"), + suffixes=("_1", "_2"), + ) + + def test_coverage_count(self): + print(self.result) + assert len(self.result) == len(self.result_bio) + + def test_coverage_schema_rows(self): + result = self.result.sort_values(by=list(self.result.columns)).reset_index( + drop=True + ) + expected = self.result_bio.astype({"coverage": "int64"}) + pd.testing.assert_frame_equal(result, expected)