summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml17
-rw-r--r--src/main.rs210
-rw-r--r--src/weather_gov_json.rs169
3 files changed, 396 insertions, 0 deletions
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..f9b6b43
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "weather-data-aggregator"
+description = ""
+version = "0.1.0"
+edition = "2021"
+authors = ["Matt Kohls"]
+license = "GPL-3.0-or-later"
+publish = false
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+reqwest = { version = "0.11", features = ["json", "blocking"] }
+serde = { version = "1.0", features = ["derive"]}
+serde_json = "1.0"
+paho-mqtt = "0.11"
+config = "0.13.1"
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..a5478ba
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,210 @@
+
+/*!
+ * Weather.gov Data Aggregator
+ *
+ *
+ * Matt Kohls
+ * GPL v3
+ * 2023
+ */
+
+use std::{
+ process,
+ time::Duration,
+ collections::HashMap
+};
+
+use reqwest::header::USER_AGENT;
+use config::Config;
+
+extern crate paho_mqtt as mqtt;
+
+mod weather_gov_json;
+use crate::weather_gov_json::weather_gov_json::{
+ Forecast,
+ Alerts,
+};
+
+const DEFAULT_BROKER:&str = "tcp://localhost:1883";
+const DEFAULT_CLIENT_ID:&str = "weather-data-aggregator";
+const DEFAULT_ROOT_FORECAST_TOPIC:&str = "forecast";
+const QOS:i32 = 0;
+
+const DEFAULT_USER_AGENT_STRING:&str = "Mozilla/5.0 (X11; Linux x86_64; rv:104.0) Gecko/20100101 Firefox/104.0";
+const GRIDPOINTS_PATH:&str = "https://api.weather.gov/gridpoints/";
+const ACTIVE_ALERTS_PATH:&str = "https://api.weather.gov/alerts/active?status=actual&zone=";
+
+fn main() {
+
+ let settings = Config::builder()
+ .add_source(config::File::with_name("config"))
+ .build()
+ .unwrap()
+ .try_deserialize::<HashMap<String, String>>()
+ .unwrap_or_else(|err| {
+ println!("Error reading config file: {:?}", err);
+ process::exit(1);
+ });
+
+ let mqtt_client = create_mqtt_client(&settings);
+
+ let mut root_forecast_topic = &DEFAULT_ROOT_FORECAST_TOPIC.to_string();
+ if settings.contains_key("root_forecast_topic") {
+ root_forecast_topic = &settings["root_forecast_topic"];
+ }
+
+ let mut user_agent = &DEFAULT_USER_AGENT_STRING.to_string();
+ if settings.contains_key("user_agent") {
+ user_agent = &settings["user_agent"];
+ }
+
+ let mut active_alert_url = ACTIVE_ALERTS_PATH.to_string();
+ if settings.contains_key("zone_id") {
+ active_alert_url.push_str(&settings["zone_id"]);
+
+ let alert_response = fetch_alerts(&active_alert_url, &user_agent);
+ match alert_response {
+ Err(err) => println!("Error fetching alerts: {} - {:?}", active_alert_url, err),
+ Ok(alerts) => print_alerts(&alerts),
+ }
+ } else {
+ println!("No zone_id, skipping active alert lookup & publish")
+ }
+
+ let mut forecast_url = GRIDPOINTS_PATH.to_string();
+ if settings.contains_key("forecast_office") && settings.contains_key("grid_x") && settings.contains_key("grid_y") {
+ forecast_url.push_str(&format!("{}/{},{}/forecast", &settings["forecast_office"], &settings["grid_x"], &settings["grid_y"]));
+
+ let daily_response = fetch_forecast(&forecast_url, &user_agent);
+ match daily_response {
+ Err(err) => println!("Error fetching forecast: {} - {:?}", forecast_url, err),
+ Ok(forecast) => publish_forecast(&forecast, false, root_forecast_topic, &mqtt_client),
+ }
+
+ forecast_url.push_str("/hourly");
+ let hourly_response = fetch_forecast(&forecast_url, &user_agent);
+ match hourly_response {
+ Err(err) => println!("Error fetching forecast: {} - {:?}", forecast_url, err),
+ Ok(forecast) => print_forecast(&forecast),
+ }
+ } else {
+ println!("No (or missing one of) the following: forecast_office, grid_x, grid_y, skipping forecast lookup & publish");
+ }
+
+ let _ = mqtt_client.disconnect_after(Duration::from_secs(20));
+}
+
+/**
+ * Creates a new MQTT client that is connected to the broker
+ *
+ **/
+fn create_mqtt_client(settings: &HashMap<String, String>) -> mqtt::Client {
+ let mut broker = &DEFAULT_BROKER.to_string();
+ if settings.contains_key("broker") {
+ broker = &settings["broker"];
+ }
+
+ let mut mqtt_client_id = &DEFAULT_CLIENT_ID.to_string();
+ if settings.contains_key("client_id") {
+ mqtt_client_id = &settings["client_id"];
+ }
+
+ let mqtt_create_opts = mqtt::CreateOptionsBuilder::new()
+ .server_uri(broker.to_string())
+ .client_id(mqtt_client_id.to_string())
+ .finalize();
+
+ let mqtt_client = mqtt::Client::new(mqtt_create_opts).unwrap_or_else(|err| {
+ println!("MQTT Client creation error: {:?}", err);
+ process::exit(1);
+ });
+
+ let mqtt_connection_opts = mqtt::ConnectOptionsBuilder::new()
+ .keep_alive_interval(Duration::from_secs(20))
+ .clean_session(true)
+ .finalize();
+
+ if let Err(e) = mqtt_client.connect(mqtt_connection_opts) {
+ println!("MQTT Client connection error: {:?}", e);
+ process::exit(1);
+ }
+
+ mqtt_client
+}
+
+fn publish_forecast(forecast: &Forecast, hourly: bool, root_topic: &str, client: &mqtt::Client) {
+ let mut forecast_topic = root_topic.to_string();
+ if hourly {
+ forecast_topic = format!("{forecast_topic}/hourly");
+ }
+ for period in forecast.properties.periods.iter().take(12) {
+ forecast_topic = format!("{forecast_topic}/{0}", period.number);
+ if !hourly {
+ publish_message_to(format!("{0}", period.name).as_str(),
+ format!("{forecast_topic}/name").as_str(),
+ client);
+ }
+ publish_message_to(format!("{0}", period.startTime).as_str(),
+ format!("{forecast_topic}/time").as_str(),
+ client);
+ publish_message_to(format!("{0} {1}", period.temperature, period.temperatureUnit).as_str(),
+ format!("{forecast_topic}/temperature").as_str(),
+ client);
+ publish_message_to(format!("{0}", period.windSpeed).as_str(),
+ format!("{forecast_topic}/windSpeed").as_str(),
+ client);
+ publish_message_to(format!("{0}", period.shortForecast).as_str(),
+ format!("{forecast_topic}/shortForecast").as_str(),
+ client);
+ }
+}
+
+fn publish_message_to(message: &str, topic: &str, client: &mqtt::Client) {
+ let message = mqtt::Message::new_retained(topic, message, QOS);
+ client.publish(message).unwrap_or_else(|err| {
+ println!("MQTT Client error publishing message: {:?}", err);
+ });
+}
+
+fn fetch_forecast(url: &str, user_agent: &str) -> Result<Forecast, Box<dyn std::error::Error>> {
+ let response = reqwest::blocking::Client::new()
+ .get(url)
+ .header(USER_AGENT, user_agent)
+ .send()?;
+
+ match response.status() {
+ reqwest::StatusCode::OK => Ok(response.json::<Forecast>()?),
+ reqwest::StatusCode::BAD_REQUEST => Err("Bad Request".into()),
+ status => Err(status.to_string().into()),
+ }
+}
+
+fn print_forecast(forecast: &Forecast) {
+ for period in forecast.properties.periods.iter() {
+ println!("{} {}:{} - Temp: {} {}, Wind: {}, {}", period.number, period.name, period.startTime, period.temperature, period.temperatureUnit, period.windSpeed, period.shortForecast);
+ }
+}
+
+fn fetch_alerts(url: &str, user_agent: &str) -> Result<Alerts, Box<dyn std::error::Error>> {
+ let response = reqwest::blocking::Client::new()
+ .get(url)
+ .header(USER_AGENT, user_agent)
+ .send()?;
+
+ match response.status() {
+ reqwest::StatusCode::OK => Ok(response.json::<Alerts>()?),
+ reqwest::StatusCode::BAD_REQUEST => Err("Bad Request".into()),
+ status => Err(status.to_string().into()),
+ }
+}
+
+fn print_alerts(alerts: &Alerts) {
+ if alerts.features.len() == 0 {
+ println!("No active alerts");
+ } else {
+ for feature in alerts.features.iter() {
+ println!("Event: {} {} - {:?} | {:?}", feature.properties.event, feature.properties.severity, feature.properties.onset, feature.properties.ends);
+ }
+ }
+}
+
diff --git a/src/weather_gov_json.rs b/src/weather_gov_json.rs
new file mode 100644
index 0000000..c25f11e
--- /dev/null
+++ b/src/weather_gov_json.rs
@@ -0,0 +1,169 @@
+
+/*!
+ * Weather.gov JSON Definitions
+ *
+ * Various structs to deserialize api responses
+ *
+ * Matt Kohls
+ * GPL v3
+ * 2023
+ */
+
+pub mod weather_gov_json {
+
+#![allow(non_snake_case)]
+
+use serde::Deserialize;
+
+/**
+ * Common Objects
+ **/
+
+#[derive(Deserialize, Debug)]
+pub struct Context {
+ #[serde(rename = "")]
+ pub link: String,
+ pub jsonld: JsonLD,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct JsonLD {
+ #[serde(rename = "@version")]
+ pub version: String,
+ pub wx: String,
+ #[serde(default)]
+ pub geo: String,
+ #[serde(default)]
+ pub unit: String,
+ #[serde(alias = "@vocab")]
+ pub vocab: String,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Geometry {
+ pub r#type: String,
+ pub coordinates: Vec<Vec<Vec<f32>>>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct GeoCode {
+ pub SAME: Vec<String>,
+ pub UGC: Vec<String>,
+}
+
+
+/**
+ * Forecast
+ * https://www.weather.gov/documentation/services-web-api#/default/gridpoint_forecast
+ **/
+
+#[derive(Deserialize, Debug)]
+pub struct Forecast {
+ #[serde(rename = "@context")]
+ pub context: Context,
+ pub r#type: String,
+ pub geometry: Geometry,
+ pub properties: ForecastProperties,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct ForecastProperties {
+ pub updated: String,
+ pub units: String,
+ pub forecastGenerator: String,
+ pub generatedAt: String,
+ pub updateTime: String,
+ pub validTimes: String,
+ pub elevation: Elevation,
+ pub periods: Vec<Period>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Elevation {
+ pub unitCode: String,
+ pub value: f32,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Period {
+ pub number: u32,
+ pub name: String,
+ pub startTime: String,
+ pub endTime: String,
+ pub isDaytime: bool,
+ pub temperature: i32,
+ pub temperatureUnit: String,
+ pub temperatureTrend: Option<String>,
+ pub windSpeed: String,
+ pub windDirection: String,
+ pub icon: String,
+ pub shortForecast: String,
+ pub detailedForecast: String,
+}
+
+/**
+ * Alerts
+ * https://www.weather.gov/documentation/services-web-api#/default/alerts_active
+ **/
+
+#[derive(Deserialize, Debug)]
+pub struct Alerts {
+ #[serde(rename = "@context", skip)]
+ pub context: String,
+ pub r#type: String,
+ pub features: Vec<Features>,
+ pub title: String,
+ pub updated: String,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Features {
+ pub id: String,
+ pub r#type: String,
+ pub geometry: Option<Geometry>,
+ pub properties: AlertProperties,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct AlertProperties {
+ #[serde(rename = "@id")]
+ pub aid: String,
+ #[serde(rename = "@type")]
+ pub r#type: String,
+ pub id: String,
+ pub areaDesc: String,
+ pub geocode: GeoCode,
+ pub affectedZones: Vec<String>,
+ pub references: Vec<AlertReferences>,
+ pub sent: String,
+ pub effective: String,
+ pub onset: Option<String>,
+ pub expires: String,
+ pub ends: Option<String>,
+ pub status: String,
+ pub messageType: String,
+ pub category: String,
+ pub severity: String,
+ pub certainty: String,
+ pub urgency: String,
+ pub event: String,
+ pub sender: String,
+ pub senderName: String,
+ pub headline: Option<String>,
+ pub description: String,
+ pub instruction: Option<String>,
+ pub response: String,
+ #[serde(skip)]
+ pub parameters: Vec<String>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct AlertReferences {
+ #[serde(rename = "@id")]
+ pub id: String,
+ pub identifier: String,
+ pub sender: String,
+ pub sent: String,
+}
+
+} // End of module