Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ log = "0.4.22"
tracing = { version = "0.1.41", features = ["log"] }
futures-util = "0.3.31"

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"


polars = { git = "https://github.com/mwiewior/polars.git" , rev = "9d4fca54b1d71fce08a51cf00a88f67c67313706", features = ["dtype-full"]}
Expand Down
Binary file added benchmark_results/fastqc_test.webp
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added benchmark_results/performance_k_comparison.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
172 changes: 172 additions & 0 deletions kmer_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import time
import os
import subprocess
import tempfile
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from polars_bio.io import read_fastq
from polars_bio.kmer_analysis import kmer_count, visualize_kmers


SMALL_FASTQ_PATH = "tests/data/io/fastq/example.fastq"
LARGE_FASTQ_PATH = "tests/data/io/fastq/ERR194147.fastq"
OUTPUT_DIR = "benchmark_results"
FASTQC_RS_OUTPUT = "tests/data/io/fastq/output_big.json"
FASTQC_RS_OUTPUT_K3 = "tests/data/io/fastq/output.json"


if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)

# ==================== SEKCJA 1: POMOCNICZE FUNKCJE ====================

def run_fastqc_rs(fastq_path, k, output_json=None):

try:
start_time = time.time()
subprocess.run(
["fqc", "-q", fastq_path, "-k", str(k)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True
)
execution_time = time.time() - start_time

return execution_time
except Exception as e:
print(f"Błąd fastqc-rs: {e}")
return None, 0

def load_fastqc_rs_results(result_path):

try:
with open(result_path) as f:
return json.load(f)["values"]
except Exception as e:
print(f"Błąd podczas wczytywania wyników fastqc-rs: {e}")
return None


# ==================== SEKCJA 3: TEST WYDAJNOŚCI ====================

def performance_test(fastq_paths=None, k_values=None, include_fastqc_rs=True):

print("\n=== Test wydajności ===")

fastq_paths = fastq_paths or [SMALL_FASTQ_PATH, LARGE_FASTQ_PATH]
k_values = k_values or [3, 5]

results = []

for path in fastq_paths:
path_name = Path(path).name
print(f"\nTestowanie pliku: {path_name}")

df = read_fastq(path)

for k in k_values:
print(f" k={k}")

# Test własnej implementacji
start_time = time.time()
kmer_count(k=k, df=df)
polars_time = time.time() - start_time

results.append({
'implementation': 'polars-bio',
'file': path_name,
'k': k,
'time': polars_time
})

# Test fastqc-rs
if include_fastqc_rs:
fastqc_time = run_fastqc_rs(path, k)
print(fastqc_time)
if fastqc_time > 0:
results.append({
'implementation': 'fastqc-rs',
'file': path_name,
'k': k,
'time': fastqc_time
})

return pd.DataFrame(results)


def visualize_performance_results(results, save_path=None):

sns.set_style("whitegrid")

files = results['file'].unique()

fig, axes = plt.subplots(1, len(files), figsize=(14, 6), sharey=False)

if len(files) == 1:
axes = [axes]

for i, file in enumerate(files):
file_data = results[results['file'] == file]

sns.barplot(
data=file_data,
x="k",
y="time",
hue="implementation",
palette=["royalblue", "firebrick"],
ax=axes[i]
)

axes[i].set_title(file)
axes[i].set_xlabel("Wartość k")
axes[i].set_ylabel("Czas wykonania (s)")

plt.tight_layout()
fig.suptitle("Porównanie wydajności dla różnych wartości k", y=1.02, fontsize=16)

handles, labels = axes[-1].get_legend_handles_labels()
[ax.get_legend().remove() for ax in axes if ax.get_legend()]
fig.legend(handles, labels, loc='upper center', bbox_to_anchor=(0.5, 0),
ncol=2, frameon=True, title="Implementation")

if save_path:
plt.savefig(f"{save_path}_k_comparison.png", bbox_inches='tight')
else:
plt.show()

# ==================== SEKCJA 5: GŁÓWNA FUNKCJA ====================

def get_kmer_results(fastq_path, k):

df = read_fastq(fastq_path)
kmer_df = kmer_count(k=k, df=df)
return kmer_df


def main():
print("\nRozpoczynam testy wydajności...")
performance_results = performance_test(
fastq_paths=[SMALL_FASTQ_PATH, LARGE_FASTQ_PATH],
k_values=[3],
include_fastqc_rs=True
)

results_path = os.path.join(OUTPUT_DIR, "performance_results.csv")
performance_results.to_csv(results_path, index=False)
print(f"Zapisano wyniki wydajności do {results_path}")

print("\nGeneruję wykresy wydajności...")
visualize_performance_results(
performance_results,
save_path=os.path.join(OUTPUT_DIR, "performance")
)


if __name__ == "__main__":
main()
Binary file added output_big.html
Binary file not shown.
9 changes: 9 additions & 0 deletions polars_bio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from polars_bio.polars_bio import BioSessionContext
from polars_bio.polars_bio import InputFormat
from bioframe import count_overlaps

from polars_bio.polars_bio import GffReadOptions, InputFormat
Expand Down Expand Up @@ -42,11 +44,15 @@

from .io import IOOperations as data_input
from .polars_ext import PolarsRangesOperations as LazyFrame
from .range_op import FilterOp, count_overlaps, coverage, merge, nearest, overlap
from .range_viz import visualize_intervals
from .kmer_analysis import kmer_count, visualize_kmers
from .range_op import FilterOp
from .range_op import IntervalOperations as range_operations
from .range_utils import Utils as utils
from .sql import SQL as data_processing


POLARS_BIO_MAX_THREADS = "datafusion.execution.target_partitions"


Expand All @@ -64,4 +70,7 @@
"VcfReadOptions",
"ObjectStorageOptions",
"set_option",
"kmer_count",
"visualize_kmers",
"BioSessionContext",
]
59 changes: 59 additions & 0 deletions polars_bio/kmer_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
import matplotlib.pyplot as plt
import pandas as pd
import polars as pl


import polars_bio
from polars_bio.polars_bio import (
py_kmer_count,
)


def kmer_count(k, df):

assert isinstance(df, (pl.DataFrame, pl.LazyFrame)), "df must be Polars DataFrame or LazyFrame"
assert isinstance(k, int) and k > 0, "k must be a positive integer"

arrow_reader = (
df.to_arrow()
if isinstance(df, pl.DataFrame)
else df.collect().to_arrow().to_reader()
)

ctx = polars_bio.BioSessionContext(seed="seed", catalog_dir=".")
result = py_kmer_count(ctx, k, arrow_reader).collect()

json_str = result[0]["kmer_counts"][0].as_py()
json_data = json.loads(json_str)

rows = [{"kmer": kmer, "count": count} for kmer, count in json_data.items()]
return pd.DataFrame(rows)


def visualize_kmers(df, top_n=None):

assert isinstance(df, pd.DataFrame), "df must be a Pandas DataFrame"
assert "kmer" in df.columns and "count" in df.columns, "DataFrame must contain 'kmer' and 'count' columns"

df = df.sort_values(by='count', ascending=False)
if top_n:
df = df.head(top_n)

df = df[::-1].reset_index(drop=True)

plt.figure(figsize=(10, max(6, 0.3 * len(df))))
bars = plt.barh(range(len(df)), df['count'], color='steelblue', edgecolor='black')
plt.yticks(range(len(df)), df['kmer'])

for i, bar in enumerate(bars):
width = bar.get_width()
plt.text(width + 0.5, bar.get_y() + bar.get_height() / 2,
str(int(width)), va='center', fontsize=9)

plt.xlabel('count')
plt.ylabel('k-mer')
plt.title('k-mer quantities')
plt.grid(axis='x', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()
329 changes: 329 additions & 0 deletions sprawozdanie.ipynb

Large diffs are not rendered by default.

Binary file added sprawozdanie.pdf
Binary file not shown.
100 changes: 100 additions & 0 deletions src/kmer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use arrow_array::{ArrayRef, StringArray};
use datafusion::common::{ScalarValue};
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::Accumulator;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use arrow_array::Array;

#[derive(Debug, Serialize, Deserialize)]
pub struct KmerCounter {
kmer_len: usize,
frequency: HashMap<String, usize>,
}

impl KmerCounter {
pub fn with_k(k: usize) -> Self {
KmerCounter {
kmer_len: k,
frequency: HashMap::new(),
}
}

fn count_kmers(&mut self, sequence: &str) {
let len = sequence.len();
if len < self.kmer_len {
return;
}

for i in 0..=(len - self.kmer_len) {
let candidate = &sequence[i..i + self.kmer_len];
if candidate.contains('N') {
continue;
}
*self.frequency.entry(candidate.to_owned()).or_insert(0) += 1;
}
}

fn merge_map(&mut self, other: HashMap<String, usize>) {
for (kmer, count) in other {
*self.frequency.entry(kmer).or_insert(0) += count;
}
}
}

impl Accumulator for KmerCounter {
fn update_batch(&mut self, inputs: &[ArrayRef]) -> Result<()> {
let input_array = inputs.get(0)
.ok_or_else(|| DataFusionError::Execution("No input array provided".into()))?;

let strings = input_array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| DataFusionError::Execution("Expected StringArray input".into()))?;

for i in 0..strings.len() {
if strings.is_valid(i) {
let seq = strings.value(i);
self.count_kmers(seq);
}
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let state_array = states.get(0)
.ok_or_else(|| DataFusionError::Execution("No state array provided".into()))?;

let string_array = state_array
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| DataFusionError::Execution("Expected StringArray in state merge".into()))?;

for i in 0..string_array.len() {
if string_array.is_valid(i) {
let json = string_array.value(i);
let parsed: HashMap<String, usize> = serde_json::from_str(json)
.map_err(|e| DataFusionError::Execution(format!("JSON parsing error: {}", e)))?;
self.merge_map(parsed);
}
}

Ok(())
}

fn state(&mut self) -> Result<Vec<ScalarValue>> {
let json = serde_json::to_string(&self.frequency)
.map_err(|e| DataFusionError::Execution(format!("JSON serialization error: {}", e)))?;
Ok(vec![ScalarValue::Utf8(Some(json))])
}

fn evaluate(&mut self) -> Result<ScalarValue> {
let json = serde_json::to_string(&self.frequency)
.map_err(|e| DataFusionError::Execution(format!("JSON serialization error: {}", e)))?;
Ok(ScalarValue::Utf8(Some(json)))
}

fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}
Loading