diff --git a/1_network.yml b/1_network.yml
index 8449e68..2b1946e 100644
--- a/1_network.yml
+++ b/1_network.yml
@@ -9,6 +9,7 @@ packages:
   - purrr
   - scipiper
   - geojsonio
+  - reticulate
 
 sources:
   - 1_network/src/geo_fabric_functions.R
@@ -16,6 +17,7 @@ sources:
   - 1_network/src/calc_distance_functions.R
   - 1_network/src/network_dam_res_reaches.R
   - 1_network/src/reach_classification.R
+  - 1_network/src/catchment_attributes.R
 
 targets:
 
@@ -26,6 +28,8 @@ targets:
       - 1_network/out/network.json.ind
       - 1_network/out/subseg_distance_matrix.rds.ind
       - 1_network/out/subseg_reservoir_mapping.rds.ind
+      - 1_network/out/seg_attr_metadata.csv.ind
+      - 1_network/out/seg_attr_drb.feather.ind
 
   # unzip the PRMS segments shapefile
   1_network/in/delaware_stream_temp_by_segment/delaware_segments/delaware_segments.shp:
@@ -86,3 +90,23 @@ targets:
       out_ind = target_name,
       network_ind = '1_network/out/network.rds.ind',
       reservoir_ind = '1_network/out/subseg_reservoir_mapping.rds.ind')
+
+  # fetch and process catchment attributes
+
+  # fetch and combine 6 categories of catchment attribute data
+  1_network/out/combined_categories_catch_attr_region_02.feather.ind:
+    command: download_and_combine_sb_attr_data(out_ind = target_name)
+    depends: '1_network/src/catch_attr.py'
+
+  # relate attribute data to segments and subset to DRB
+  1_network/out/seg_attr_drb.feather.ind:
+    command: relate_attr_to_drb_segs(out_ind = target_name,
+      all_attr = '1_network/out/combined_categories_catch_attr_region_02.feather.ind',
+      geofabric = '1_network/in/GeospatialFabric_National.gdb.ind',
+      drb_network = '1_network/out/network.json.ind')
+    depends: '1_network/src/catch_attr.py'
+
+  # gather and combine metadata
+  1_network/out/seg_attr_metadata.csv.ind:
+    command: fetch_combine_catch_attr_metadata(out_ind = target_name)
+    depends: '1_network/src/catch_attr.py'
diff --git a/20_catchment_attributes/src/catch_attr.py b/1_network/src/catch_attr.py
similarity index 67%
rename from 20_catchment_attributes/src/catch_attr.py
rename to 1_network/src/catch_attr.py
index 907bfac..8b6681c 100644
--- a/20_catchment_attributes/src/catch_attr.py
+++ b/1_network/src/catch_attr.py
@@ -27,18 +27,17 @@ def retrieve_cat_attr_links(just_categories=False):
     return links['urls']
 
 
-def download_unzip_cat_attr_data(directory, category, region):
+def download_unzip_cat_attr_data(directory, item_url, category, region="02"):
     """
     download files from the urls in the yaml file and then unzip the
     downloaded data
     :param directory: [str] path to the directory where the catchment
     attribute data should be downloaded and unzipped to
-    :param region: [str] zero-padded number for the region (e.g., '02')
-    :param category: [str] the category of parameter (SSURGO, NLCD, etc.)
+    :param item_url: [str] url of the ScienceBase item to download data from
     :return: [list] list of file locations of the downloaded and unzipped data
     """
     zipped_path = directory+'/' + category + region
-    download_url = retrieve_file_url(category, region)
+    download_url = retrieve_file_url(item_url, category, region)
     urllib.request.urlretrieve(download_url, zipped_path)
     with zipfile.ZipFile(zipped_path, 'r') as zip_ref:
         zip_ref.extractall(directory)
@@ -80,39 +79,39 @@ def read_sb_json(sb_url):
     return json.loads(response.text)
 
 
-def retrieve_category_json(category):
+def retrieve_item_json(url):
    """
    form the url and get the json data for a certain catchment attribute
    category
    """
-    url = retrieve_cat_attr_links()[category]
     # make sure we are getting the data back as json
     url += "?format=json"
     data = read_sb_json(url)
     return data
 
 
-def retrieve_file_url(category, region):
+def retrieve_file_url(item_url, category, region):
     """
     retrieve the data download url from the science base entry
+    :param item_url: [str] url to SB item
     :param category: [str] the category of parameter (SSURGO, NLCD, etc.)
     :param region: [str] zero-padded number for the region (e.g., '02')
     """
-    category_data = retrieve_category_json(category)
+    category_data = retrieve_item_json(item_url)
     for file_data in category_data['files']:
         should_end_with = f"{category}_{region}.zip".lower()
         if file_data['name'].lower().endswith(should_end_with):
             return file_data['downloadUri']
 
 
-def get_metadata_file(category, output_file):
+def get_metadata_file(item_url, output_file):
     """
     download the metadata json file for a given category
     :param category: [str] the category of parameter (SSURGO, NLCD, etc.)
     :param output_file: [str] the output file path where the metadata should
     be saved
     """
-    category_data = retrieve_category_json(category)
+    category_data = retrieve_item_json(item_url)
     for file_data in category_data['files']:
         if file_data['originalMetadata']:
             urllib.request.urlretrieve(file_data['downloadUri'], output_file)
@@ -153,11 +152,11 @@ def consolidate_metadata(metadata_files, output_file):
     return df
 
 
-def add_nat_col_to_table(nat_reg_table, attr_df, region, join_idx_nat,
+def add_nat_col_to_table(nat_reg_df, attr_df, region, join_idx_nat,
                          join_idx_attr, nat_col_names, attr_col_names=None):
     """
     add a column from a nat_reg_table to the attribute table.
-    :param nat_reg_table: [str] the path to the national-regional id look up
+    :param nat_reg_df: [dataframe] national-regional id look up dataframe
     :param attr_df: [dataframe] dataframe with segment attributes
     :param region: [str] the region for relating them (e.g., '02')
     :param join_idx_nat: [str] the col name by which the nat_reg_table should
@@ -170,7 +169,6 @@ def add_nat_col_to_table(nat_reg_table, attr_df, region, join_idx_nat,
         columns. if not included will use the nat_col_names
     :return: updated attr_df
     """
-    nat_reg_df = pd.read_csv(nat_reg_table)
     # remove zero padding if there
     nat_reg_df['region'] = nat_reg_df['region'].astype(str).str.lstrip(to_strip='0')
     region = region.strip('0')
@@ -187,37 +185,36 @@ def add_nat_col_to_table(nat_reg_table, attr_df, region, join_idx_nat,
     return attr_df
 
 
-def add_ids_and_seg_attr(nat_reg_seg_file, nat_reg_hru_file, attr_file, region,
-                         out_file):
+def add_ids_and_seg_attr(geofab_file, attr_file, region):
     """
     add the national segment and hru ids to the attribute table as well as a
     few segment attributes
-    :param nat_reg_seg_file: [str] path to file that contains the info about
-    the nationwide river segments
-    :param nat_reg_hru_file: [str] path to file that contains the info about
-    the nationwide hrus
+    :param geofab_file: [str] path to national geofabric geodatabase
     :param attr_file: [str] path to the attribute file that contains only
     regional hrus and their attributes
     :param region: [str] the region for relating them (e.g., '02')
-    :param out_file: [str] path to where the file with the new columns should
-    be saved
     :return: updated attr_file
     """
     attr_df = pd.read_feather(attr_file)
+
+    nat_seg_reg_df = gpd.read_file(geofab_file, driver="FileGDB",
+                                   layer="nsegmentNationalIdentifier")
+    nat_hru_reg_df = gpd.read_file(geofab_file, driver="FileGDB",
+                                   layer="nhruNationalIdentifier")
+
     # add hru_nat_id and seg_reg_id
-    attr_df = add_nat_col_to_table(nat_reg_hru_file, attr_df, region,
+    attr_df = add_nat_col_to_table(nat_hru_reg_df, attr_df, region,
                                    'hru_id_reg', 'hru_id_reg',
                                    ['hru_id_nat', 'hru_segment'])
 
     # add nat seg_id
-    attr_df = add_nat_col_to_table(nat_reg_seg_file, attr_df, region,
+    attr_df = add_nat_col_to_table(nat_seg_reg_df, attr_df, region,
                                    'seg_id_reg', 'hru_segment',
                                    ['seg_id_nat'])
 
     # convert seg_nat_id Nan to 0
     attr_df['seg_id_nat'].fillna(0, inplace=True)
     attr_df['seg_id_nat'] = attr_df['seg_id_nat'].astype(int)
-    attr_df.to_feather(out_file)
     return attr_df
 
 
@@ -281,20 +278,18 @@ def aggregate_attr_by_col(attr_df, agg_col):
     return by_seg_mean
 
 
-def relate_attr_to_segments(attr_w_id_file, out_file, rm_other_ids=True):
+def relate_attr_to_segments(attr_df, rm_other_ids=True):
     """
     relate the attributes to the stream segment and remove other id's to
     avoid confusion (unless rm_other_ids is False). The mean of all attributes
     is taken except for catchment area
-    :param attr_w_id_file: [str] path to feather file with attributes and the
-    nation-wide segment id (along with the regional and the HRU's)
+    :param attr_df: [dataframe] dataframe with attributes and the nation-wide
+    segment id (along with the regional and the HRU's)
        data by
-    :param out_file: [str] path to where the output file should be written
     :param rm_other_ids: [bool] whether or not you want to drop the other id
     columns
     :return: [pandas df] df of attributes related to national segment id
     """
-    attr_df = pd.read_feather(attr_w_id_file)
     agg_df = aggregate_attr_by_col(attr_df, 'seg_id_nat')
 
     if rm_other_ids:
@@ -303,48 +298,116 @@
         agg_df = agg_df[good_cols]
 
     agg_df.reset_index(inplace=True)
-    agg_df.to_feather(out_file)
     return agg_df
 
 
-def subset_by_links(subset_links, seg_attr_file, out_file):
+def subset_by_links(subset_links, attr_df, out_file):
     """
     subset the segment attributes just for the drb
     :param subset_links: [list] list of links that you want attribute data for
-    :param seg_attr_file: [str] path to file with the segment attributes by
-    national segment id
+    :param attr_df: [dataframe] dataframe with the segment attributes by
+    national segment id
     :param out_file: [str] path to where the data should be written
     """
-    seg_att_df = pd.read_feather(seg_attr_file)
-    seg_att_df.set_index('seg_id_nat', inplace=True)
-    seg_subset = seg_att_df.loc[subset_links]
+    attr_df.set_index('seg_id_nat', inplace=True)
+
+    # some segments don't have any attributes because they don't have an HRU
+    # associated with them.
+    # See https://github.com/USGS-R/delaware-model-prep/issues/40
+    # So we first filter to the segments (links) that do have attrs and then
+    # do `reindex` to include the sites that don't (though they will all have
+    # NaN for all values)
+    links_with_attrs = subset_links[subset_links.isin(attr_df.index)]
+    seg_subset = attr_df.loc[links_with_attrs]
+    seg_subset = seg_subset.reindex(subset_links)
+
     seg_subset.reset_index(inplace=True)
     seg_subset.to_feather(out_file)
 
 
-def subset_for_drb(drb_file, seg_attr_file, out_file):
+def subset_for_drb(drb_file, attr_df, out_file):
     """
     subset the regional attribute file for just the reaches in the drb
     :param drb_file: [str] path to shapefile with the drb segment subset with
     'seg_id_nat' as the links to subset by
-    :param seg_attr_file: [str] path to file with the segment attributes by
-    national segment id
+    :param attr_df: [dataframe] dataframe with the segment attributes by
+    national segment id
     :param out_file: [str] path to where the data should be written
     """
     subset_gdf = gpd.read_file(drb_file)
     link_ids = subset_gdf['seg_id_nat']
-    subset_by_links(link_ids, seg_attr_file, out_file)
+    link_ids = link_ids[link_ids != 'NA']
+    link_ids = link_ids.astype(int)
+    subset_by_links(link_ids, attr_df, out_file)
 
 
-def subset_for_drb_subset(subset_links_file, seg_attr_file, out_file):
+def download_and_combine_attr_data(out_file):
     """
-    subset the attributes for the subset of the drb. this is a 42 link subset
-    of the drb
-    :param subset_links_file: [list] path to file with the drb subset link ids
-    :param seg_attr_file: [str] path to file with the segment attributes by
-    national segment id
-    :param out_file: [str] path to where the data should be written
+    download 6 types of catchment attribute data from ScienceBase and
+    combine them
+
+    The links are to geodatabases that contain PRMS parameters for HRUs and
+    stream reaches in the GeoSpatial Fabric. The explanation of this is found
+    here: https://wwwbrr.cr.usgs.gov/projects/SW_MoWS/GeospatialFabric.html
+    (01-13-2020)
+
+    I found the links below by going to the JSON version of the Science Base
+    pages (for example:
+    https://www.sciencebase.gov/catalog/item/537a6a01e4b0efa8af081544?format=json,
+    for soils)
+
+    All of these are for Region 02 but can be for any of the Regions 01-21
     """
-    seg_ids = pd.read_csv(subset_links_file, header=None)[0]
-    subset_by_links(seg_ids, seg_attr_file, out_file)
+
+    urls = {
+        "SSURGO": "https://www.sciencebase.gov/catalog/item/537a6a01e4b0efa8af081544",
+        "NLCD2001": "https://www.sciencebase.gov/catalog/item/537a6a10e4b0efa8af081547",
+        "NHDPLUSDEM": "https://www.sciencebase.gov/catalog/item/537a6a24e4b0efa8af08154a",
+        "Geog": "https://www.sciencebase.gov/catalog/item/537a59ece4b0efa8af081526",
+        "MacDNLCD01": "https://www.sciencebase.gov/catalog/item/537a6a51e4b0efa8af081550",
+        "Gleeson": "https://www.sciencebase.gov/catalog/item/537a6a33e4b0efa8af08154d"}
+
+    out_directory = "1_network/out/"
+
+    for category, url in urls.items():
+        download_unzip_cat_attr_data(out_directory, url, category)
+
+    downloaded_files = [
+        os.path.join(out_directory,
+                     f"GeospatialFabricAttributes-PRMS_{category}_02.gdb")
+        for category in urls.keys()]
+
+    combine_categories(downloaded_files, out_file)
+
+
+def relate_attr_to_drb_segs(attr_file, geofab_file, drb_net_file, out_file):
+    """
+    relate attributes to drb segments
+    :param attr_file: [str] path to combined attribute file
+    :param geofab_file: [str] path to national geofabric geodatabase
+    :param drb_net_file: [str] path to file describing DRB network
+    :param out_file: [str] path to where the DRB segment attributes should be
+    written
+    :return: None
+    """
+    attr_df = add_ids_and_seg_attr(geofab_file, attr_file, "02")
+    attrs_to_segs = relate_attr_to_segments(attr_df, True)
+    subset_for_drb(drb_net_file, attrs_to_segs, out_file)
+
+
+def fetch_combine_catch_attr_metadata(out_file):
+    """
+    download the original metadata file for each catchment attribute category
+    and consolidate them into one metadata table
+    :param out_file: [str] path to where the combined metadata should be written
+    """
+    urls = {
+        "SSURGO": "https://www.sciencebase.gov/catalog/item/537a6a01e4b0efa8af081544",
+        "NLCD2001": "https://www.sciencebase.gov/catalog/item/537a6a10e4b0efa8af081547",
+        "NHDPLUSDEM": "https://www.sciencebase.gov/catalog/item/537a6a24e4b0efa8af08154a",
+        "Geog": "https://www.sciencebase.gov/catalog/item/537a59ece4b0efa8af081526",
+        "MacDNLCD01": "https://www.sciencebase.gov/catalog/item/537a6a51e4b0efa8af081550",
+        "Gleeson": "https://www.sciencebase.gov/catalog/item/537a6a33e4b0efa8af08154d"}
+
+    out_directory = "1_network/out/"
+    category_metadata_files = []
+
+    for category, url in urls.items():
+        category_out_file = os.path.join(out_directory, f"{category}_metadata.xml")
+        get_metadata_file(url, category_out_file)
+        category_metadata_files.append(category_out_file)
+
+    consolidate_metadata(category_metadata_files, out_file)
diff --git a/1_network/src/catchment_attributes.R b/1_network/src/catchment_attributes.R
new file mode 100644
index 0000000..22499d1
--- /dev/null
+++ b/1_network/src/catchment_attributes.R
@@ -0,0 +1,46 @@
+# The purpose of these functions is to interface between the scipiper
+# pipeline and the Python code in 1_network/src/catch_attr.py
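+#
+# A minimal usage sketch (an illustration only, not part of the pipeline):
+# the scipiper targets call the wrappers below, which hand the work off to
+# the functions in 1_network/src/catch_attr.py via reticulate. This assumes
+# a conda environment named "drb_prep" built from this repo's
+# environment.yml; adjust the name if yours differs.
+#
+#   reticulate::use_condaenv("drb_prep")
+#   catch_attr_py <- reticulate::import_from_path("catch_attr", path = "1_network/src/")
+#   catch_attr_py$fetch_combine_catch_attr_metadata("1_network/out/seg_attr_metadata.csv")
+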
+download_and_combine_sb_attr_data <- function(out_ind) {
+  # reticulate will try to find a python executable to run the python
+  # functions. If the one it finds doesn't have the required libraries
+  # (see environment.yml), the import will fail with a ModuleNotFoundError.
+  # In that case, you can either install the required packages in the Python
+  # environment reticulate is using via pip or conda, or you can create a
+  # conda environment from the project's environment.yml file and then tell
+  # reticulate to use that environment via
+  # reticulate::use_condaenv("drb_prep")
+  out_file <- as_data_file(out_ind)
+  catch_attr_py <-
+    reticulate::import_from_path('catch_attr', path = '1_network/src/')
+  catch_attr_py$download_and_combine_attr_data(out_file)
+  gd_put(out_ind)
+}
+
+
+relate_attr_to_drb_segs <-
+  function(out_ind, all_attr, geofabric, drb_network) {
+    out_file <- as_data_file(out_ind)
+    attr_file <- as_data_file(all_attr)
+    geofab_file <- as_data_file(geofabric)
+    drb_net_file <- as_data_file(drb_network)
+
+    catch_attr_py <-
+      reticulate::import_from_path('catch_attr', path = '1_network/src/')
+    catch_attr_py$relate_attr_to_drb_segs(
+      out_file = out_file,
+      attr_file = attr_file,
+      geofab_file = geofab_file,
+      drb_net_file = drb_net_file
+    )
+    gd_put(out_ind)
+  }
+
+
+fetch_combine_catch_attr_metadata <- function(out_ind) {
+  out_file <- as_data_file(out_ind)
+  catch_attr_py <-
+    reticulate::import_from_path('catch_attr', path = '1_network/src/')
+  catch_attr_py$fetch_combine_catch_attr_metadata(out_file)
+  gd_put(out_ind)
+}
diff --git a/20_catchment_attributes/src/aggregate_upstream.py b/20_catchment_attributes/src/aggregate_upstream.py
deleted file mode 100644
index eae8807..0000000
--- a/20_catchment_attributes/src/aggregate_upstream.py
+++ /dev/null
@@ -1,96 +0,0 @@
-import pandas as pd
-from catch_attr import aggregate_attr_by_col
-
-def add_from_in_to_reach(us_link_df):
-    """
-    adding the from_reach in the to_reach column to make sure we are
-    aggregating the US basins includeing the one of interest
-    :param us_link_df: [dataframe] dataframe with two columns, 'from_reach' and
-    'to_reach'
-    :return: [dataframe] datafraom with the addition of the 'from_reach's in
-    the 'to_reach's
-    example:
-        before:
-        "from_reach", "to_reach"
-        d, c
-        d, a
-        d, b
-        e, d
-        e, c
-        e, b
-        e, a
-        after:
-        "from_reach", "to_reach"
-        d, c
-        d, a
-        d, b
-        d, d
-        e, d
-        e, c
-        e, b
-        e, a
-        e, e
-    """
-    unique_from = us_link_df['from_reach'].unique()
-    new_df = pd.DataFrame([unique_from, unique_from]).T
-    new_df.columns = us_link_df.columns
-    us_link_df = us_link_df.append(new_df)
-    return us_link_df
-
-
-def aggregate_upstream_attr(seg_attr_file, upstream_link_file, out_file):
-    """
-    aggregate the attributes of all the upstream reaches for a given stream
-    reach and save as feather file
-    :param seg_attr_file: [str] path to file with the segment attributes by
-    national segment id
-    :param upstream_link_file: [str] path to csv file that has the list of
-    national reach ids and all of the national reach ids upstream of eachs
-    reach id with two columns, 'from_reach' and 'to_reach'.
-    For example:
-        "from_reach", "to_reach"
-        d, c
-        d, a
-        d, b
-        e, d
-        e, c
-        e, b
-        e, a
-    :param out_file: [str] path to feather file where the data will be written
-    :return: None
-    """
-    # read in us link file
-    us_link_df = pd.read_csv(upstream_link_file)
-
-    # process us_link_df
-    us_link_df = add_from_in_to_reach(us_link_df)
-    # get no nans b/c nan is where there
-    us_link_no_nan = us_link_df[~us_link_df['to_reach'].isna()]
-    # make sure 'to_reach' is integer
-    us_link_no_nan['to_reach'] = us_link_no_nan['to_reach'].astype(int)
-    # make list of US reaches for each DS reach
-    us_ds_list_ser = us_link_no_nan.groupby('from_reach')['to_reach'].apply(list)
-
-    # read in attribute file
-    attr_df = pd.read_feather(seg_attr_file)
-    # set index so that the join will work
-    attr_df.set_index('seg_id_nat', inplace=True)
-
-    # join the two dfs
-    df_list = []
-    for ds_seg_id in us_ds_list_ser.index:
-        ser_seg = us_ds_list_ser.loc[[ds_seg_id]]
-        df_exp = ser_seg.explode().reset_index()
-        df_exp.set_index('to_reach', inplace=True)
-        attr_df_seg = attr_df.loc[df_exp.index]
-        attr_df_seg = attr_df_seg.join(df_exp)
-        # aggregate by the 'from_reach' col
-        agg_df = aggregate_attr_by_col(attr_df_seg, 'from_reach')
-        agg_df.reset_index(inplace=True)
-        df_list.append(agg_df)
-
-    combined_df = pd.concat(df_list, ignore_index=True)
-    combined_df = combined_df.rename(columns={"from_reach": "seg_id_nat"})
-    combined_df.to_feather(out_file)
-    return combined_df
-
diff --git a/environment.yml b/environment.yml
index 53ac365..09cfd4f 100644
--- a/environment.yml
+++ b/environment.yml
@@ -3,7 +3,11 @@ channels:
   - conda-forge
   - defaults
 dependencies:
+  - bs4
   - dask
+  - geopandas
+  - lxml
+  - pandas
   - pyarrow
   - xarray
   - zarr
diff --git a/getters.yml b/getters.yml
index 9d86512..9549e03 100644
--- a/getters.yml
+++ b/getters.yml
@@ -38,6 +38,10 @@ targets:
     command: gd_get('1_network/out/subseg_reservoir_mapping.rds.ind')
   1_network/out/segments_relative_to_reservoirs.rds:
     command: gd_get('1_network/out/segments_relative_to_reservoirs.rds.ind')
+  1_network/out/seg_attr_metadata.csv:
+    command: gd_get('1_network/out/seg_attr_metadata.csv.ind')
+  1_network/out/seg_attr_drb.feather:
+    command: gd_get('1_network/out/seg_attr_drb.feather.ind')
 
 #### 2_observations ####
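
A usage sketch for the new targets (a hypothetical invocation, assuming the
"drb_prep" conda environment from environment.yml is available to reticulate):

    # in R, from the repository root
    reticulate::use_condaenv("drb_prep")
    scipiper::scmake("1_network/out/seg_attr_drb.feather.ind", remake_file = "1_network.yml")
    scipiper::scmake("1_network/out/seg_attr_metadata.csv.ind", remake_file = "1_network.yml")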