Skip to content

Commit ed86dff

Browse files
Piotr DębskiPiotr Dębski
authored andcommitted
add unit tests for storage
Signed-off-by: Piotr Dębski <piotrdebski@Piotrs-MacBook-Pro.local>
1 parent cc38b4a commit ed86dff

File tree

5 files changed

+1128
-140
lines changed

5 files changed

+1128
-140
lines changed

datafusion/vcf/examples/noodles_test.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
use std::io;
2-
use std::slice::SliceIndex;
3-
use std::sync::Arc;
41
use bytes::Bytes;
52
use datafusion::arrow::datatypes::DataType::Int64;
6-
use tokio_util::io::{StreamReader, SyncIoBridge};
7-
use noodles_bgzf as bgzf;
8-
use object_store::gcp::GoogleCloudStorageBuilder;
9-
use object_store::ObjectStore;
10-
use object_store::path::Path;
113
use datafusion::error::Result;
124
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
135
use futures::{FutureExt, StreamExt, TryStreamExt};
6+
use noodles_bgzf as bgzf;
7+
use object_store::ObjectStore;
8+
use object_store::gcp::GoogleCloudStorageBuilder;
9+
use object_store::path::Path;
10+
use std::io;
11+
use std::slice::SliceIndex;
12+
use std::sync::Arc;
13+
use tokio_util::io::{StreamReader, SyncIoBridge};
1414
// use noodles_vcf as vcf;
1515
use object_store::buffered::BufReader;
1616
use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeek};
1717

1818
// use opendal::Result;
19+
use log::{info, log};
20+
use noodles::{bam, sam, vcf};
21+
use opendal::Operator;
1922
use opendal::layers::LoggingLayer;
2023
use opendal::services;
21-
use opendal::Operator;
2224
use opendal::services::Gcs;
23-
use noodles::{bam, sam, vcf};
24-
use log::{info, log};
2525
use opendal::services::S3;
2626

2727
// const BUCKET: &str = "gcp-public-data--gnomad";
@@ -40,18 +40,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4040
.layer(LoggingLayer::default())
4141
.finish();
4242

43-
let stream = operator.reader_with(NAME)
43+
let stream = operator
44+
.reader_with(NAME)
4445
.chunk(16 * 1024 * 1024)
4546
.concurrent(2)
46-
.await?.into_bytes_stream(..).await?;
47+
.await?
48+
.into_bytes_stream(..)
49+
.await?;
4750
// let stream = operator.reader_with(NAME)
4851
// .concurrent(8)
4952
// .await?.into_bytes_stream(..).await?;
5053
let inner = bgzf::r#async::Reader::new(StreamReader::new(stream));
5154
let mut reader = vcf::r#async::io::Reader::new(inner);
5255
//
5356
info!("Reading header");
54-
let mut count = 0;
57+
let mut count = 0;
5558
loop {
5659
let record = reader.records().next().await;
5760
if record.is_none() {
@@ -66,20 +69,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6669
Ok(())
6770
}
6871

69-
70-
let builder = Gcs::default()
71-
.bucket(BUCKET)
72-
.disable_vm_metadata()
73-
.allow_anonymous();
72+
// let builder = Gcs::default()
73+
// .bucket(BUCKET)
74+
// .disable_vm_metadata()
75+
// .allow_anonymous();
7476
//
7577
// let operator = Operator::new(builder)?.finish();
7678
//
7779
// let stream = operator.reader(NAME).await?.into_bytes_stream(..).await?;
7880
// let inner = StreamReader::new(stream);
7981

80-
81-
82-
8382
// let stdout = io::stdout();
8483
// let mut writer = sam::r#async::io::Writer::new(stdout);
8584
//
@@ -124,4 +123,4 @@ let builder = Gcs::default()
124123
// header_reader.read_to_string(&mut raw_header).await?;
125124
// println!("{:}", raw_header);
126125
// let mut reader = bgzf::MultithreadedReader::new(stream);
127-
// let _inner = reader.get_mut();
126+
// let _inner = reader.get_mut();

0 commit comments

Comments
 (0)