Skip to content

Commit 9c6169a

Browse files
committed
Fix NOAA API date range limits and CloudEvents source field
- Clamp date ranges to NOAA API maximums (30 days for 6-min data, 365 days for predictions) - Add source field to all CloudEvent message types in producer_client.py - Make station.portscode field optional to handle null values from API - Update noaa.xreg.json with source envelope metadata for all message types - Fix producer initialization to create Producer object from kafka_config This resolves 400 Bad Request errors caused by exceeding API limits and MissingRequiredFields errors for CloudEvents source attribute.
1 parent 46d5019 commit 9c6169a

File tree

4 files changed

+73
-13
lines changed

4 files changed

+73
-13
lines changed

noaa/noaa/noaa.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ def __init__(self, kafka_config: Dict[str, str], kafka_topic: str, last_polled_f
5959
"""
6060
self.kafka_topic = kafka_topic
6161
self.last_polled_file = last_polled_file
62-
self.producer = MicrosoftOpenDataUSNOAAEventProducer(kafka_config, kafka_topic)
62+
from confluent_kafka import Producer
63+
kafka_producer = Producer(kafka_config)
64+
self.producer = MicrosoftOpenDataUSNOAAEventProducer(kafka_producer, kafka_topic)
6365
self.stations = self.fetch_all_stations()
6466
if station:
6567
self.station = next((s for s in self.stations if s.id == station), None)
@@ -118,10 +120,25 @@ def poll_noaa_api(self, product: str, station_id: str, last_polled_time: datetim
118120
datum = self.get_datum_for_station(station_id)
119121
if datum == "IGLD" and "predictions" in product:
120122
return [] # Great Lakes stations don't have prediction data
123+
124+
# Clamp date range to NOAA API limits
125+
# 6-minute interval data (water_level, met data): max 1 month
126+
# Predictions: max 1 year
127+
now = datetime.now(timezone.utc)
128+
if product in ["predictions", "currents_predictions"]:
129+
# Predictions can go back 1 year
130+
max_begin_date = now - timedelta(days=365)
131+
else:
132+
# 6-minute interval data: max 1 month
133+
max_begin_date = now - timedelta(days=30)
134+
135+
# Use the more recent of last_polled_time or max_begin_date
136+
begin_date = max(last_polled_time, max_begin_date)
137+
121138
product_url = f"{self.BASE_URL}?{self.PRODUCTS[product]}{self.COMMON_PARAMS}&station={station_id}"
122139
if product != "currents_predictions" and product != "currents":
123140
product_url += f"&datum={datum}"
124-
product_url += f"&begin_date={last_polled_time.strftime('%Y%m%d %H:%M')}&end_date={datetime.now(timezone.utc).strftime('%Y%m%d %H:%M')}"
141+
product_url += f"&begin_date={begin_date.strftime('%Y%m%d %H:%M')}&end_date={now.strftime('%Y%m%d %H:%M')}"
125142
data_key = "data" if "predictions" not in product else "predictions"
126143
try:
127144
response = requests.get(product_url, timeout=10)

noaa/noaa/noaa_producer/microsoft/opendata/us/noaa/station.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class Station:
9595
lat: float=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="lat"))
9696
lng: float=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="lng"))
9797
affiliations: str=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="affiliations"))
98-
portscode: str=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="portscode"))
98+
portscode: typing.Optional[str]=dataclasses.field(kw_only=True, default=None, metadata=dataclasses_json.config(field_name="portscode"))
9999
products: Products=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="products"))
100100
disclaimers: Disclaimers=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="disclaimers"))
101101
notices: Notices=dataclasses.field(kw_only=True, metadata=dataclasses_json.config(field_name="notices"))

noaa/noaa/noaa_producer/producer_client.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def send_microsoft_open_data_us_noaa_water_level(self,data: WaterLevel, content_
5959
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
6060
"""
6161
attributes = {
62-
"type":"Microsoft.OpenData.US.NOAA.WaterLevel"
62+
"type":"Microsoft.OpenData.US.NOAA.WaterLevel",
63+
"source":"https://api.tidesandcurrents.noaa.gov"
6364
}
6465
attributes["datacontenttype"] = content_type
6566
event = CloudEvent.create(attributes, data)
@@ -87,7 +88,8 @@ def send_microsoft_open_data_us_noaa_predictions(self,data: Predictions, content
8788
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
8889
"""
8990
attributes = {
90-
"type":"Microsoft.OpenData.US.NOAA.Predictions"
91+
"type":"Microsoft.OpenData.US.NOAA.Predictions",
92+
"source":"https://api.tidesandcurrents.noaa.gov"
9193
}
9294
attributes["datacontenttype"] = content_type
9395
event = CloudEvent.create(attributes, data)
@@ -115,7 +117,8 @@ def send_microsoft_open_data_us_noaa_air_pressure(self,data: AirPressure, conten
115117
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
116118
"""
117119
attributes = {
118-
"type":"Microsoft.OpenData.US.NOAA.AirPressure"
120+
"type":"Microsoft.OpenData.US.NOAA.AirPressure",
121+
"source":"https://api.tidesandcurrents.noaa.gov"
119122
}
120123
attributes["datacontenttype"] = content_type
121124
event = CloudEvent.create(attributes, data)
@@ -143,7 +146,8 @@ def send_microsoft_open_data_us_noaa_air_temperature(self,data: AirTemperature,
143146
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
144147
"""
145148
attributes = {
146-
"type":"Microsoft.OpenData.US.NOAA.AirTemperature"
149+
"type":"Microsoft.OpenData.US.NOAA.AirTemperature",
150+
"source":"https://api.tidesandcurrents.noaa.gov"
147151
}
148152
attributes["datacontenttype"] = content_type
149153
event = CloudEvent.create(attributes, data)
@@ -171,7 +175,8 @@ def send_microsoft_open_data_us_noaa_water_temperature(self,data: WaterTemperatu
171175
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
172176
"""
173177
attributes = {
174-
"type":"Microsoft.OpenData.US.NOAA.WaterTemperature"
178+
"type":"Microsoft.OpenData.US.NOAA.WaterTemperature",
179+
"source":"https://api.tidesandcurrents.noaa.gov"
175180
}
176181
attributes["datacontenttype"] = content_type
177182
event = CloudEvent.create(attributes, data)
@@ -199,7 +204,8 @@ def send_microsoft_open_data_us_noaa_wind(self,data: Wind, content_type: str = "
199204
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
200205
"""
201206
attributes = {
202-
"type":"Microsoft.OpenData.US.NOAA.Wind"
207+
"type":"Microsoft.OpenData.US.NOAA.Wind",
208+
"source":"https://api.tidesandcurrents.noaa.gov"
203209
}
204210
attributes["datacontenttype"] = content_type
205211
event = CloudEvent.create(attributes, data)
@@ -227,7 +233,8 @@ def send_microsoft_open_data_us_noaa_humidity(self,data: Humidity, content_type:
227233
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
228234
"""
229235
attributes = {
230-
"type":"Microsoft.OpenData.US.NOAA.Humidity"
236+
"type":"Microsoft.OpenData.US.NOAA.Humidity",
237+
"source":"https://api.tidesandcurrents.noaa.gov"
231238
}
232239
attributes["datacontenttype"] = content_type
233240
event = CloudEvent.create(attributes, data)
@@ -255,7 +262,8 @@ def send_microsoft_open_data_us_noaa_conductivity(self,data: Conductivity, conte
255262
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
256263
"""
257264
attributes = {
258-
"type":"Microsoft.OpenData.US.NOAA.Conductivity"
265+
"type":"Microsoft.OpenData.US.NOAA.Conductivity",
266+
"source":"https://api.tidesandcurrents.noaa.gov"
259267
}
260268
attributes["datacontenttype"] = content_type
261269
event = CloudEvent.create(attributes, data)
@@ -283,7 +291,8 @@ def send_microsoft_open_data_us_noaa_salinity(self,data: Salinity, content_type:
283291
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
284292
"""
285293
attributes = {
286-
"type":"Microsoft.OpenData.US.NOAA.Salinity"
294+
"type":"Microsoft.OpenData.US.NOAA.Salinity",
295+
"source":"https://api.tidesandcurrents.noaa.gov"
287296
}
288297
attributes["datacontenttype"] = content_type
289298
event = CloudEvent.create(attributes, data)
@@ -311,7 +320,8 @@ def send_microsoft_open_data_us_noaa_station(self,data: Station, content_type: s
311320
The default key mapper maps the CloudEvent type, source, and subject to the Kafka key
312321
"""
313322
attributes = {
314-
"type":"Microsoft.OpenData.US.NOAA.Station"
323+
"type":"Microsoft.OpenData.US.NOAA.Station",
324+
"source":"https://api.tidesandcurrents.noaa.gov"
315325
}
316326
attributes["datacontenttype"] = content_type
317327
event = CloudEvent.create(attributes, data)

noaa/xreg/noaa.xreg.json

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
"envelopemetadata": {
99
"type": {
1010
"value": "Microsoft.OpenData.US.NOAA.WaterLevel"
11+
},
12+
"source": {
13+
"value": "https://api.tidesandcurrents.noaa.gov"
1114
}
1215
},
1316
"dataschemaformat": "Avro/1.11.3",
@@ -19,6 +22,9 @@
1922
"envelopemetadata": {
2023
"type": {
2124
"value": "Microsoft.OpenData.US.NOAA.Predictions"
25+
},
26+
"source": {
27+
"value": "https://api.tidesandcurrents.noaa.gov"
2228
}
2329
},
2430
"dataschemaformat": "Avro/1.11.3",
@@ -30,6 +36,9 @@
3036
"envelopemetadata": {
3137
"type": {
3238
"value": "Microsoft.OpenData.US.NOAA.AirPressure"
39+
},
40+
"source": {
41+
"value": "https://api.tidesandcurrents.noaa.gov"
3342
}
3443
},
3544
"dataschemaformat": "Avro/1.11.3",
@@ -41,6 +50,9 @@
4150
"envelopemetadata": {
4251
"type": {
4352
"value": "Microsoft.OpenData.US.NOAA.AirTemperature"
53+
},
54+
"source": {
55+
"value": "https://api.tidesandcurrents.noaa.gov"
4456
}
4557
},
4658
"dataschemaformat": "Avro/1.11.3",
@@ -52,6 +64,9 @@
5264
"envelopemetadata": {
5365
"type": {
5466
"value": "Microsoft.OpenData.US.NOAA.WaterTemperature"
67+
},
68+
"source": {
69+
"value": "https://api.tidesandcurrents.noaa.gov"
5570
}
5671
},
5772
"dataschemaformat": "Avro/1.11.3",
@@ -63,6 +78,9 @@
6378
"envelopemetadata": {
6479
"type": {
6580
"value": "Microsoft.OpenData.US.NOAA.Wind"
81+
},
82+
"source": {
83+
"value": "https://api.tidesandcurrents.noaa.gov"
6684
}
6785
},
6886
"dataschemaformat": "Avro/1.11.3",
@@ -74,6 +92,9 @@
7492
"envelopemetadata": {
7593
"type": {
7694
"value": "Microsoft.OpenData.US.NOAA.Humidity"
95+
},
96+
"source": {
97+
"value": "https://api.tidesandcurrents.noaa.gov"
7798
}
7899
},
79100
"dataschemaformat": "Avro/1.11.3",
@@ -85,6 +106,9 @@
85106
"envelopemetadata": {
86107
"type": {
87108
"value": "Microsoft.OpenData.US.NOAA.Conductivity"
109+
},
110+
"source": {
111+
"value": "https://api.tidesandcurrents.noaa.gov"
88112
}
89113
},
90114
"dataschemaformat": "Avro/1.11.3",
@@ -96,6 +120,9 @@
96120
"envelopemetadata": {
97121
"type": {
98122
"value": "Microsoft.OpenData.US.NOAA.Salinity"
123+
},
124+
"source": {
125+
"value": "https://api.tidesandcurrents.noaa.gov"
99126
}
100127
},
101128
"dataschemaformat": "Avro/1.11.3",
@@ -107,6 +134,9 @@
107134
"envelopemetadata": {
108135
"type": {
109136
"value": "Microsoft.OpenData.US.NOAA.Station"
137+
},
138+
"source": {
139+
"value": "https://api.tidesandcurrents.noaa.gov"
110140
}
111141
},
112142
"dataschemaformat": "Avro/1.11.3",
@@ -119,6 +149,9 @@
119149
"type": {
120150
"value": "Microsoft.OpenData.US.NOAA.Visibility"
121151
},
152+
"source": {
153+
"value": "https://api.tidesandcurrents.noaa.gov"
154+
},
122155
"datacontenttype": {
123156
"type": "string",
124157
"required": true

0 commit comments

Comments
 (0)