diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..a5478ba --- /dev/null +++ b/src/main.rs @@ -0,0 +1,210 @@ + +/*! + * Weather.gov Data Aggregator + * + * + * Matt Kohls + * GPL v3 + * 2023 + */ + +use std::{ + process, + time::Duration, + collections::HashMap +}; + +use reqwest::header::USER_AGENT; +use config::Config; + +extern crate paho_mqtt as mqtt; + +mod weather_gov_json; +use crate::weather_gov_json::weather_gov_json::{ + Forecast, + Alerts, +}; + +const DEFAULT_BROKER:&str = "tcp://localhost:1883"; +const DEFAULT_CLIENT_ID:&str = "weather-data-aggregator"; +const DEFAULT_ROOT_FORECAST_TOPIC:&str = "forecast"; +const QOS:i32 = 0; + +const DEFAULT_USER_AGENT_STRING:&str = "Mozilla/5.0 (X11; Linux x86_64; rv:104.0) Gecko/20100101 Firefox/104.0"; +const GRIDPOINTS_PATH:&str = "https://api.weather.gov/gridpoints/"; +const ACTIVE_ALERTS_PATH:&str = "https://api.weather.gov/alerts/active?status=actual&zone="; + +fn main() { + + let settings = Config::builder() + .add_source(config::File::with_name("config")) + .build() + .unwrap() + .try_deserialize::<HashMap<String, String>>() + .unwrap_or_else(|err| { + println!("Error reading config file: {:?}", err); + process::exit(1); + }); + + let mqtt_client = create_mqtt_client(&settings); + + let mut root_forecast_topic = &DEFAULT_ROOT_FORECAST_TOPIC.to_string(); + if settings.contains_key("root_forecast_topic") { + root_forecast_topic = &settings["root_forecast_topic"]; + } + + let mut user_agent = &DEFAULT_USER_AGENT_STRING.to_string(); + if settings.contains_key("user_agent") { + user_agent = &settings["user_agent"]; + } + + let mut active_alert_url = ACTIVE_ALERTS_PATH.to_string(); + if settings.contains_key("zone_id") { + active_alert_url.push_str(&settings["zone_id"]); + + let alert_response = fetch_alerts(&active_alert_url, &user_agent); + match alert_response { + Err(err) => println!("Error fetching alerts: {} - {:?}", active_alert_url, err), + Ok(alerts) => print_alerts(&alerts), + } + } else { + println!("No zone_id, skipping active alert lookup & publish") + } + + let mut forecast_url = GRIDPOINTS_PATH.to_string(); + if settings.contains_key("forecast_office") && settings.contains_key("grid_x") && settings.contains_key("grid_y") { + forecast_url.push_str(&format!("{}/{},{}/forecast", &settings["forecast_office"], &settings["grid_x"], &settings["grid_y"])); + + let daily_response = fetch_forecast(&forecast_url, &user_agent); + match daily_response { + Err(err) => println!("Error fetching forecast: {} - {:?}", forecast_url, err), + Ok(forecast) => publish_forecast(&forecast, false, root_forecast_topic, &mqtt_client), + } + + forecast_url.push_str("/hourly"); + let hourly_response = fetch_forecast(&forecast_url, &user_agent); + match hourly_response { + Err(err) => println!("Error fetching forecast: {} - {:?}", forecast_url, err), + Ok(forecast) => print_forecast(&forecast), + } + } else { + println!("No (or missing one of) the following: forecast_office, grid_x, grid_y, skipping forecast lookup & publish"); + } + + let _ = mqtt_client.disconnect_after(Duration::from_secs(20)); +} + +/** + * Creates a new MQTT client that is connected to the broker + * + **/ +fn create_mqtt_client(settings: &HashMap<String, String>) -> mqtt::Client { + let mut broker = &DEFAULT_BROKER.to_string(); + if settings.contains_key("broker") { + broker = &settings["broker"]; + } + + let mut mqtt_client_id = &DEFAULT_CLIENT_ID.to_string(); + if settings.contains_key("client_id") { + mqtt_client_id = &settings["client_id"]; + } + + let mqtt_create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker.to_string()) + .client_id(mqtt_client_id.to_string()) + .finalize(); + + let mqtt_client = mqtt::Client::new(mqtt_create_opts).unwrap_or_else(|err| { + println!("MQTT Client creation error: {:?}", err); + process::exit(1); + }); + + let mqtt_connection_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(true) + .finalize(); + + if let Err(e) = mqtt_client.connect(mqtt_connection_opts) { + println!("MQTT Client connection error: {:?}", e); + process::exit(1); + } + + mqtt_client +} + +fn publish_forecast(forecast: &Forecast, hourly: bool, root_topic: &str, client: &mqtt::Client) { + let mut forecast_topic = root_topic.to_string(); + if hourly { + forecast_topic = format!("{forecast_topic}/hourly"); + } + for period in forecast.properties.periods.iter().take(12) { + forecast_topic = format!("{forecast_topic}/{0}", period.number); + if !hourly { + publish_message_to(format!("{0}", period.name).as_str(), + format!("{forecast_topic}/name").as_str(), + client); + } + publish_message_to(format!("{0}", period.startTime).as_str(), + format!("{forecast_topic}/time").as_str(), + client); + publish_message_to(format!("{0} {1}", period.temperature, period.temperatureUnit).as_str(), + format!("{forecast_topic}/temperature").as_str(), + client); + publish_message_to(format!("{0}", period.windSpeed).as_str(), + format!("{forecast_topic}/windSpeed").as_str(), + client); + publish_message_to(format!("{0}", period.shortForecast).as_str(), + format!("{forecast_topic}/shortForecast").as_str(), + client); + } +} + +fn publish_message_to(message: &str, topic: &str, client: &mqtt::Client) { + let message = mqtt::Message::new_retained(topic, message, QOS); + client.publish(message).unwrap_or_else(|err| { + println!("MQTT Client error publishing message: {:?}", err); + }); +} + +fn fetch_forecast(url: &str, user_agent: &str) -> Result<Forecast, Box<dyn std::error::Error>> { + let response = reqwest::blocking::Client::new() + .get(url) + .header(USER_AGENT, user_agent) + .send()?; + + match response.status() { + reqwest::StatusCode::OK => Ok(response.json::<Forecast>()?), + reqwest::StatusCode::BAD_REQUEST => Err("Bad Request".into()), + status => Err(status.to_string().into()), + } +} + +fn print_forecast(forecast: &Forecast) { + for period in forecast.properties.periods.iter() { + println!("{} {}:{} - Temp: {} {}, Wind: {}, {}", period.number, period.name, period.startTime, period.temperature, period.temperatureUnit, period.windSpeed, period.shortForecast); + } +} + +fn fetch_alerts(url: &str, user_agent: &str) -> Result<Alerts, Box<dyn std::error::Error>> { + let response = reqwest::blocking::Client::new() + .get(url) + .header(USER_AGENT, user_agent) + .send()?; + + match response.status() { + reqwest::StatusCode::OK => Ok(response.json::<Alerts>()?), + reqwest::StatusCode::BAD_REQUEST => Err("Bad Request".into()), + status => Err(status.to_string().into()), + } +} + +fn print_alerts(alerts: &Alerts) { + if alerts.features.len() == 0 { + println!("No active alerts"); + } else { + for feature in alerts.features.iter() { + println!("Event: {} {} - {:?} | {:?}", feature.properties.event, feature.properties.severity, feature.properties.onset, feature.properties.ends); + } + } +} + |