
Commit 901e962

Smokers Health FA Taskrunner Workspace (#1621)
* code changes
* remove workspace
* code changes
* code changes
* code changes
* code changes
* code changes
* code changes
* code changes

Signed-off-by: yes <shailesh.tanwar@intel.com>
1 parent e716c88 commit 901e962

14 files changed (+481, −4 lines)


openfl-workspace/federated_analytics/smokers_health/.workspace

Whitespace-only changes.
Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
# Federated Analytics: Smokers Health Example

This workspace demonstrates how to use OpenFL for privacy-preserving analytics on the Smokers Health dataset. The setup enables distributed computation of health statistics (such as heart rate, cholesterol, and blood pressure) across multiple collaborators, without sharing raw data.

## Instantiating a Workspace from the Smokers Health Template

To instantiate a workspace from the `federated_analytics/smokers_health` template, use the `fx workspace create` command. This sets up a new workspace with the required configuration and code.

1. **Install dependencies:**

   ```bash
   pip install virtualenv
   mkdir ~/openfl-smokers-health
   virtualenv ~/openfl-smokers-health/venv
   source ~/openfl-smokers-health/venv/bin/activate
   pip install openfl
   ```

2. **Create the workspace folder:**

   ```bash
   cd ~/openfl-smokers-health
   fx workspace create --template federated_analytics/smokers_health --prefix fl_workspace
   cd ~/openfl-smokers-health/fl_workspace
   ```

## Directory Structure

The workspace has the following structure:

```
smokers_health
├── requirements.txt
├── .workspace
├── plan
│   ├── plan.yaml
│   ├── cols.yaml
│   ├── data.yaml
│   └── defaults/
├── src
│   ├── __init__.py
│   ├── dataloader.py
│   ├── taskrunner.py
│   └── aggregate_health.py
├── data/
└── save/
```

### Directory Breakdown

- **requirements.txt**: Lists all Python dependencies for the workspace.
- **plan/**: Contains configuration files for the federation:
  - `plan.yaml`: Main plan declaration.
  - `cols.yaml`: List of authorized collaborators.
  - `data.yaml`: Data path for each collaborator.
  - `defaults/`: Default configuration values.
- **src/**: Python modules for federated analytics:
  - `dataloader.py`: Loads and shards the Smokers Health dataset, and supports SQL-like queries.
  - `taskrunner.py`: Groups data and computes mean health metrics by age, sex, and smoking status.
  - `aggregate_health.py`: Aggregates results from all collaborators.
- **data/**: Location for the downloaded and unzipped dataset.
- **save/**: Stores aggregated results and analytics outputs.

## Data Preparation

The data loader automatically downloads the Smokers Health dataset from Kaggle (or another configured source). Make sure you have the required access, or download and unzip the dataset into the `data/` directory manually if needed.
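If you prefer to fetch the dataset manually, the following sketch simply mirrors the download step performed by `src/dataloader.py` (same Kaggle URL and archive name); it assumes `curl` and `unzip` are available on the system and that it is run from the workspace root:

```python
import os
import subprocess

# Mirrors SmokersHealthDataLoader._download_raw_data() from src/dataloader.py.
os.makedirs('data', exist_ok=True)
download_path = os.path.expanduser('./data/smokers_health.zip')

subprocess.run(
    ['curl', '-L', '-o', download_path,
     'https://www.kaggle.com/api/v1/datasets/download/jaceprater/smokers-health-data'],
    check=True,
)
# Extract the CSV into the data/ directory expected by the data loader.
subprocess.run(['unzip', '-o', download_path, '-d', 'data'], check=True)
```
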
## Defining the Data Loader

The data loader supports SQL-like queries and can load data from CSV or other sources as configured. It shards the dataset among collaborators and exposes a `query` method that the analytics task uses to select columns.
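As a rough illustration of how the loader can be exercised outside a running federation, here is a sketch based on the `SmokersHealthDataLoader` class added in `src/dataloader.py`. It assumes the workspace root as the working directory, network access for the Kaggle download triggered by the constructor, and shard `1` as the `data_path`; the batch size and column names come from `plan.yaml`:

```python
from src.dataloader import SmokersHealthDataLoader

# batch_size matches plan.yaml; data_path is the shard number that a
# collaborator would normally receive from data.yaml.
loader = SmokersHealthDataLoader(batch_size=150, data_path='1')

# Select only the columns needed for the analytics task.
subset = loader.query(['age', 'sex', 'current_smoker', 'heart_rate', 'chol'])
print(subset.head())
```
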
## Defining the Task Runner

The task runner groups the data by `age`, `sex`, and `current_smoker`, and computes the mean of `heart_rate`, `chol`, and blood pressure (systolic/diastolic). The results are returned as NumPy arrays for aggregation.
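The `src/taskrunner.py` module itself does not appear in this diff, but the computation described above amounts to a pandas group-by over the local shard. Below is a minimal sketch of that grouping, assuming the CSV produced by the download step and restricting itself to numeric columns listed in `plan.yaml` (the real task runner may also parse the combined `blood_pressure` field into systolic and diastolic values):

```python
import pandas as pd

df = pd.read_csv('data/smoking_health_data_final.csv')

# Mean heart rate and cholesterol per (age, sex, smoking status) group.
summary = (
    df.groupby(['age', 'sex', 'current_smoker'])[['heart_rate', 'chol']]
    .mean()
    .reset_index()
)
print(summary.head())
```
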
## Running the Federation

1. **Initialize the plan:**

   ```bash
   fx plan initialize
   ```

2. **Set up the aggregator and collaborators:**

   ```bash
   fx workspace certify
   fx aggregator generate-cert-request
   fx aggregator certify --silent

   fx collaborator create -n collaborator1 -d 1
   fx collaborator generate-cert-request -n collaborator1
   fx collaborator certify -n collaborator1 --silent

   fx collaborator create -n collaborator2 -d 2
   fx collaborator generate-cert-request -n collaborator2
   fx collaborator certify -n collaborator2 --silent
   ```

3. **Start the federation:**

   ```bash
   fx aggregator start &
   fx collaborator start -n collaborator1 &
   fx collaborator start -n collaborator2 &
   ```

## License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
# Copyright (C) 2025 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

collaborators:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
# Copyright (C) 2025 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

# collaborator_name,data_directory_path
Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
aggregator:
  defaults: plan/defaults/aggregator.yaml
  template: openfl.component.Aggregator
  settings:
    last_state_path: save/result.json
    rounds_to_train: 1

collaborator:
  defaults: plan/defaults/collaborator.yaml
  template: openfl.component.Collaborator
  settings:
    use_delta_updates: false
    opt_treatment: RESET

data_loader:
  defaults: plan/defaults/data_loader.yaml
  template: src.dataloader.SmokersHealthDataLoader
  settings:
    collaborator_count: 2
    data_group_name: smokers_health
    batch_size: 150

task_runner:
  defaults: plan/defaults/task_runner.yaml
  template: src.taskrunner.SmokersHealthAnalytics

network:
  defaults: plan/defaults/network.yaml

assigner:
  template: openfl.component.RandomGroupedAssigner
  settings:
    task_groups:
      - name: analytics
        percentage: 1.0
        tasks:
          - analytics

tasks:
  analytics:
    function: analytics
    aggregation_type:
      template: src.aggregate_health.AggregateHealthMetrics
      kwargs:
        columns: ['age', 'sex', 'current_smoker', 'heart_rate', 'blood_pressure', 'cigs_per_day', 'chol']

openfl-workspace/federated_analytics/smokers_health/requirements.txt

Whitespace-only changes.
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import numpy as np
from openfl.interface.aggregation_functions.core import AggregationFunction


class AggregateHealthMetrics(AggregationFunction):
    """Aggregation logic for Smokers Health analytics."""

    def call(self, local_tensors, *_) -> np.ndarray:
        """
        Aggregate local tensors containing the means of local health metrics
        (heart rate, cholesterol, systolic blood pressure, and diastolic blood
        pressure), grouped by age, sex, and smoking status.
        Each tensor holds one collaborator's local means for these health parameters.

        Args:
            local_tensors (list): A list of objects, each containing a `tensor` attribute
                that represents local means for the health metrics.
            *_: Additional arguments (unused).
        Returns:
            np.ndarray: The aggregated (averaged) means for each health metric.
        Raises:
            ValueError: If the input list `local_tensors` is empty, indicating
                that there are no metrics to aggregate.
        """

        if not local_tensors:
            raise ValueError("No local metrics to aggregate.")

        # Average the local means across collaborators.
        agg_metrics = np.zeros_like(local_tensors[0].tensor)
        for local_tensor in local_tensors:
            agg_metrics += local_tensor.tensor / len(local_tensors)
        return agg_metrics
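
The `call` method above is an unweighted average of each collaborator's local means. A quick standalone illustration of that arithmetic, using `SimpleNamespace` as a hypothetical stand-in for the tensor-carrying objects OpenFL passes in (the real objects carry additional metadata):

```python
from types import SimpleNamespace

import numpy as np

from src.aggregate_health import AggregateHealthMetrics

# Two collaborators report local mean vectors, e.g.
# [heart_rate, chol, systolic, diastolic].
local_tensors = [
    SimpleNamespace(tensor=np.array([76.0, 210.0, 132.0, 84.0])),
    SimpleNamespace(tensor=np.array([72.0, 198.0, 128.0, 80.0])),
]

aggregated = AggregateHealthMetrics().call(local_tensors)
print(aggregated)  # [ 74. 204. 130.  82.]
```
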
Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import subprocess

import pandas as pd

from openfl.federated.data.loader import DataLoader


class SmokersHealthDataLoader(DataLoader):
    """Data Loader for the Smokers Health dataset."""

    def __init__(self, batch_size, data_path, **kwargs):
        super().__init__(**kwargs)

        # If data_path is None, this is being used for model initialization only
        if data_path is None:
            return

        # Load actual data if a data path is provided
        try:
            int(data_path)
        except ValueError:
            raise ValueError(
                f"Expected '{data_path}' to be representable as `int`, "
                "as it refers to the data shard number used by the collaborator."
            )

        # Download and prepare data
        self._download_raw_data()
        self.data_shard = self.load_data_shard(
            shard_num=int(data_path), **kwargs
        )

    def _download_raw_data(self):
        """
        Download and extract the raw data for the smokers' health dataset.
        This method performs the following steps:
        1. Downloads the dataset from the specified Kaggle URL using the `curl` command.
        2. Saves the downloaded file as a ZIP archive in the `./data` directory.
        3. Extracts the contents of the ZIP archive into the `data` directory.
        """

        download_path = os.path.expanduser('./data/smokers_health.zip')
        subprocess.run(
            [
                'curl', '-L', '-o', download_path,
                'https://www.kaggle.com/api/v1/datasets/download/jaceprater/smokers-health-data'
            ],
            check=True
        )

        # Unzip the downloaded file into the data directory
        subprocess.run(['unzip', '-o', download_path, '-d', 'data'], check=True)

    def load_data_shard(self, shard_num, **kwargs):
        """
        Load this collaborator's shard of the data from a CSV file.
        This method reads the data from './data/smoking_health_data_final.csv'
        and slices it according to `shard_num`.
        Args:
            shard_num (int): The shard number assigned to this collaborator.
        Returns:
            pd.DataFrame: The shard of data loaded from the CSV file.
        """
        file_path = os.path.join('data', 'smoking_health_data_final.csv')
        df = pd.read_csv(file_path)

        # Split data into shards
        shard_size = len(df) // shard_num
        start_idx = shard_size * (shard_num - 1)
        end_idx = start_idx + shard_size

        return df.iloc[start_idx:end_idx]

    def query(self, columns, **kwargs):
        """
        Query the data shard for the specified columns.
        Args:
            columns (list): A list of column names to query from the data shard.
            **kwargs: Additional keyword arguments (currently not used).
        Returns:
            DataFrame: A DataFrame containing the data for the specified columns.
        Raises:
            ValueError: If the columns parameter is not a list.
        """
        if not isinstance(columns, list):
            raise ValueError("Columns parameter must be a list")
        return self.data_shard[columns]

    def get_feature_shape(self):
        """
        This method is not used by the analytics workspace and is kept for
        interface compatibility.

        Returns:
            None
        """
        pass
