-
Notifications
You must be signed in to change notification settings - Fork 3
Integration with NiBabel
Zvi Baratz edited this page Oct 29, 2025
·
1 revision
Audience: Neuroimaging researchers and developers
Difficulty: Intermediate
Estimated reading time: 15 minutes
This guide demonstrates how to combine csa_header with NiBabel for comprehensive neuroimaging workflows.
- Why Combine csa_header and NiBabel?
- Installation
- Basic Integration Pattern
- Complete Workflows
- Advanced Examples
- See Also
csa_header and NiBabel serve complementary purposes in neuroimaging pipelines:
| Library | Purpose | Strengths |
|---|---|---|
| csa_header | Extract Siemens metadata | CSA header parsing, ASCCONV protocols, timing parameters |
| NiBabel | Handle imaging data | Load/save neuroimaging formats, affine transforms, image arrays |
flowchart TB
A[Siemens DICOM Files] --> B[pydicom<br/>Read DICOM structure]
A --> C[NiBabel<br/>Load image data]
A --> D[csa_header<br/>Extract CSA metadata]
B --> E{What do you need?}
C --> E
D --> E
E -->|Pixel data| F[NiBabel]
E -->|Affine/header| F
E -->|Slice timing| G[csa_header]
E -->|B-values| G
E -->|Protocol params| G
F --> H[Complete<br/>Neuroimaging<br/>Workflow]
G --> H
style A fill:#e1f5ff
style F fill:#c8e6c9
style G fill:#ffe1b2
style H fill:#e1f5ff
1. Slice Timing Correction
- NiBabel: Load 4D fMRI data
- csa_header: Extract slice acquisition times
- Combined: Apply correction with accurate timing
2. Diffusion Imaging
- NiBabel: Load DWI volumes
- csa_header: Extract b-values and gradients
- Combined: Perform tensor fitting with correct parameters
3. BIDS Conversion
- NiBabel: Convert DICOM → NIfTI
- csa_header: Extract metadata for JSON sidecars
- Combined: Create BIDS-compliant datasets
4. Quality Assurance
- NiBabel: Compute image statistics
- csa_header: Verify acquisition parameters
- Combined: Comprehensive QA pipelines
# Install both libraries
pip install csa_header nibabel pydicom
# Or install with optional dependencies
pip install csa_header[examples] # includes nibabelimport nibabel as nib
import pydicom
from csa_header import CsaHeader
import numpy as np
def load_dicom_with_csa(dicom_path: str):
"""
Load DICOM with both NiBabel and csa_header.
Parameters
----------
dicom_path : str
Path to Siemens DICOM file
Returns
-------
dict
Dictionary with 'image' (nibabel), 'csa_series', 'csa_image'
"""
# Load with NiBabel for image data
nib_img = nib.load(dicom_path)
# Load with pydicom for CSA extraction
dcm = pydicom.dcmread(dicom_path)
# Extract CSA headers
csa_data = {}
if (0x0029, 0x1010) in dcm:
csa_data['series'] = CsaHeader(dcm[0x0029, 0x1010].value).read()
if (0x0029, 0x1020) in dcm:
csa_data['image'] = CsaHeader(dcm[0x0029, 0x1020].value).read()
return {
'nibabel_image': nib_img,
'csa': csa_data,
'dicom': dcm
}
# Usage
data = load_dicom_with_csa('scan.dcm')
# Access image data via NiBabel
image_array = data['nibabel_image'].get_fdata()
affine = data['nibabel_image'].affine
# Access metadata via CSA
if 'series' in data['csa']:
slice_times = data['csa']['series'].get('MosaicRefAcqTimes', {}).get('value', [])
print(f"Image shape: {image_array.shape}")
print(f"Slice timing: {len(slice_times)} time points")class SiemensDicomLoader:
"""Convenience wrapper for loading Siemens DICOM with full metadata."""
def __init__(self, dicom_path: str):
self.path = dicom_path
self.nib_img = nib.load(dicom_path)
self.dcm = pydicom.dcmread(dicom_path)
# Parse CSA headers
self.series_csa = None
self.image_csa = None
if (0x0029, 0x1010) in self.dcm:
self.series_csa = CsaHeader(self.dcm[0x0029, 0x1010].value).read()
if (0x0029, 0x1020) in self.dcm:
self.image_csa = CsaHeader(self.dcm[0x0029, 0x1020].value).read()
@property
def data(self) -> np.ndarray:
"""Get image data as numpy array."""
return self.nib_img.get_fdata()
@property
def affine(self) -> np.ndarray:
"""Get affine transformation matrix."""
return self.nib_img.affine
@property
def slice_times(self) -> list:
"""Get slice timing (ms)."""
if self.series_csa and 'MosaicRefAcqTimes' in self.series_csa:
return self.series_csa['MosaicRefAcqTimes']['value']
return []
@property
def tr_ms(self) -> float:
"""Get TR in milliseconds."""
if self.series_csa and 'MrPhoenixProtocol' in self.series_csa:
protocol = self.series_csa['MrPhoenixProtocol']['value']
if isinstance(protocol, dict) and 'alTR' in protocol:
return protocol['alTR'][0]
return None
@property
def b_value(self) -> float:
"""Get b-value (s/mm²)."""
if self.image_csa and 'B_value' in self.image_csa:
return self.image_csa['B_value']['value']
return None
# Usage
loader = SiemensDicomLoader('fmri_scan.dcm')
print(f"Shape: {loader.data.shape}")
print(f"TR: {loader.tr_ms} ms")
print(f"Slice times: {loader.slice_times[:5]}...")
print(f"Affine:\n{loader.affine}")Complete example: Load fMRI data, extract timing, and prepare for analysis:
import nibabel as nib
import numpy as np
from pathlib import Path
from csa_header import CsaHeader
import pydicom
def prepare_fmri_for_analysis(
dicom_path: str,
output_nifti: str = None,
output_timing: str = None
):
"""
Complete fMRI preprocessing workflow.
Extracts:
- 4D image data
- Slice timing
- TR/TE parameters
- Affine transformation
Parameters
----------
dicom_path : str
Path to representative DICOM from fMRI series
output_nifti : str, optional
Save NIfTI to this path
output_timing : str, optional
Save slice timing to this file
Returns
-------
dict
Complete fMRI metadata and data
"""
# Load DICOM
nib_img = nib.load(dicom_path)
dcm = pydicom.dcmread(dicom_path)
# Get image data
data = nib_img.get_fdata()
affine = nib_img.affine
# Extract CSA metadata
csa_series = CsaHeader(dcm[0x0029, 0x1010].value).read()
# Slice timing
slice_times_ms = csa_series.get('MosaicRefAcqTimes', {}).get('value', [])
n_slices = csa_series.get('NumberOfImagesInMosaic', {}).get('value', len(slice_times_ms))
# Protocol parameters
protocol = csa_series.get('MrPhoenixProtocol', {}).get('value', {})
tr_ms = protocol.get('alTR', [None])[0] if protocol else None
te_ms = protocol.get('alTE', [None])[0] if protocol else None
flip_angle = protocol.get('adFlipAngleDegree', [None])[0] if protocol else None
# Normalize slice timing to seconds
if slice_times_ms and tr_ms:
slice_times_sec = np.array(slice_times_ms) / 1000.0
# Save timing file if requested
if output_timing:
np.savetxt(output_timing, slice_times_sec)
print(f"✓ Saved slice timing to {output_timing}")
# Save NIfTI if requested
if output_nifti:
nib.save(nib_img, output_nifti)
print(f"✓ Saved NIfTI to {output_nifti}")
# Prepare results
results = {
'data': data,
'affine': affine,
'shape': data.shape,
'slice_times_ms': slice_times_ms,
'slice_times_sec': slice_times_sec.tolist() if slice_times_ms else [],
'n_slices': n_slices,
'TR_ms': tr_ms,
'TR_sec': tr_ms / 1000.0 if tr_ms else None,
'TE_ms': te_ms,
'flip_angle': flip_angle,
'voxel_size': nib_img.header.get_zooms()[:3],
}
# Print summary
print("\n=== fMRI Summary ===")
print(f"Image shape: {results['shape']}")
print(f"Voxel size: {results['voxel_size']} mm")
print(f"Number of slices: {results['n_slices']}")
print(f"TR: {results['TR_ms']} ms ({results['TR_sec']:.2f} s)")
print(f"TE: {results['TE_ms']} ms")
print(f"Flip angle: {results['flip_angle']}°")
print(f"Slice timing: {len(results['slice_times_ms'])} time points")
return results
# Usage
fmri_data = prepare_fmri_for_analysis(
'fmri_001.dcm',
output_nifti='fmri_4d.nii.gz',
output_timing='slice_times.txt'
)
# Now ready for SPM/FSL/AFNI preprocessing!Example output:
✓ Saved slice timing to slice_times.txt
✓ Saved NIfTI to fmri_4d.nii.gz
=== fMRI Summary ===
Image shape: (64, 64, 36, 200)
Voxel size: (3.0, 3.0, 3.0) mm
Number of slices: 36
TR: 2000 ms (2.00 s)
TE: 30 ms
Flip angle: 90.0°
Slice timing: 36 time points
Complete diffusion imaging workflow with b-values and gradients:
import nibabel as nib
import numpy as np
from pathlib import Path
from csa_header import CsaHeader
import pydicom
def prepare_dwi_for_analysis(
dicom_directory: Path,
output_prefix: str = 'dwi'
):
"""
Complete DWI/DTI preprocessing workflow.
Extracts from a directory of DWI DICOMs:
- 4D DWI volume
- B-values (bvals)
- Gradient directions (bvecs)
- Creates FSL-compatible output
Parameters
----------
dicom_directory : Path
Directory containing DWI DICOM files
output_prefix : str
Prefix for output files (prefix.nii.gz, prefix.bval, prefix.bvec)
Returns
-------
dict
DWI metadata and file paths
"""
dicom_files = sorted(dicom_directory.glob('*.dcm'))
if not dicom_files:
raise ValueError(f"No DICOM files found in {dicom_directory}")
print(f"Processing {len(dicom_files)} DICOM files...")
# Storage for DWI parameters
volumes = []
bvals = []
bvecs = []
# Process each DICOM
for dcm_file in dicom_files:
# Load image data with NiBabel
nib_img = nib.load(str(dcm_file))
volume = nib_img.get_fdata()
volumes.append(volume)
# Extract DWI parameters with csa_header
dcm = pydicom.dcmread(str(dcm_file))
if (0x0029, 0x1020) in dcm:
image_csa = CsaHeader(dcm[0x0029, 0x1020].value).read()
# B-value
b_value = image_csa.get('B_value', {}).get('value', 0)
bvals.append(b_value)
# Gradient direction
gradient = image_csa.get('DiffusionGradientDirection', {}).get('value', [0, 0, 0])
bvecs.append(gradient)
else:
# Fallback for missing CSA
bvals.append(0)
bvecs.append([0, 0, 0])
# Stack volumes into 4D array
data_4d = np.stack(volumes, axis=-1)
# Get affine from first volume
affine = nib.load(str(dicom_files[0])).affine
# Create 4D NIfTI
nifti_4d = nib.Nifti1Image(data_4d, affine)
# Save files
nifti_path = f"{output_prefix}.nii.gz"
bval_path = f"{output_prefix}.bval"
bvec_path = f"{output_prefix}.bvec"
nib.save(nifti_4d, nifti_path)
print(f"✓ Saved 4D NIfTI: {nifti_path}")
# Save bvals (FSL format: single row)
np.savetxt(bval_path, np.array(bvals)[np.newaxis, :], fmt='%d')
print(f"✓ Saved b-values: {bval_path}")
# Save bvecs (FSL format: 3 rows, N columns)
bvecs_array = np.array(bvecs).T # Transpose to 3xN
np.savetxt(bvec_path, bvecs_array, fmt='%.6f')
print(f"✓ Saved gradient directions: {bvec_path}")
# Summary
unique_bvals = np.unique(bvals)
results = {
'nifti_path': nifti_path,
'bval_path': bval_path,
'bvec_path': bvec_path,
'shape': data_4d.shape,
'n_volumes': len(bvals),
'bvalues': unique_bvals.tolist(),
'affine': affine
}
print("\n=== DWI Summary ===")
print(f"4D shape: {results['shape']}")
print(f"Number of volumes: {results['n_volumes']}")
print(f"Unique b-values: {results['bvalues']} s/mm²")
print(f"\nReady for DTI analysis (FSL, MRtrix, DIPY, etc.)")
return results
# Usage
dwi_data = prepare_dwi_for_analysis(
Path('/path/to/dwi_dicoms'),
output_prefix='dwi_preprocessed'
)
# Files created:
# - dwi_preprocessed.nii.gz (4D volume)
# - dwi_preprocessed.bval (b-values)
# - dwi_preprocessed.bvec (gradient directions)
# Now run DTI fitting with FSL:
# dtifit -k dwi_preprocessed.nii.gz \
# -o dti_output \
# -m nodif_brain_mask.nii.gz \
# -r dwi_preprocessed.bvec \
# -b dwi_preprocessed.bvalCreate BIDS-compliant JSON sidecar with CSA metadata:
import json
from pathlib import Path
import nibabel as nib
import pydicom
from csa_header import CsaHeader
def create_bids_json(dicom_path: str, output_json: str):
"""
Create BIDS-compliant JSON sidecar with CSA metadata.
Parameters
----------
dicom_path : str
Path to representative DICOM file
output_json : str
Output path for JSON sidecar
Returns
-------
dict
BIDS metadata dictionary
"""
dcm = pydicom.dcmread(dicom_path)
# Extract CSA headers
csa_series = CsaHeader(dcm[0x0029, 0x1010].value).read()
protocol = csa_series.get('MrPhoenixProtocol', {}).get('value', {})
# Build BIDS metadata
metadata = {
"Manufacturer": str(dcm.Manufacturer),
"ManufacturersModelName": str(dcm.ManufacturerModelName),
"MagneticFieldStrength": float(dcm.MagneticFieldStrength),
}
# Timing parameters
if protocol:
if 'alTR' in protocol:
metadata["RepetitionTime"] = protocol['alTR'][0] / 1000.0 # Convert to seconds
if 'alTE' in protocol:
metadata["EchoTime"] = protocol['alTE'][0] / 1000.0 # Convert to seconds
if 'adFlipAngleDegree' in protocol:
metadata["FlipAngle"] = protocol['adFlipAngleDegree'][0]
# Slice timing
if 'MosaicRefAcqTimes' in csa_series:
slice_times_ms = csa_series['MosaicRefAcqTimes']['value']
metadata["SliceTiming"] = [t / 1000.0 for t in slice_times_ms] # Convert to seconds
# Phase encoding
if 'PhaseEncodingDirectionPositive' in csa_series:
pe_positive = csa_series['PhaseEncodingDirectionPositive']['value']
# Simplified - actual direction requires more info
metadata["PhaseEncodingDirection"] = "j" if pe_positive else "j-"
# Save JSON
with open(output_json, 'w') as f:
json.dump(metadata, f, indent=2)
print(f"✓ Created BIDS JSON: {output_json}")
return metadata
# Usage
metadata = create_bids_json(
'sub-01_task-rest_bold_001.dcm',
'sub-01_task-rest_bold.json'
)Example JSON output:
{
"Manufacturer": "SIEMENS",
"ManufacturersModelName": "Prisma",
"MagneticFieldStrength": 3.0,
"RepetitionTime": 2.0,
"EchoTime": 0.03,
"FlipAngle": 90.0,
"SliceTiming": [0.0, 0.0525, 0.105, 0.1575, ...],
"PhaseEncodingDirection": "j"
}def compute_fmri_qa_metrics(dicom_path: str) -> dict:
"""Compute basic QA metrics for fMRI."""
# Load data with NiBabel
nib_img = nib.load(dicom_path)
data = nib_img.get_fdata()
# Extract parameters with csa_header
dcm = pydicom.dcmread(dicom_path)
csa_series = CsaHeader(dcm[0x0029, 0x1010].value).read()
protocol = csa_series.get('MrPhoenixProtocol', {}).get('value', {})
# Compute metrics
metrics = {
'mean_signal': float(np.mean(data)),
'std_signal': float(np.std(data)),
'snr_estimate': float(np.mean(data) / np.std(data)),
'shape': data.shape,
'TR_ms': protocol.get('alTR', [None])[0] if protocol else None,
'TE_ms': protocol.get('alTE', [None])[0] if protocol else None
}
return metrics- Quick Start - Get started quickly
- User Guide - Practical CSA examples
- Technical Reference - Format specifications
- NiBabel Documentation - Complete NiBabel guide
- BIDS Specification - BIDS format details
- Example Integration Script - Complete code examples
Next Steps: Explore complete example scripts in the examples/ directory.
Version: 1.0+