332 changes: 105 additions & 227 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -8,10 +8,10 @@ members = [ "datafusion/bio-format-bam", "datafusion/bio-format-bed",


[workspace.dependencies]
datafusion = {version = "48.0.1"}
datafusion-execution = "48.0.1"
datafusion = {version = "49.0.2"}
datafusion-execution = "49.0.2"
async-trait = "0.1.85"
opendal = { version = "0.53.3", features = ["services-gcs", "services-s3","layers-blocking", "services-azblob", "services-http"] }
opendal = { version = "0.53.3", features = ["services-gcs", "services-s3","layers-blocking", "services-azblob", "services-http"] , default-features = false}
noodles = { version = "0.93.0", features = ["bam", "vcf", "bgzf", "async"]}

noodles-bgzf = { version = "0.36.0",features = ["libdeflate"] }
22 changes: 22 additions & 0 deletions datafusion/bio-format-core/src/bin/test_compression_detection.rs
@@ -0,0 +1,22 @@
use datafusion_bio_format_core::object_storage::{
CompressionType, ObjectStorageOptions, get_compression_type,
};

#[tokio::main]
async fn main() {
let file_path =
"gs://gcp-public-data--gnomad/release/4.1/genome_sv/gnomad.v4.1.sv.sites.vcf.gz"
.to_string();
let options = ObjectStorageOptions::default();

println!("Testing compression detection for: {}", file_path);
println!("Using options: {}", options);

match get_compression_type(file_path.clone(), None, options).await {
Ok(CompressionType::GZIP) => println!("✅ Success! Detected GZIP compression"),
Ok(CompressionType::BGZF) => println!("✅ Success! Detected BGZF compression"),
Ok(CompressionType::NONE) => println!("✅ Success! No compression detected"),
Ok(CompressionType::AUTO) => println!("❌ Unexpected AUTO returned"),
Err(e) => println!("❌ Error: {}", e),
}
}
135 changes: 135 additions & 0 deletions datafusion/bio-format-core/src/object_storage.rs
@@ -346,6 +346,141 @@ fn is_azure_blob_url(url_str: &str) -> bool {
}
false
}
/// Get remote stream with arbitrary byte range support (start..end)
pub async fn get_remote_stream_with_range(
file_path: String,
object_storage_options: ObjectStorageOptions,
start: u64,
end: u64,
) -> Result<FuturesBytesStream, opendal::Error> {
let storage_type = get_storage_type(file_path.clone());
let bucket_name = get_bucket_name(file_path.clone());
let relative_file_path = get_file_path(file_path.clone());
let chunk_size = object_storage_options.clone().chunk_size.unwrap_or(64);
let concurrent_fetches = object_storage_options
.clone()
.concurrent_fetches
.unwrap_or(8);
let allow_anonymous = object_storage_options.allow_anonymous;
let enable_request_payer = object_storage_options.enable_request_payer;
let max_retries = object_storage_options.max_retries.unwrap_or(5);
let timeout = object_storage_options.timeout.unwrap_or(300);

match storage_type {
StorageType::S3 => {
log::info!(
"Using S3 storage type with range {}..{} for file: {}",
start,
end,
relative_file_path
);
let mut builder = S3::default()
.region(
&env::var("AWS_REGION").unwrap_or(
env::var("AWS_DEFAULT_REGION").unwrap_or(
S3::detect_region("https://s3.amazonaws.com", bucket_name.as_str())
.await
.unwrap_or("us-east-1".to_string()),
),
),
)
.bucket(bucket_name.as_str())
.endpoint(&env::var("AWS_ENDPOINT_URL").unwrap_or_default());
if allow_anonymous {
builder = builder.disable_ec2_metadata().allow_anonymous();
};
if enable_request_payer {
builder = builder.enable_request_payer();
}
let operator = Operator::new(builder)?
.layer(
TimeoutLayer::new()
.with_io_timeout(std::time::Duration::from_secs(timeout as u64)),
)
.layer(RetryLayer::new().with_max_times(max_retries))
.layer(LoggingLayer::default())
.finish();

operator
.reader_with(relative_file_path.as_str())
.concurrent(1)
.await?
.into_bytes_stream(start..end)
.await
}
StorageType::AZBLOB => {
let blob_info = extract_account_and_container(&*file_path.clone());
log::info!(
"Using Azure Blob Storage with range {}..{} for file: {}",
start,
end,
blob_info.relative_path
);

let builder = Azblob::default()
.root("/")
.container(&blob_info.container)
.endpoint(&blob_info.endpoint)
.account_name(&env::var("AZURE_STORAGE_ACCOUNT").unwrap_or_default())
.account_key(&env::var("AZURE_STORAGE_KEY").unwrap_or_default());
let operator = Operator::new(builder)?
.layer(
TimeoutLayer::new()
.with_io_timeout(std::time::Duration::from_secs(timeout as u64)),
)
.layer(RetryLayer::new().with_max_times(max_retries))
.layer(LoggingLayer::default())
.finish();

operator
.reader_with(blob_info.relative_path.as_str())
.chunk(chunk_size * 1024 * 1024)
.concurrent(1)
.await?
.into_bytes_stream(start..end)
.await
}
StorageType::GCS => {
log::info!(
"Using GCS storage with range {}..{} for file: {}",
start,
end,
relative_file_path
);
let mut builder = Gcs::default().bucket(bucket_name.as_str());
if allow_anonymous {
builder = builder.disable_vm_metadata().allow_anonymous();
} else {
if let Ok(service_account_key) = env::var("GOOGLE_APPLICATION_CREDENTIALS") {
builder = builder.credential_path(service_account_key.as_str());
} else {
log::warn!(
"GOOGLE_APPLICATION_CREDENTIALS environment variable is not set. Using default credentials."
);
}
};
let operator = Operator::new(builder)?
.layer(
TimeoutLayer::new()
.with_io_timeout(std::time::Duration::from_secs(timeout as u64)),
)
.layer(RetryLayer::new().with_max_times(max_retries))
.layer(LoggingLayer::default())
.finish();

operator
.reader_with(relative_file_path.as_str())
.chunk(chunk_size * 1024 * 1024)
.concurrent(concurrent_fetches)
.await?
.into_bytes_stream(start..end)
.await
}
StorageType::HTTP => unimplemented!("HTTP storage type is not implemented yet"),
StorageType::LOCAL => unreachable!("LOCAL storage should not use remote stream"),
}
}

pub async fn get_remote_stream(
file_path: String,
object_storage_options: ObjectStorageOptions,
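For context, a minimal sketch of how a caller might consume `get_remote_stream_with_range`, for example to fetch the compressed bytes of a single BGZF block span taken from a `.gzi` index. It assumes the returned `FuturesBytesStream` yields fallible `Bytes` chunks (as OpenDAL's futures-based readers do) and that the `futures` crate is available; the helper name `fetch_compressed_span` is hypothetical.

```rust
use datafusion_bio_format_core::object_storage::{
    ObjectStorageOptions, get_remote_stream_with_range,
};
use futures::TryStreamExt;

// Hypothetical helper: download the bytes in [start, end) of a remote object,
// e.g. one BGZF block span taken from a .gzi index.
async fn fetch_compressed_span(
    path: &str,
    options: ObjectStorageOptions,
    start: u64,
    end: u64,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut stream =
        get_remote_stream_with_range(path.to_string(), options, start, end).await?;
    let mut buf = Vec::with_capacity((end - start) as usize);
    // Each stream item is assumed to be a fallible chunk of bytes.
    while let Some(chunk) = stream.try_next().await? {
        buf.extend_from_slice(&chunk);
    }
    Ok(buf)
}
```

Note that the S3 and Azure branches above pin `.concurrent(1)`, so range reads on those backends are served as a single sequential stream, while the GCS branch honours `concurrent_fetches`.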
3 changes: 3 additions & 0 deletions datafusion/bio-format-fastq/Cargo.toml
@@ -22,6 +22,9 @@ async-trait = "0.1.88"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dev-dependencies]
tempfile = "3"


[[example]]
name = "test_reader"
6 changes: 3 additions & 3 deletions datafusion/bio-format-fastq/src/bgzf_parallel_reader.rs
@@ -44,7 +44,7 @@ impl BgzfFastqTableProvider {
}
}

fn get_bgzf_partition_bounds(index: &gzi::Index, thread_num: usize) -> Vec<(u64, u64)> {
pub fn get_bgzf_partition_bounds(index: &gzi::Index, thread_num: usize) -> Vec<(u64, u64)> {
let mut block_offsets: Vec<(u64, u64)> = index.as_ref().iter().map(|(c, u)| (*c, *u)).collect();
block_offsets.insert(0, (0, 0));

@@ -178,7 +178,7 @@ fn find_line_end(buf: &[u8], start: usize) -> Option<usize> {
.map(|pos| start + pos)
}

fn synchronize_reader<R: BufRead>(reader: &mut IndexedReader<R>, end_comp: u64) -> io::Result<()> {
pub fn synchronize_reader<R: BufRead>(reader: &mut IndexedReader<R>, end_comp: u64) -> io::Result<()> {
// DO NOT perform an initial read_until, as it can discard a valid header
// if the initial seek lands exactly on the start of a line.
// The loop below is capable of handling any starting position.
@@ -430,7 +430,7 @@ impl ExecutionPlan for BgzfFastqExec {
self.schema(),
rx.map(move |(item, count)| {
debug!("Partition {}: processed {} rows", partition, count);
item.map_err(|e| DataFusionError::ArrowError(e, None))
item.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
}),
)))
}
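The `map_err` change above tracks DataFusion 49, where the `ArrowError` variant of `DataFusionError` carries a boxed error. A minimal sketch of the conversion pattern used in this hunk; `arrow_to_df` is an illustrative name and the second field (passed as `None` here) is the optional context payload:

```rust
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;

// Illustrative helper: wrap an Arrow error for DataFusion 49+, which boxes the
// payload of DataFusionError::ArrowError.
fn arrow_to_df(e: ArrowError) -> DataFusionError {
    DataFusionError::ArrowError(Box::new(e), None)
}
```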
2 changes: 1 addition & 1 deletion datafusion/bio-format-fastq/src/lib.rs
@@ -3,4 +3,4 @@ mod physical_exec;
pub mod storage;
pub mod table_provider;

pub use bgzf_parallel_reader::BgzfFastqTableProvider;
pub use bgzf_parallel_reader::{BgzfFastqTableProvider, get_bgzf_partition_bounds, synchronize_reader};
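With these helpers re-exported, downstream code can plan BGZF partitions straight from a `.gzi` index. A minimal sketch, assuming the index has already been loaded (the `gzi` loading API differs across noodles versions) and that the `noodles::bgzf::gzi` facade path is in use; `plan_partitions` is an illustrative name:

```rust
use datafusion_bio_format_fastq::get_bgzf_partition_bounds;
use noodles::bgzf::gzi;

// Illustrative helper: split a BGZF-compressed FASTQ into one compressed
// (start, end) byte range per worker, using block offsets from the .gzi index.
fn plan_partitions(index: &gzi::Index, workers: usize) -> Vec<(u64, u64)> {
    let bounds = get_bgzf_partition_bounds(index, workers);
    for (i, (start, end)) in bounds.iter().enumerate() {
        println!("partition {i}: compressed bytes {start}..{end}");
    }
    bounds
}
```

Each partition's `IndexedReader` would then be advanced to the next record boundary with `synchronize_reader` before decoding, as the comments in `bgzf_parallel_reader.rs` describe.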
21 changes: 17 additions & 4 deletions datafusion/bio-format-fastq/src/physical_exec.rs
@@ -1,4 +1,5 @@
use crate::storage::{FastqLocalReader, FastqRemoteReader};
use crate::table_provider::FastqByteRange;
use async_stream::__private::AsyncStream;
use async_stream::try_stream;
use datafusion::arrow::array::{Array, NullArray, RecordBatch, StringArray, StringBuilder};
@@ -24,6 +25,7 @@ pub struct FastqExec {
pub(crate) projection: Option<Vec<usize>>,
pub(crate) cache: PlanProperties,
pub(crate) limit: Option<usize>,
pub(crate) byte_range: Option<FastqByteRange>,
pub(crate) thread_num: Option<usize>,
pub(crate) object_storage_options: Option<ObjectStorageOptions>,
}
@@ -79,6 +81,7 @@ impl ExecutionPlan for FastqExec {
batch_size,
self.thread_num,
self.projection.clone(),
self.byte_range.clone(),
self.object_storage_options.clone(),
);
let stream = futures::stream::once(fut).try_flatten();
@@ -91,12 +94,17 @@ async fn get_remote_fastq_stream(
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<usize>>,
byte_range: Option<FastqByteRange>,
object_storage_options: Option<ObjectStorageOptions>,
) -> datafusion::error::Result<
AsyncStream<datafusion::error::Result<RecordBatch>, impl Future<Output = ()> + Sized>,
> {
let mut reader =
FastqRemoteReader::new(file_path.clone(), object_storage_options.unwrap()).await?;
let mut reader = FastqRemoteReader::new_with_range(
file_path.clone(),
byte_range.clone(),
object_storage_options.unwrap(),
)
.await?;

// Determine which fields we need to parse based on projection
let needs_name = projection.as_ref().map_or(true, |proj| proj.contains(&0));
@@ -193,6 +201,7 @@ async fn get_local_fastq(
batch_size: usize,
thread_num: Option<usize>,
projection: Option<Vec<usize>>,
byte_range: Option<FastqByteRange>,
object_storage_options: Option<ObjectStorageOptions>,
) -> datafusion::error::Result<impl futures::Stream<Item = datafusion::error::Result<RecordBatch>>>
{
@@ -226,10 +235,11 @@
let mut batch_num = 0;
let file_path = file_path.clone();
let thread_num = thread_num.unwrap_or(1);
let mut reader = FastqLocalReader::new(
let mut reader = FastqLocalReader::new_with_range(
file_path.clone(),
thread_num,
object_storage_options.unwrap(),
byte_range.clone(),
object_storage_options.unwrap_or(ObjectStorageOptions::default()),
)
.await?;
let mut record_num = 0;
@@ -397,6 +407,7 @@ async fn get_stream(
batch_size: usize,
thread_num: Option<usize>,
projection: Option<Vec<usize>>,
byte_range: Option<FastqByteRange>,
object_storage_options: Option<ObjectStorageOptions>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
// Open the BGZF-indexed VCF using IndexedReader.
@@ -413,6 +424,7 @@
batch_size,
thread_num,
projection,
byte_range,
object_storage_options,
)
.await?;
@@ -424,6 +436,7 @@
schema.clone(),
batch_size,
projection,
byte_range,
object_storage_options,
)
.await?;
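A minimal sketch of the new optional byte range at a call site, assuming `FastqRemoteReader` is reachable through the public `storage` module and that passing `None` keeps the old whole-file behaviour; `open_whole_file` is an illustrative name:

```rust
use datafusion_bio_format_core::object_storage::ObjectStorageOptions;
use datafusion_bio_format_fastq::storage::FastqRemoteReader;

// Illustrative: open a remote FASTQ reader without restricting it to a byte
// range. `None` is assumed to mean "read the whole object", mirroring the
// previous FastqRemoteReader::new.
async fn open_whole_file(
    path: String,
    options: ObjectStorageOptions,
) -> datafusion::error::Result<()> {
    let _reader = FastqRemoteReader::new_with_range(path, None, options).await?;
    // ...hand the reader to the record-batch stream, as get_remote_fastq_stream does...
    Ok(())
}
```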