Reconnect to mpd on unexpected response

This commit is contained in:
Sijmen 2023-12-23 06:52:55 +01:00
parent f05e17ee3a
commit b872019cb0
Signed by: vijfhoek
GPG Key ID: DAF7821E067D9C48
2 changed files with 66 additions and 36 deletions

View File

@ -31,7 +31,8 @@ async fn sse(_req: tide::Request<()>, sender: tide::sse::Sender) -> tide::Result
sender.send("playlist", "", None).await?;
sender.send("player", "", None).await?;
let mut mpd = mpd::Mpd::connect().await?;
let mut mpd = mpd::Mpd::new();
mpd.connect().await.unwrap();
loop {
let systems = mpd.idle(&["playlist", "player", "database"]).await?;

View File

@ -42,14 +42,18 @@ pub enum Entry {
#[derive(Debug)]
pub struct Mpd {
stream: TcpStream,
reader: BufReader<TcpStream>,
stream: Option<TcpStream>,
reader: Option<BufReader<TcpStream>>,
}
pub static INSTANCE: OnceLock<Mutex<Mpd>> = OnceLock::new();
pub async fn get_instance() -> MutexGuard<'static, Mpd> {
let instance = INSTANCE.get_or_init(|| Mutex::from(block_on(Mpd::connect()).unwrap()));
let instance = INSTANCE.get_or_init(|| {
let mut mpd = Mpd::new();
block_on(mpd.connect()).unwrap();
Mutex::from(mpd)
});
instance.lock().await
}
@ -107,39 +111,59 @@ impl Mpd {
s.replace('\"', "\\\"").replace('\'', "\\'")
}
pub async fn connect() -> anyhow::Result<Self> {
let stream = TcpStream::connect(host()).await?;
let reader = BufReader::new(stream.clone());
let mut this = Self { stream, reader };
pub fn new() -> Self {
Self {
stream: None,
reader: None,
}
}
pub async fn connect(&mut self) -> anyhow::Result<()> {
self.stream = Some(TcpStream::connect(host()).await?);
self.reader = Some(BufReader::new(self.stream.as_mut().unwrap().clone()));
// skip OK MPD line
// TODO check if it is indeed OK
let mut buffer = String::new();
this.reader.read_line(&mut buffer).await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?;
let password = std::env::var("MPD_PASSWORD").unwrap_or_default();
if !password.is_empty() {
let password = Self::escape_str(&password);
this.command(&format!(r#"password "{password}""#)).await?;
self.stream
.as_mut()
.unwrap()
.write_all(format!(r#"password "{password}"\n"#).as_bytes())
.await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?;
}
this.command("binarylimit 1048576").await?;
self.stream
.as_mut()
.unwrap()
.write_all("binarylimit 1048576\n".as_bytes())
.await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?;
Ok(this)
Ok(())
}
async fn read_binary_data(&mut self, size: usize) -> anyhow::Result<Vec<u8>> {
let mut binary = vec![0u8; size];
self.reader.read_exact(&mut binary).await?;
self.reader
.as_mut()
.unwrap()
.read_exact(&mut binary)
.await?;
let mut buffer = String::new();
// Skip the newline after the binary data
self.reader.read_line(&mut buffer).await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?;
// Skip the "OK" after the binary data
// TODO Check if actually OK
self.reader.read_line(&mut buffer).await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?;
Ok(binary)
}
@ -147,31 +171,36 @@ impl Mpd {
pub async fn command(&mut self, command: &str) -> anyhow::Result<CommandResult> {
let mut properties = Vec::new();
self.stream
.write_all(format!("{command}\n").as_bytes())
.await?;
'retry: loop {
self.stream
.as_mut()
.unwrap()
.write_all(format!("{command}\n").as_bytes())
.await?;
let mut buffer = String::new();
loop {
buffer.clear();
self.reader.read_line(&mut buffer).await?;
let mut buffer = String::new();
break 'retry (loop {
buffer.clear();
self.reader.as_mut().unwrap().read_line(&mut buffer).await?;
if let Some((key, value)) = buffer.split_once(": ") {
let value = value.trim_end();
properties.push((key.to_string(), value.to_string()));
if let Some((key, value)) = buffer.split_once(": ") {
let value = value.trim_end();
properties.push((key.to_string(), value.to_string()));
if key == "binary" {
let binary = self.read_binary_data(value.parse()?).await?;
break Ok(CommandResult::new_binary(properties, binary));
if key == "binary" {
let binary = self.read_binary_data(value.parse()?).await?;
break Ok(CommandResult::new_binary(properties, binary));
}
} else if buffer.starts_with("OK") {
break Ok(CommandResult::new(properties));
} else if buffer.starts_with("ACK") {
break Err(anyhow!(buffer));
} else {
println!("Unexpected MPD response '{buffer}'");
self.connect().await.unwrap();
continue 'retry;
}
} else if buffer.starts_with("OK") {
break Ok(CommandResult::new(properties));
} else if buffer.starts_with("ACK") {
break Err(anyhow!(buffer));
} else {
println!("Unexpected MPD response '{buffer}'");
break Err(anyhow!(buffer));
}
});
}
}