From b872019cb03be721289ac4adc7664808b923bc90 Mon Sep 17 00:00:00 2001 From: Sijmen Date: Sat, 23 Dec 2023 06:52:55 +0100 Subject: [PATCH] Reconnect to mpd on unexpected response --- src/main.rs | 3 +- src/mpd.rs | 99 ++++++++++++++++++++++++++++++++++------------------- 2 files changed, 66 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index 355dfb5..336e4a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,8 @@ async fn sse(_req: tide::Request<()>, sender: tide::sse::Sender) -> tide::Result sender.send("playlist", "", None).await?; sender.send("player", "", None).await?; - let mut mpd = mpd::Mpd::connect().await?; + let mut mpd = mpd::Mpd::new(); + mpd.connect().await.unwrap(); loop { let systems = mpd.idle(&["playlist", "player", "database"]).await?; diff --git a/src/mpd.rs b/src/mpd.rs index a2fdd10..5378980 100644 --- a/src/mpd.rs +++ b/src/mpd.rs @@ -42,14 +42,18 @@ pub enum Entry { #[derive(Debug)] pub struct Mpd { - stream: TcpStream, - reader: BufReader, + stream: Option, + reader: Option>, } pub static INSTANCE: OnceLock> = OnceLock::new(); pub async fn get_instance() -> MutexGuard<'static, Mpd> { - let instance = INSTANCE.get_or_init(|| Mutex::from(block_on(Mpd::connect()).unwrap())); + let instance = INSTANCE.get_or_init(|| { + let mut mpd = Mpd::new(); + block_on(mpd.connect()).unwrap(); + Mutex::from(mpd) + }); instance.lock().await } @@ -107,39 +111,59 @@ impl Mpd { s.replace('\"', "\\\"").replace('\'', "\\'") } - pub async fn connect() -> anyhow::Result { - let stream = TcpStream::connect(host()).await?; - let reader = BufReader::new(stream.clone()); - let mut this = Self { stream, reader }; + pub fn new() -> Self { + Self { + stream: None, + reader: None, + } + } + + pub async fn connect(&mut self) -> anyhow::Result<()> { + self.stream = Some(TcpStream::connect(host()).await?); + self.reader = Some(BufReader::new(self.stream.as_mut().unwrap().clone())); // skip OK MPD line // TODO check if it is indeed OK let mut buffer = String::new(); - this.reader.read_line(&mut buffer).await?; + self.reader.as_mut().unwrap().read_line(&mut buffer).await?; let password = std::env::var("MPD_PASSWORD").unwrap_or_default(); if !password.is_empty() { let password = Self::escape_str(&password); - this.command(&format!(r#"password "{password}""#)).await?; + self.stream + .as_mut() + .unwrap() + .write_all(format!(r#"password "{password}"\n"#).as_bytes()) + .await?; + self.reader.as_mut().unwrap().read_line(&mut buffer).await?; } - this.command("binarylimit 1048576").await?; + self.stream + .as_mut() + .unwrap() + .write_all("binarylimit 1048576\n".as_bytes()) + .await?; + self.reader.as_mut().unwrap().read_line(&mut buffer).await?; - Ok(this) + Ok(()) } async fn read_binary_data(&mut self, size: usize) -> anyhow::Result> { let mut binary = vec![0u8; size]; - self.reader.read_exact(&mut binary).await?; + self.reader + .as_mut() + .unwrap() + .read_exact(&mut binary) + .await?; let mut buffer = String::new(); // Skip the newline after the binary data - self.reader.read_line(&mut buffer).await?; + self.reader.as_mut().unwrap().read_line(&mut buffer).await?; // Skip the "OK" after the binary data // TODO Check if actually OK - self.reader.read_line(&mut buffer).await?; + self.reader.as_mut().unwrap().read_line(&mut buffer).await?; Ok(binary) } @@ -147,31 +171,36 @@ impl Mpd { pub async fn command(&mut self, command: &str) -> anyhow::Result { let mut properties = Vec::new(); - self.stream - .write_all(format!("{command}\n").as_bytes()) - .await?; + 'retry: loop { + self.stream + .as_mut() + .unwrap() + .write_all(format!("{command}\n").as_bytes()) + .await?; - let mut buffer = String::new(); - loop { - buffer.clear(); - self.reader.read_line(&mut buffer).await?; + let mut buffer = String::new(); + break 'retry (loop { + buffer.clear(); + self.reader.as_mut().unwrap().read_line(&mut buffer).await?; - if let Some((key, value)) = buffer.split_once(": ") { - let value = value.trim_end(); - properties.push((key.to_string(), value.to_string())); + if let Some((key, value)) = buffer.split_once(": ") { + let value = value.trim_end(); + properties.push((key.to_string(), value.to_string())); - if key == "binary" { - let binary = self.read_binary_data(value.parse()?).await?; - break Ok(CommandResult::new_binary(properties, binary)); + if key == "binary" { + let binary = self.read_binary_data(value.parse()?).await?; + break Ok(CommandResult::new_binary(properties, binary)); + } + } else if buffer.starts_with("OK") { + break Ok(CommandResult::new(properties)); + } else if buffer.starts_with("ACK") { + break Err(anyhow!(buffer)); + } else { + println!("Unexpected MPD response '{buffer}'"); + self.connect().await.unwrap(); + continue 'retry; } - } else if buffer.starts_with("OK") { - break Ok(CommandResult::new(properties)); - } else if buffer.starts_with("ACK") { - break Err(anyhow!(buffer)); - } else { - println!("Unexpected MPD response '{buffer}'"); - break Err(anyhow!(buffer)); - } + }); } }