Skip to content

Commit dac0240

Browse files
anna-grim and anna-grim authored
refactor: cloud reads (#59)
Co-authored-by: anna-grim <anna.grim@alleninstitute.org>
1 parent 68923e9 commit dac0240

File tree

2 files changed

+40
-33
lines changed

2 files changed

+40
-33
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ dependencies = [
2020
"scikit-image",
2121
"tensorstore",
2222
"tifffile",
23+
"xlwt",
2324
"zarr",
2425
]
2526

src/segmentation_skeleton_metrics/swc_utils.py

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
"""
99

10-
from concurrent.futures import ThreadPoolExecutor, as_completed
10+
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
1111
from io import BytesIO
1212
from zipfile import ZipFile
1313

@@ -108,15 +108,24 @@ def parse_cloud_paths(cloud_dict, min_size, anisotropy):
108108
# Initializations
109109
bucket = storage.Client().bucket(cloud_dict["bucket_name"])
110110
zip_paths = utils.list_gcs_filenames(bucket, cloud_dict["path"], ".zip")
111+
print("Downloading predicted swc files from cloud...")
112+
print("# zip files:", len(zip_paths))
113+
with ProcessPoolExecutor() as executor:
114+
processes = []
115+
for path in zip_paths:
116+
zip_content = bucket.blob(path).download_as_bytes()
117+
processes.append(
118+
executor.submit(download, zip_content, anisotropy, min_size)
119+
)
120+
print("Processes Assigned!\n")
121+
122+
# Store results
111123
chunk_size = int(len(zip_paths) * 0.02)
112-
113-
# Parse
114124
cnt = 1
115125
valid_labels = dict()
116-
print("Downloading predicted swc files from cloud...")
117-
print("# zip files:", len(zip_paths))
118-
for i, path in enumerate(zip_paths):
119-
valid_labels.update(download(bucket, path, min_size, anisotropy))
126+
t0, t1 = utils.init_timers()
127+
for i, process in enumerate(as_completed(processes)):
128+
valid_labels.update(process.result())
120129
if i > cnt * chunk_size:
121130
utils.progress_bar(i + 1, len(zip_paths))
122131
cnt += 1
@@ -127,23 +136,21 @@ def parse_cloud_paths(cloud_dict, min_size, anisotropy):
127136
return valid_labels
128137

129138

130-
def download(bucket, zip_path, min_size, anisotropy):
139+
def download(zip_content, anisotropy, min_size):
131140
"""
132141
Downloads the contents from each swc file contained in the zip file at
133142
"zip_path".
134143
135144
Parameters
136145
----------
137-
bucket : str
138-
Name of GCS bucket containing swc files to be read.
139-
zip_path : str
140-
Path to zip file contained in GCS bucket.
141-
min_size : int
142-
Threshold on the number of nodes contained in an swc file. Only swc
143-
files with more than "min_size" nodes are stored in "valid_labels".
146+
zip_content : ...
147+
Contents of a zip file.
144148
anisotropy : list[float]
145149
Image to World scaling factors applied to xyz coordinates to account
146150
for anisotropy of the microscope.
151+
min_size : int
152+
Threshold on the number of nodes contained in an swc file. Only swc
153+
files with more than "min_size" nodes are stored in "valid_labels".
147154
148155
Returns
149156
-------
@@ -152,26 +159,25 @@ def download(bucket, zip_path, min_size, anisotropy):
152159
coordinates read from corresponding swc file.
153160
154161
"""
155-
zip_content = bucket.blob(zip_path).download_as_bytes()
156162
with ZipFile(BytesIO(zip_content)) as zip_file:
157163
with ThreadPoolExecutor() as executor:
158164
# Assign threads
159-
threads = []
160-
for path in utils.list_files_in_gcs_zip(zip_content):
161-
threads.append(
162-
executor.submit(
163-
parse_gcs_zip, zip_file, path, min_size, anisotropy
164-
)
165+
paths = utils.list_files_in_gcs_zip(zip_content)
166+
threads = [
167+
executor.submit(
168+
parse_gcs_zip, zip_file, path, anisotropy, min_size
165169
)
170+
for path in paths
171+
]
166172

167-
# Process results
168-
valid_labels = dict()
169-
for thread in as_completed(threads):
170-
valid_labels.update(thread.result())
173+
# Process results
174+
valid_labels = dict()
175+
for thread in as_completed(threads):
176+
valid_labels.update(thread.result())
171177
return valid_labels
172178

173179

174-
def parse_gcs_zip(zip_file, path, min_size, anisotropy):
180+
def parse_gcs_zip(zip_file, path, anisotropy, min_size):
175181
"""
176182
Reads swc file stored at "path" which points to a file in a GCS bucket.
177183
@@ -181,17 +187,17 @@ def parse_gcs_zip(zip_file, path, min_size, anisotropy):
181187
Zip file containing swc file to be read.
182188
path : str
183189
Path to swc file to be read.
184-
min_size : int
185-
Threshold on the number of nodes contained in an swc file. Only swc
186-
files with more than "min_size" nodes are stored in "valid_labels".
187190
anisotropy : list[float]
188191
Image to World scaling factors applied to xyz coordinates to account
189192
for anisotropy of the microscope.
193+
min_size : int
194+
Threshold on the number of nodes contained in an swc file. Only swc
195+
files with more than "min_size" nodes are stored in "valid_labels".
190196
191197
Returns
192198
-------
193199
list
194-
List such that each entry is a line from the swc file.
200+
Entries of an swc file.
195201
196202
"""
197203
contents = read_from_cloud(zip_file, path)
@@ -226,8 +232,8 @@ def read_from_cloud(zip_file, path):
226232
Reads the content of an swc file from a zip file in a GCS bucket.
227233
228234
"""
229-
with zip_file.open(path) as text_file:
230-
return text_file.read().decode("utf-8").splitlines()
235+
with zip_file.open(path) as f:
236+
return f.read().decode("utf-8").splitlines()
231237

232238

233239
def get_coords(contents, anisotropy):

0 commit comments

Comments (0)