Skip to content

Commit 4d7e72b

Browse files
committed
Describe VCF
1 parent 290df12 commit 4d7e72b

File tree

3 files changed

+127
-8
lines changed

3 files changed

+127
-8
lines changed

datafusion/vcf/examples/datafusion_integration.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
use std::sync::Arc;
2+
use datafusion::arrow::util::pretty::pretty_format_batches;
3+
use datafusion::datasource::MemTable;
24
use datafusion::prelude::SessionContext;
35
use datafusion_vcf::table_provider::VcfTableProvider;
4-
6+
use datafusion_vcf::storage::VcfRemoteReader;
57
#[tokio::main(flavor = "multi_thread")]
68
async fn main() -> datafusion::error::Result<()> {
79
env_logger::init();
8-
// let path = "gs://gcp-public-data--gnomad/release/4.1/vcf/exomes/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
10+
let path = "gs://gcp-public-data--gnomad/release/4.1/vcf/exomes/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
911
// let path = "gs://gcp-public-data--gnomad/release/4.1/genome_sv/gnomad.v4.1.sv.sites.vcf.gz".to_string();
1012
// let path = "gs://genomics-public-data/platinum-genomes/vcf/NA12878_S1.genome.vcf".to_string();
1113
// let path ="/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".to_string();
12-
let path ="/tmp/gnomad.v4.1.sv.sites.vcf.gz".to_string();
14+
// let path ="/tmp/gnomad.v4.1.sv.sites.vcf.gz".to_string();
1315
// let path ="/tmp//NA12878_S1.genome.vcf".to_string();
1416
// let infos = Some(Vec::from(["AC".to_string(), "AF".to_string(), "AN".to_string(), "FS".to_string(), "AN_raw".to_string(), "variant_type".to_string(), "AS_culprit".to_string(), "only_het".to_string()]));
1517
// let infos = Some(Vec::from(["SVTYPE".to_string()]));
@@ -19,13 +21,20 @@ async fn main() -> datafusion::error::Result<()> {
1921
let ctx = SessionContext::new();
2022
ctx.sql("set datafusion.execution.skip_physical_aggregate_schema_check=true").await?;
2123
// let table_provider = VcfTableProvider::new("/tmp/gnomad.exomes.v4.1.sites.chr21.vcf.bgz".parse().unwrap(), vec!["SVTYPE".parse().unwrap()], vec![], Some(8))?;
22-
let table_provider = VcfTableProvider::new(path, infos, None, Some(4), None,None)?;
24+
let table_provider = VcfTableProvider::new(path.clone(), infos, None, Some(4), None,None)?;
2325
ctx.register_table("custom_table", Arc::new(table_provider)).expect("TODO: panic message");
2426
// let df = ctx.sql("SELECT svtype, count(*) as cnt FROM custom_table group by svtype").await?;
25-
let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
27+
// let df = ctx.sql("SELECT count(*) as cnt FROM custom_table").await?;
2628
// df.clone().write_csv("/tmp/gnomad.exomes.v4.1.sites.chr21-old.csv", DataFrameWriteOptions::default(), Some(CsvOptions::default())).await?;
27-
// let df = ctx.sql("SELECT chrom FROM custom_table LIMIT 5").await?;
29+
let df = ctx.sql("SELECT chrom FROM custom_table LIMIT 5").await?;
2830
// println!("{:?}", df.explain(false, false)?);
31+
let mut reader = VcfRemoteReader::new(path, 64, 1).await;
32+
let rb= reader.describe().await?;
33+
// print!("{}", pretty_format_batches(&[rb]).expect("TODO: panic message").to_string());
34+
let mem_table = MemTable::try_new(rb.schema().clone(), vec![vec![rb]])?;
35+
ctx.register_table("my_table", Arc::new(mem_table))?;
36+
let df = ctx.table("my_table").await?;
37+
ctx.deregister_table("my_table")?;
2938
df.show().await.expect("TODO: panic message");
3039
// println!("{:?}", );
3140
Ok(())

datafusion/vcf/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ extern crate core;
22

33
pub mod table_provider;
44
mod physical_exec;
5-
mod storage;
5+
pub mod storage;

datafusion/vcf/src/storage.rs

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
use std::fs::File;
22
use std::io::Error;
33
use std::num::NonZero;
4+
use std::sync::Arc;
45
use async_stream::stream;
56
use bytes::Bytes;
7+
use datafusion::arrow;
8+
use datafusion::arrow::array::StringBuilder;
9+
use datafusion::arrow::datatypes::SchemaRef;
10+
use datafusion::arrow::ipc::RecordBatch;
11+
use datafusion::datasource::MemTable;
612
use futures::{stream, StreamExt};
713
use futures::stream::BoxStream;
814
use log::debug;
915
use noodles::{bgzf, vcf};
1016
use noodles::vcf::io::Reader;
11-
use noodles::vcf::Record;
17+
use noodles::vcf::{Header, Record};
1218
use noodles_bgzf::{AsyncReader, MultithreadedReader};
1319
use opendal::{FuturesBytesStream, Operator};
1420
use opendal::layers::{LoggingLayer, RetryLayer, TimeoutLayer};
@@ -271,6 +277,20 @@ impl VcfRemoteReader {
271277
}
272278
}
273279
}
280+
pub async fn describe(&mut self) -> Result<arrow::array::RecordBatch, Error> {
281+
match self {
282+
VcfRemoteReader::BGZF(reader) => {
283+
let header = reader.read_header().await?;
284+
Ok(get_info_fields(&header).await)
285+
}
286+
VcfRemoteReader::PLAIN(reader) => {
287+
let header = reader.read_header().await?;
288+
Ok(get_info_fields(&header).await)
289+
}
290+
}
291+
}
292+
293+
274294

275295
pub async fn read_records(&mut self) -> BoxStream<'_, Result<Record, Error>> {
276296
match self {
@@ -324,4 +344,94 @@ impl VcfLocalReader {
324344
}
325345
}
326346
}
347+
pub async fn describe(&mut self) -> Result<arrow::array::RecordBatch, Error> {
348+
match self {
349+
VcfLocalReader::BGZF(reader) => {
350+
let header = reader.read_header()?;
351+
Ok(get_info_fields(&header).await)
352+
}
353+
VcfLocalReader::PLAIN(reader) => {
354+
let header = reader.read_header().await?;
355+
Ok(get_info_fields(&header).await)
356+
}
357+
}
358+
}
359+
}
360+
361+
pub async fn get_info_fields(header: &Header) -> arrow::array::RecordBatch {
362+
let info_fields = header.infos();
363+
let mut field_names = StringBuilder::new();
364+
let mut field_types = StringBuilder::new();
365+
let mut field_descriptions = StringBuilder::new();
366+
for (field_name, field) in info_fields {
367+
field_names.append_value(field_name.to_lowercase());
368+
field_types.append_value(field.ty().to_string());
369+
field_descriptions.append_value(field.description());
370+
}
371+
// build RecordBatch
372+
let field_names = field_names.finish();
373+
let field_types = field_types.finish();
374+
let field_descriptions = field_descriptions.finish();
375+
let schema = arrow::datatypes::Schema::new(vec![
376+
arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false),
377+
arrow::datatypes::Field::new("type", arrow::datatypes::DataType::Utf8, false),
378+
arrow::datatypes::Field::new("description", arrow::datatypes::DataType::Utf8, false),
379+
]);
380+
let record_batch = arrow::record_batch::RecordBatch::try_new(
381+
SchemaRef::from(schema.clone()),
382+
vec![Arc::new(field_names),Arc::new(field_types),Arc::new(field_descriptions)]
383+
).unwrap();
384+
record_batch
385+
}
386+
387+
pub enum VcfReader {
388+
Local(VcfLocalReader),
389+
Remote(VcfRemoteReader)
390+
}
391+
392+
impl VcfReader {
393+
394+
pub async fn new(file_path: String, thread_num: Option<usize>, chunk_size: Option<usize>, concurrency_fetches: Option<usize>) -> Self {
395+
let storage_type = get_storage_type(file_path.clone());
396+
match storage_type {
397+
StorageType::LOCAL => {
398+
VcfReader::Local(VcfLocalReader::new(file_path, thread_num.unwrap_or(1)).await)
399+
}
400+
_ => {
401+
VcfReader::Remote(VcfRemoteReader::new(file_path, chunk_size.unwrap_or(64), concurrency_fetches.unwrap_or(8)).await)
402+
}
403+
}
404+
}
405+
406+
pub async fn read_header(&mut self) -> Result<vcf::Header, Error> {
407+
match self {
408+
VcfReader::Local(reader) => {
409+
reader.read_header().await
410+
}
411+
VcfReader::Remote(reader) => {
412+
reader.read_header().await
413+
}
414+
}
415+
}
416+
pub async fn describe(&mut self) -> Result<arrow::array::RecordBatch, Error> {
417+
match self {
418+
VcfReader::Local(reader) => {
419+
reader.describe().await
420+
}
421+
VcfReader::Remote(reader) => {
422+
reader.describe().await
423+
}
424+
}
425+
}
426+
pub async fn read_records(&mut self) -> BoxStream<'_, Result<Record, Error>> {
427+
match self {
428+
VcfReader::Local(reader) => {
429+
reader.read_records()
430+
}
431+
VcfReader::Remote(reader) => {
432+
reader.read_records().await
433+
}
434+
}
435+
}
436+
327437
}

0 commit comments

Comments
 (0)