Skip to content

Integration with NiBabel

Zvi Baratz edited this page Oct 29, 2025 · 1 revision

Integration with NiBabel

Audience: Neuroimaging researchers and developers

Difficulty: Intermediate

Estimated reading time: 15 minutes

This guide demonstrates how to combine csa_header with NiBabel for comprehensive neuroimaging workflows.


Table of Contents


Why Combine csa_header and NiBabel?

csa_header and NiBabel serve complementary purposes in neuroimaging pipelines:

Library Purpose Strengths
csa_header Extract Siemens metadata CSA header parsing, ASCCONV protocols, timing parameters
NiBabel Handle imaging data Load/save neuroimaging formats, affine transforms, image arrays
flowchart TB
    A[Siemens DICOM Files] --> B[pydicom<br/>Read DICOM structure]
    A --> C[NiBabel<br/>Load image data]
    A --> D[csa_header<br/>Extract CSA metadata]

    B --> E{What do you need?}
    C --> E
    D --> E

    E -->|Pixel data| F[NiBabel]
    E -->|Affine/header| F
    E -->|Slice timing| G[csa_header]
    E -->|B-values| G
    E -->|Protocol params| G

    F --> H[Complete<br/>Neuroimaging<br/>Workflow]
    G --> H

    style A fill:#e1f5ff
    style F fill:#c8e6c9
    style G fill:#ffe1b2
    style H fill:#e1f5ff
Loading

Use Cases

1. Slice Timing Correction

  • NiBabel: Load 4D fMRI data
  • csa_header: Extract slice acquisition times
  • Combined: Apply correction with accurate timing

2. Diffusion Imaging

  • NiBabel: Load DWI volumes
  • csa_header: Extract b-values and gradients
  • Combined: Perform tensor fitting with correct parameters

3. BIDS Conversion

  • NiBabel: Convert DICOM → NIfTI
  • csa_header: Extract metadata for JSON sidecars
  • Combined: Create BIDS-compliant datasets

4. Quality Assurance

  • NiBabel: Compute image statistics
  • csa_header: Verify acquisition parameters
  • Combined: Comprehensive QA pipelines

Installation

# Install both libraries
pip install csa_header nibabel pydicom

# Or install with optional dependencies
pip install csa_header[examples]  # includes nibabel

Basic Integration Pattern

Standard Workflow

import nibabel as nib
import pydicom
from csa_header import CsaHeader
import numpy as np

def load_dicom_with_csa(dicom_path: str):
    """
    Load DICOM with both NiBabel and csa_header.

    Parameters
    ----------
    dicom_path : str
        Path to Siemens DICOM file

    Returns
    -------
    dict
        Dictionary with 'image' (nibabel), 'csa_series', 'csa_image'
    """
    # Load with NiBabel for image data
    nib_img = nib.load(dicom_path)

    # Load with pydicom for CSA extraction
    dcm = pydicom.dcmread(dicom_path)

    # Extract CSA headers
    csa_data = {}

    if (0x0029, 0x1010) in dcm:
        csa_data['series'] = CsaHeader(dcm[0x0029, 0x1010].value).read()

    if (0x0029, 0x1020) in dcm:
        csa_data['image'] = CsaHeader(dcm[0x0029, 0x1020].value).read()

    return {
        'nibabel_image': nib_img,
        'csa': csa_data,
        'dicom': dcm
    }


# Usage
data = load_dicom_with_csa('scan.dcm')

# Access image data via NiBabel
image_array = data['nibabel_image'].get_fdata()
affine = data['nibabel_image'].affine

# Access metadata via CSA
if 'series' in data['csa']:
    slice_times = data['csa']['series'].get('MosaicRefAcqTimes', {}).get('value', [])
    print(f"Image shape: {image_array.shape}")
    print(f"Slice timing: {len(slice_times)} time points")

Quick Access Helper

class SiemensDicomLoader:
    """Convenience wrapper for loading Siemens DICOM with full metadata."""

    def __init__(self, dicom_path: str):
        self.path = dicom_path
        self.nib_img = nib.load(dicom_path)
        self.dcm = pydicom.dcmread(dicom_path)

        # Parse CSA headers
        self.series_csa = None
        self.image_csa = None

        if (0x0029, 0x1010) in self.dcm:
            self.series_csa = CsaHeader(self.dcm[0x0029, 0x1010].value).read()

        if (0x0029, 0x1020) in self.dcm:
            self.image_csa = CsaHeader(self.dcm[0x0029, 0x1020].value).read()

    @property
    def data(self) -> np.ndarray:
        """Get image data as numpy array."""
        return self.nib_img.get_fdata()

    @property
    def affine(self) -> np.ndarray:
        """Get affine transformation matrix."""
        return self.nib_img.affine

    @property
    def slice_times(self) -> list:
        """Get slice timing (ms)."""
        if self.series_csa and 'MosaicRefAcqTimes' in self.series_csa:
            return self.series_csa['MosaicRefAcqTimes']['value']
        return []

    @property
    def tr_ms(self) -> float:
        """Get TR in milliseconds."""
        if self.series_csa and 'MrPhoenixProtocol' in self.series_csa:
            protocol = self.series_csa['MrPhoenixProtocol']['value']
            if isinstance(protocol, dict) and 'alTR' in protocol:
                return protocol['alTR'][0]
        return None

    @property
    def b_value(self) -> float:
        """Get b-value (s/mm²)."""
        if self.image_csa and 'B_value' in self.image_csa:
            return self.image_csa['B_value']['value']
        return None


# Usage
loader = SiemensDicomLoader('fmri_scan.dcm')

print(f"Shape: {loader.data.shape}")
print(f"TR: {loader.tr_ms} ms")
print(f"Slice times: {loader.slice_times[:5]}...")
print(f"Affine:\n{loader.affine}")

Complete Workflows

fMRI Preprocessing Workflow

Complete example: Load fMRI data, extract timing, and prepare for analysis:

import nibabel as nib
import numpy as np
from pathlib import Path
from csa_header import CsaHeader
import pydicom


def prepare_fmri_for_analysis(
    dicom_path: str,
    output_nifti: str = None,
    output_timing: str = None
):
    """
    Complete fMRI preprocessing workflow.

    Extracts:
    - 4D image data
    - Slice timing
    - TR/TE parameters
    - Affine transformation

    Parameters
    ----------
    dicom_path : str
        Path to representative DICOM from fMRI series
    output_nifti : str, optional
        Save NIfTI to this path
    output_timing : str, optional
        Save slice timing to this file

    Returns
    -------
    dict
        Complete fMRI metadata and data
    """
    # Load DICOM
    nib_img = nib.load(dicom_path)
    dcm = pydicom.dcmread(dicom_path)

    # Get image data
    data = nib_img.get_fdata()
    affine = nib_img.affine

    # Extract CSA metadata
    csa_series = CsaHeader(dcm[0x0029, 0x1010].value).read()

    # Slice timing
    slice_times_ms = csa_series.get('MosaicRefAcqTimes', {}).get('value', [])
    n_slices = csa_series.get('NumberOfImagesInMosaic', {}).get('value', len(slice_times_ms))

    # Protocol parameters
    protocol = csa_series.get('MrPhoenixProtocol', {}).get('value', {})
    tr_ms = protocol.get('alTR', [None])[0] if protocol else None
    te_ms = protocol.get('alTE', [None])[0] if protocol else None
    flip_angle = protocol.get('adFlipAngleDegree', [None])[0] if protocol else None

    # Normalize slice timing to seconds
    if slice_times_ms and tr_ms:
        slice_times_sec = np.array(slice_times_ms) / 1000.0

        # Save timing file if requested
        if output_timing:
            np.savetxt(output_timing, slice_times_sec)
            print(f"✓ Saved slice timing to {output_timing}")

    # Save NIfTI if requested
    if output_nifti:
        nib.save(nib_img, output_nifti)
        print(f"✓ Saved NIfTI to {output_nifti}")

    # Prepare results
    results = {
        'data': data,
        'affine': affine,
        'shape': data.shape,
        'slice_times_ms': slice_times_ms,
        'slice_times_sec': slice_times_sec.tolist() if slice_times_ms else [],
        'n_slices': n_slices,
        'TR_ms': tr_ms,
        'TR_sec': tr_ms / 1000.0 if tr_ms else None,
        'TE_ms': te_ms,
        'flip_angle': flip_angle,
        'voxel_size': nib_img.header.get_zooms()[:3],
    }

    # Print summary
    print("\n=== fMRI Summary ===")
    print(f"Image shape: {results['shape']}")
    print(f"Voxel size: {results['voxel_size']} mm")
    print(f"Number of slices: {results['n_slices']}")
    print(f"TR: {results['TR_ms']} ms ({results['TR_sec']:.2f} s)")
    print(f"TE: {results['TE_ms']} ms")
    print(f"Flip angle: {results['flip_angle']}°")
    print(f"Slice timing: {len(results['slice_times_ms'])} time points")

    return results


# Usage
fmri_data = prepare_fmri_for_analysis(
    'fmri_001.dcm',
    output_nifti='fmri_4d.nii.gz',
    output_timing='slice_times.txt'
)

# Now ready for SPM/FSL/AFNI preprocessing!

Example output:

✓ Saved slice timing to slice_times.txt
✓ Saved NIfTI to fmri_4d.nii.gz

=== fMRI Summary ===
Image shape: (64, 64, 36, 200)
Voxel size: (3.0, 3.0, 3.0) mm
Number of slices: 36
TR: 2000 ms (2.00 s)
TE: 30 ms
Flip angle: 90.0°
Slice timing: 36 time points

DWI/DTI Workflow

Complete diffusion imaging workflow with b-values and gradients:

import nibabel as nib
import numpy as np
from pathlib import Path
from csa_header import CsaHeader
import pydicom


def prepare_dwi_for_analysis(
    dicom_directory: Path,
    output_prefix: str = 'dwi'
):
    """
    Complete DWI/DTI preprocessing workflow.

    Extracts from a directory of DWI DICOMs:
    - 4D DWI volume
    - B-values (bvals)
    - Gradient directions (bvecs)
    - Creates FSL-compatible output

    Parameters
    ----------
    dicom_directory : Path
        Directory containing DWI DICOM files
    output_prefix : str
        Prefix for output files (prefix.nii.gz, prefix.bval, prefix.bvec)

    Returns
    -------
    dict
        DWI metadata and file paths
    """
    dicom_files = sorted(dicom_directory.glob('*.dcm'))

    if not dicom_files:
        raise ValueError(f"No DICOM files found in {dicom_directory}")

    print(f"Processing {len(dicom_files)} DICOM files...")

    # Storage for DWI parameters
    volumes = []
    bvals = []
    bvecs = []

    # Process each DICOM
    for dcm_file in dicom_files:
        # Load image data with NiBabel
        nib_img = nib.load(str(dcm_file))
        volume = nib_img.get_fdata()
        volumes.append(volume)

        # Extract DWI parameters with csa_header
        dcm = pydicom.dcmread(str(dcm_file))

        if (0x0029, 0x1020) in dcm:
            image_csa = CsaHeader(dcm[0x0029, 0x1020].value).read()

            # B-value
            b_value = image_csa.get('B_value', {}).get('value', 0)
            bvals.append(b_value)

            # Gradient direction
            gradient = image_csa.get('DiffusionGradientDirection', {}).get('value', [0, 0, 0])
            bvecs.append(gradient)
        else:
            # Fallback for missing CSA
            bvals.append(0)
            bvecs.append([0, 0, 0])

    # Stack volumes into 4D array
    data_4d = np.stack(volumes, axis=-1)

    # Get affine from first volume
    affine = nib.load(str(dicom_files[0])).affine

    # Create 4D NIfTI
    nifti_4d = nib.Nifti1Image(data_4d, affine)

    # Save files
    nifti_path = f"{output_prefix}.nii.gz"
    bval_path = f"{output_prefix}.bval"
    bvec_path = f"{output_prefix}.bvec"

    nib.save(nifti_4d, nifti_path)
    print(f"✓ Saved 4D NIfTI: {nifti_path}")

    # Save bvals (FSL format: single row)
    np.savetxt(bval_path, np.array(bvals)[np.newaxis, :], fmt='%d')
    print(f"✓ Saved b-values: {bval_path}")

    # Save bvecs (FSL format: 3 rows, N columns)
    bvecs_array = np.array(bvecs).T  # Transpose to 3xN
    np.savetxt(bvec_path, bvecs_array, fmt='%.6f')
    print(f"✓ Saved gradient directions: {bvec_path}")

    # Summary
    unique_bvals = np.unique(bvals)

    results = {
        'nifti_path': nifti_path,
        'bval_path': bval_path,
        'bvec_path': bvec_path,
        'shape': data_4d.shape,
        'n_volumes': len(bvals),
        'bvalues': unique_bvals.tolist(),
        'affine': affine
    }

    print("\n=== DWI Summary ===")
    print(f"4D shape: {results['shape']}")
    print(f"Number of volumes: {results['n_volumes']}")
    print(f"Unique b-values: {results['bvalues']} s/mm²")
    print(f"\nReady for DTI analysis (FSL, MRtrix, DIPY, etc.)")

    return results


# Usage
dwi_data = prepare_dwi_for_analysis(
    Path('/path/to/dwi_dicoms'),
    output_prefix='dwi_preprocessed'
)

# Files created:
# - dwi_preprocessed.nii.gz (4D volume)
# - dwi_preprocessed.bval (b-values)
# - dwi_preprocessed.bvec (gradient directions)

# Now run DTI fitting with FSL:
# dtifit -k dwi_preprocessed.nii.gz \
#        -o dti_output \
#        -m nodif_brain_mask.nii.gz \
#        -r dwi_preprocessed.bvec \
#        -b dwi_preprocessed.bval

BIDS Conversion Workflow

Create BIDS-compliant JSON sidecar with CSA metadata:

import json
from pathlib import Path
import nibabel as nib
import pydicom
from csa_header import CsaHeader


def create_bids_json(dicom_path: str, output_json: str):
    """
    Create BIDS-compliant JSON sidecar with CSA metadata.

    Parameters
    ----------
    dicom_path : str
        Path to representative DICOM file
    output_json : str
        Output path for JSON sidecar

    Returns
    -------
    dict
        BIDS metadata dictionary
    """
    dcm = pydicom.dcmread(dicom_path)

    # Extract CSA headers
    csa_series = CsaHeader(dcm[0x0029, 0x1010].value).read()
    protocol = csa_series.get('MrPhoenixProtocol', {}).get('value', {})

    # Build BIDS metadata
    metadata = {
        "Manufacturer": str(dcm.Manufacturer),
        "ManufacturersModelName": str(dcm.ManufacturerModelName),
        "MagneticFieldStrength": float(dcm.MagneticFieldStrength),
    }

    # Timing parameters
    if protocol:
        if 'alTR' in protocol:
            metadata["RepetitionTime"] = protocol['alTR'][0] / 1000.0  # Convert to seconds

        if 'alTE' in protocol:
            metadata["EchoTime"] = protocol['alTE'][0] / 1000.0  # Convert to seconds

        if 'adFlipAngleDegree' in protocol:
            metadata["FlipAngle"] = protocol['adFlipAngleDegree'][0]

    # Slice timing
    if 'MosaicRefAcqTimes' in csa_series:
        slice_times_ms = csa_series['MosaicRefAcqTimes']['value']
        metadata["SliceTiming"] = [t / 1000.0 for t in slice_times_ms]  # Convert to seconds

    # Phase encoding
    if 'PhaseEncodingDirectionPositive' in csa_series:
        pe_positive = csa_series['PhaseEncodingDirectionPositive']['value']
        # Simplified - actual direction requires more info
        metadata["PhaseEncodingDirection"] = "j" if pe_positive else "j-"

    # Save JSON
    with open(output_json, 'w') as f:
        json.dump(metadata, f, indent=2)

    print(f"✓ Created BIDS JSON: {output_json}")

    return metadata


# Usage
metadata = create_bids_json(
    'sub-01_task-rest_bold_001.dcm',
    'sub-01_task-rest_bold.json'
)

Example JSON output:

{
  "Manufacturer": "SIEMENS",
  "ManufacturersModelName": "Prisma",
  "MagneticFieldStrength": 3.0,
  "RepetitionTime": 2.0,
  "EchoTime": 0.03,
  "FlipAngle": 90.0,
  "SliceTiming": [0.0, 0.0525, 0.105, 0.1575, ...],
  "PhaseEncodingDirection": "j"
}

Advanced Examples

Example: Compute Mean Signal with QA

def compute_fmri_qa_metrics(dicom_path: str) -> dict:
    """Compute basic QA metrics for fMRI."""
    # Load data with NiBabel
    nib_img = nib.load(dicom_path)
    data = nib_img.get_fdata()

    # Extract parameters with csa_header
    dcm = pydicom.dcmread(dicom_path)
    csa_series = CsaHeader(dcm[0x0029, 0x1010].value).read()
    protocol = csa_series.get('MrPhoenixProtocol', {}).get('value', {})

    # Compute metrics
    metrics = {
        'mean_signal': float(np.mean(data)),
        'std_signal': float(np.std(data)),
        'snr_estimate': float(np.mean(data) / np.std(data)),
        'shape': data.shape,
        'TR_ms': protocol.get('alTR', [None])[0] if protocol else None,
        'TE_ms': protocol.get('alTE', [None])[0] if protocol else None
    }

    return metrics

See Also

Documentation

External Resources


Next Steps: Explore complete example scripts in the examples/ directory.

Clone this wiki locally