Convert to actix
continuous-integration/drone/push Build was killed Details

This commit is contained in:
Sijmen 2023-12-26 17:22:46 +01:00
parent 8a01102302
commit 992f286c0a
13 changed files with 1095 additions and 1430 deletions

2131
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,13 +9,17 @@ repository = "https://github.com/vijfhoek/empede"
[dependencies] [dependencies]
anyhow = "1.0.70" anyhow = "1.0.70"
askama = { version = "0.12.0", default-features = false, features = ["serde-json"] } askama = { version = "0.12.0", default-features = false, features = ["serde-json"] }
askama_tide = "0.15.0"
async-std = { version = "1.12.0", features = ["attributes"] }
infer = { version = "0.15.0", default-features = false } infer = { version = "0.15.0", default-features = false }
percent-encoding = "2.2.0" percent-encoding = "2.2.0"
serde = { version = "1.0.160", features = ["derive"] } serde = { version = "1.0.160", features = ["derive"] }
serde_qs = "0.12.0" serde_qs = "0.12.0"
tide = "0.16.0" askama_actix = "0.14.0"
tide-tracing = "0.0.12" tokio = { version = "1.35.1", features = ["full"] }
tracing = { version = "0.1.37", default-features = false, features = ["std"] } actix-web = "4.4.0"
tracing-subscriber = { version = "0.3.17", default-features = false, features = ["std", "fmt"] } thiserror = "1.0.51"
actix-files = "0.6.2"
actix-web-lab = "0.20.1"
tokio-stream = "0.1.14"
futures = "0.3.29"
async-stream = "0.3.5"
env_logger = "0.10.1"

View File

@ -1,43 +1,43 @@
use actix_web::{middleware::Logger, web, App, HttpServer};
mod crate_version; mod crate_version;
mod mpd; mod mpd;
mod routes; mod routes;
#[async_std::main] #[actix_web::main]
async fn main() -> tide::Result<()> { async fn main() -> std::io::Result<()> {
tracing_subscriber::fmt() let bind = std::env::var("EMPEDE_BIND").unwrap_or("0.0.0.0:8080".into());
.with_max_level(tracing::Level::WARN) let (host, port) = bind.split_once(':').unwrap();
.init();
let mut app = tide::new(); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
app.with(tide_tracing::TraceMiddleware::new());
app.at("/").get(routes::index::get_index); HttpServer::new(|| {
app.at("/player").get(routes::player::get_player); App::new().wrap(Logger::default()).service(
app.at("/browser").get(routes::browser::get_browser); web::scope("")
app.at("/art").get(routes::art::get_art); .service(routes::index::get_index)
.service(routes::player::get_player)
app.at("/sse").get(tide::sse::endpoint(routes::sse::sse)); .service(routes::browser::get_browser)
.service(routes::art::get_art)
app.at("/queue").get(routes::queue::get_queue); .service(routes::sse::idle)
app.at("/queue").post(routes::queue::post_queue); .service(routes::queue::get_queue)
app.at("/queue").delete(routes::queue::delete_queue); .service(routes::queue::post_queue)
app.at("/queue/move").post(routes::queue::post_queue_move); .service(routes::queue::delete_queue)
.service(routes::queue::post_queue_move)
app.at("/play").post(routes::controls::post_play); .service(routes::controls::post_play)
app.at("/pause").post(routes::controls::post_pause); .service(routes::controls::post_pause)
app.at("/previous").post(routes::controls::post_previous); .service(routes::controls::post_previous)
app.at("/next").post(routes::controls::post_next); .service(routes::controls::post_next)
.service(routes::controls::post_consume)
app.at("/consume").post(routes::controls::post_consume); .service(routes::controls::post_random)
app.at("/random").post(routes::controls::post_random); .service(routes::controls::post_repeat)
app.at("/repeat").post(routes::controls::post_repeat); .service(routes::controls::post_single)
app.at("/single").post(routes::controls::post_single); .service(routes::controls::post_shuffle)
app.at("/shuffle").post(routes::controls::post_shuffle); .service(actix_files::Files::new("/static", "./static")),
)
app.at("/static").serve_dir("static/")?; })
.bind((host, port.parse().unwrap()))?
let bind = std::env::var("EMPEDE_BIND").unwrap_or("0.0.0.0:8080".to_string()); .run()
app.listen(bind).await?; .await?;
Ok(()) Ok(())
} }

View File

@ -1,11 +1,10 @@
use std::{collections::HashMap, sync::OnceLock}; use std::collections::HashMap;
use anyhow::anyhow; use anyhow::anyhow;
use async_std::{ use tokio::{
io::{prelude::BufReadExt, BufReader, ReadExt, WriteExt}, io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream},
net::TcpStream, net::TcpStream,
sync::{Mutex, MutexGuard}, sync::{Mutex, MutexGuard, OnceCell},
task::block_on,
}; };
pub fn host() -> String { pub fn host() -> String {
@ -42,19 +41,21 @@ pub enum Entry {
#[derive(Debug)] #[derive(Debug)]
pub struct Mpd { pub struct Mpd {
stream: Option<TcpStream>, bufstream: Option<BufStream<TcpStream>>,
reader: Option<BufReader<TcpStream>>,
} }
pub static INSTANCE: OnceLock<Mutex<Mpd>> = OnceLock::new(); pub static INSTANCE: OnceCell<Mutex<Mpd>> = OnceCell::const_new();
pub async fn get_instance() -> MutexGuard<'static, Mpd> { pub async fn get_instance() -> MutexGuard<'static, Mpd> {
let instance = INSTANCE.get_or_init(|| { INSTANCE
let mut mpd = Mpd::new(); .get_or_init(|| async {
block_on(mpd.connect()).unwrap(); let mut mpd = Mpd::new();
Mutex::from(mpd) mpd.connect().await.unwrap();
}); Mutex::from(mpd)
instance.lock().await })
.await
.lock()
.await
} }
pub async fn command(command: &str) -> anyhow::Result<CommandResult> { pub async fn command(command: &str) -> anyhow::Result<CommandResult> {
@ -116,45 +117,42 @@ impl Mpd {
} }
pub fn new() -> Self { pub fn new() -> Self {
Self { Self { bufstream: None }
stream: None,
reader: None,
}
} }
pub async fn connect(&mut self) -> anyhow::Result<()> { pub async fn connect(&mut self) -> anyhow::Result<()> {
self.stream = Some(TcpStream::connect(host()).await?); let stream = TcpStream::connect(host()).await?;
self.reader = Some(BufReader::new(self.stream.as_mut().unwrap().clone())); let mut bufstream = BufStream::new(stream);
// skip OK MPD line // skip OK MPD line
// TODO check if it is indeed OK // TODO check if it is indeed OK
let mut buffer = String::new(); let mut buffer = String::new();
self.reader.as_mut().unwrap().read_line(&mut buffer).await?; bufstream.read_line(&mut buffer).await?;
let password = std::env::var("MPD_PASSWORD").unwrap_or_default(); let password = std::env::var("MPD_PASSWORD").unwrap_or_default();
if !password.is_empty() { if !password.is_empty() {
let password = Self::escape_str(&password); let password = Self::escape_str(&password);
self.stream bufstream
.as_mut() .write_all(format!("password \"{password}\"\n").as_bytes())
.unwrap()
.write_all(format!(r#"password "{password}"\n"#).as_bytes())
.await?; .await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?; bufstream.flush().await?;
bufstream.read_line(&mut buffer).await?;
} }
self.stream bufstream
.as_mut()
.unwrap()
.write_all("binarylimit 1048576\n".as_bytes()) .write_all("binarylimit 1048576\n".as_bytes())
.await?; .await?;
self.reader.as_mut().unwrap().read_line(&mut buffer).await?; bufstream.flush().await?;
bufstream.read_line(&mut buffer).await?;
self.bufstream = Some(bufstream);
Ok(()) Ok(())
} }
async fn read_binary_data(&mut self, size: usize) -> anyhow::Result<Vec<u8>> { async fn read_binary_data(&mut self, size: usize) -> anyhow::Result<Vec<u8>> {
let mut binary = vec![0u8; size]; let mut binary = vec![0u8; size];
self.reader self.bufstream
.as_mut() .as_mut()
.unwrap() .unwrap()
.read_exact(&mut binary) .read_exact(&mut binary)
@ -163,11 +161,19 @@ impl Mpd {
let mut buffer = String::new(); let mut buffer = String::new();
// Skip the newline after the binary data // Skip the newline after the binary data
self.reader.as_mut().unwrap().read_line(&mut buffer).await?; self.bufstream
.as_mut()
.unwrap()
.read_line(&mut buffer)
.await?;
// Skip the "OK" after the binary data // Skip the "OK" after the binary data
// TODO Check if actually OK // TODO Check if actually OK
self.reader.as_mut().unwrap().read_line(&mut buffer).await?; self.bufstream
.as_mut()
.unwrap()
.read_line(&mut buffer)
.await?;
Ok(binary) Ok(binary)
} }
@ -176,16 +182,21 @@ impl Mpd {
let mut properties = Vec::new(); let mut properties = Vec::new();
'retry: loop { 'retry: loop {
self.stream self.bufstream
.as_mut() .as_mut()
.unwrap() .unwrap()
.write_all(format!("{command}\n").as_bytes()) .write_all(format!("{command}\n").as_bytes())
.await?; .await?;
self.bufstream.as_mut().unwrap().flush().await?;
let mut buffer = String::new(); let mut buffer = String::new();
break 'retry (loop { break 'retry (loop {
buffer.clear(); buffer.clear();
self.reader.as_mut().unwrap().read_line(&mut buffer).await?; self.bufstream
.as_mut()
.unwrap()
.read_line(&mut buffer)
.await?;
if let Some((key, value)) = buffer.split_once(": ") { if let Some((key, value)) = buffer.split_once(": ") {
let value = value.trim_end(); let value = value.trim_end();

View File

@ -1,4 +1,9 @@
use crate::mpd; use crate::mpd;
use actix_web::{
get,
http::header::{self, CacheDirective},
web, HttpResponse, Responder,
};
use percent_encoding::percent_decode_str; use percent_encoding::percent_decode_str;
use serde::Deserialize; use serde::Deserialize;
@ -8,33 +13,30 @@ struct ArtQuery {
path: String, path: String,
} }
pub async fn get_art(req: tide::Request<()>) -> tide::Result { #[get("/art")]
let query: ArtQuery = req.query()?; pub async fn get_art(query: web::Query<ArtQuery>) -> impl Responder {
let path = percent_decode_str(&query.path).decode_utf8_lossy(); let path = percent_decode_str(&query.path).decode_utf8_lossy();
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
let resp = if let Ok(art) = mpd.albumart(&path).await { if let Ok(art) = mpd.albumart(&path).await {
let mime = infer::get(&art) let mime = infer::get(&art)
.map(|k| k.mime_type()) .map(|k| k.mime_type())
.unwrap_or("application/octet-stream"); .unwrap_or("application/octet-stream");
tide::Response::builder(tide::StatusCode::Ok) HttpResponse::Ok()
.body(art)
.content_type(mime) .content_type(mime)
.header("cache-control", "max-age=3600") .append_header(header::CacheControl(vec![CacheDirective::MaxAge(3600)]))
.body(art)
} else if let Ok(art) = mpd.readpicture(&path).await { } else if let Ok(art) = mpd.readpicture(&path).await {
let mime = infer::get(&art) let mime = infer::get(&art)
.map(|k| k.mime_type()) .map(|k| k.mime_type())
.unwrap_or("application/octet-stream"); .unwrap_or("application/octet-stream");
tide::Response::builder(tide::StatusCode::Ok) HttpResponse::Ok()
.body(art)
.content_type(mime) .content_type(mime)
.header("cache-control", "max-age=3600") .append_header(header::CacheControl(vec![CacheDirective::MaxAge(3600)]))
.body(art)
} else { } else {
tide::Response::builder(tide::StatusCode::NotFound) HttpResponse::NotFound().finish()
}; }
Ok(resp.into())
} }

View File

@ -1,4 +1,5 @@
use crate::mpd; use crate::mpd;
use actix_web::{get, web, Responder};
use askama::Template; use askama::Template;
use percent_encoding::percent_decode_str; use percent_encoding::percent_decode_str;
use serde::Deserialize; use serde::Deserialize;
@ -17,19 +18,17 @@ struct BrowserQuery {
path: String, path: String,
} }
pub async fn get_browser(req: tide::Request<()>) -> tide::Result { #[get("/browser")]
let query: BrowserQuery = req.query()?; pub async fn get_browser(query: web::Query<BrowserQuery>) -> impl Responder {
let path = percent_decode_str(&query.path).decode_utf8_lossy(); let path = percent_decode_str(&query.path).decode_utf8_lossy();
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
let entries = mpd.ls(&path).await?; let entries = mpd.ls(&path).await.unwrap();
let template = BrowserTemplate { BrowserTemplate {
path: Path::new(&*path) path: Path::new(&*path)
.iter() .iter()
.map(|s| s.to_string_lossy().to_string()) .map(|s| s.to_string_lossy().to_string())
.collect(), .collect(),
entries, entries,
}; }
Ok(template.into())
} }

View File

@ -1,3 +1,5 @@
use actix_web::{post, HttpResponse, Responder};
use crate::mpd; use crate::mpd;
async fn toggle_setting(setting: &str) -> anyhow::Result<()> { async fn toggle_setting(setting: &str) -> anyhow::Result<()> {
@ -11,47 +13,56 @@ async fn toggle_setting(setting: &str) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
pub async fn post_play(_req: tide::Request<()>) -> tide::Result { #[post("/play")]
mpd::command("play").await?; pub async fn post_play() -> impl Responder {
Ok("".into()) mpd::command("play").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_pause(_req: tide::Request<()>) -> tide::Result { #[post("/pause")]
mpd::command("pause 1").await?; pub async fn post_pause() -> impl Responder {
Ok("".into()) mpd::command("pause 1").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_previous(_req: tide::Request<()>) -> tide::Result { #[post("/previous")]
mpd::command("previous").await?; pub async fn post_previous() -> impl Responder {
Ok("".into()) mpd::command("previous").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_next(_req: tide::Request<()>) -> tide::Result { #[post("/next")]
mpd::command("next").await?; pub async fn post_next() -> impl Responder {
Ok("".into()) mpd::command("next").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_consume(_req: tide::Request<()>) -> tide::Result { #[post("/consume")]
toggle_setting("consume").await?; pub async fn post_consume() -> impl Responder {
Ok("".into()) toggle_setting("consume").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_random(_req: tide::Request<()>) -> tide::Result { #[post("/random")]
toggle_setting("random").await?; pub async fn post_random() -> impl Responder {
Ok("".into()) toggle_setting("random").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_repeat(_req: tide::Request<()>) -> tide::Result { #[post("/repeat")]
toggle_setting("repeat").await?; pub async fn post_repeat() -> impl Responder {
Ok("".into()) toggle_setting("repeat").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_shuffle(_req: tide::Request<()>) -> tide::Result { #[post("/shuffle")]
mpd::command("shuffle").await?; pub async fn post_shuffle() -> impl Responder {
Ok("".into()) mpd::command("shuffle").await.unwrap();
HttpResponse::NoContent()
} }
pub async fn post_single(_req: tide::Request<()>) -> tide::Result { #[post("/single")]
toggle_setting("single").await?; pub async fn post_single() -> impl Responder {
Ok("".into()) toggle_setting("single").await.unwrap();
} HttpResponse::NoContent()
}

View File

@ -1,4 +1,5 @@
use crate::crate_version; use crate::crate_version;
use actix_web::{get, Responder};
use askama::Template; use askama::Template;
use serde::Deserialize; use serde::Deserialize;
@ -12,6 +13,7 @@ struct IndexQuery {
path: String, path: String,
} }
pub async fn get_index(_req: tide::Request<()>) -> tide::Result { #[get("/")]
Ok(askama_tide::into_response(&IndexTemplate)) pub async fn get_index() -> impl Responder {
IndexTemplate
} }

View File

@ -1,7 +1,7 @@
pub mod art; pub mod art;
pub mod browser; pub mod browser;
pub mod controls;
pub mod index; pub mod index;
pub mod player; pub mod player;
pub mod queue; pub mod queue;
pub mod controls;
pub mod sse; pub mod sse;

View File

@ -1,13 +1,14 @@
use crate::mpd; use crate::mpd;
use actix_web::{get, Responder};
use askama::Template; use askama::Template;
use std::collections::HashMap; use std::collections::HashMap;
#[derive(Template)] #[derive(Template)]
#[template(path = "player.html")] #[template(path = "player.html")]
struct PlayerTemplate<'a> { struct PlayerTemplate {
song: Option<&'a HashMap<String, String>>, song: Option<HashMap<String, String>>,
name: Option<String>, name: Option<String>,
state: &'a str, state: String,
consume: bool, consume: bool,
random: bool, random: bool,
repeat: bool, repeat: bool,
@ -16,10 +17,11 @@ struct PlayerTemplate<'a> {
duration: f32, duration: f32,
} }
pub async fn get_player(_req: tide::Request<()>) -> tide::Result { #[get("/player")]
pub async fn get_player() -> impl Responder {
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
let song = mpd.command("currentsong").await?.into_hashmap(); let song = mpd.command("currentsong").await.unwrap().into_hashmap();
let status = mpd.command("status").await?.into_hashmap(); let status = mpd.command("status").await.unwrap().into_hashmap();
let elapsed = status let elapsed = status
.get("elapsed") .get("elapsed")
@ -31,9 +33,13 @@ pub async fn get_player(_req: tide::Request<()>) -> tide::Result {
.unwrap_or(1.0); .unwrap_or(1.0);
let mut template = PlayerTemplate { let mut template = PlayerTemplate {
song: if song.is_empty() { None } else { Some(&song) }, song: if song.is_empty() {
None
} else {
Some(song.clone())
},
name: None, name: None,
state: &status["state"], state: status["state"].clone(),
consume: status["consume"] == "1", consume: status["consume"] == "1",
random: status["random"] == "1", random: status["random"] == "1",
repeat: status["repeat"] == "1", repeat: status["repeat"] == "1",
@ -47,5 +53,5 @@ pub async fn get_player(_req: tide::Request<()>) -> tide::Result {
template.name = Some(name); template.name = Some(name);
} }
Ok(template.into()) template
} }

View File

@ -1,4 +1,5 @@
use crate::mpd; use crate::mpd;
use actix_web::{delete, get, post, web, HttpResponse, Responder};
use askama::Template; use askama::Template;
use percent_encoding::percent_decode_str; use percent_encoding::percent_decode_str;
use serde::Deserialize; use serde::Deserialize;
@ -9,11 +10,11 @@ struct QueueTemplate {
queue: Vec<mpd::QueueItem>, queue: Vec<mpd::QueueItem>,
} }
pub async fn get_queue(_req: tide::Request<()>) -> tide::Result { #[get("/queue")]
pub async fn get_queue() -> impl Responder {
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
let queue = mpd.playlist().await?; let queue = mpd.playlist().await.unwrap();
let template = QueueTemplate { queue }; QueueTemplate { queue }
Ok(template.into())
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -27,26 +28,26 @@ struct PostQueueQuery {
play: bool, play: bool,
} }
pub async fn post_queue(req: tide::Request<()>) -> tide::Result { #[post("/queue")]
let query: PostQueueQuery = req.query()?; pub async fn post_queue(query: web::Query<PostQueueQuery>) -> impl Responder {
let path = percent_decode_str(&query.path).decode_utf8_lossy(); let path = percent_decode_str(&query.path).decode_utf8_lossy();
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
if query.replace { if query.replace {
mpd.clear().await?; mpd.clear().await.unwrap();
} }
if query.next { if query.next {
mpd.add_pos(&path, "+0").await?; mpd.add_pos(&path, "+0").await.unwrap();
} else { } else {
mpd.add(&path).await?; mpd.add(&path).await.unwrap();
} }
if query.play { if query.play {
mpd.play().await?; mpd.play().await.unwrap();
} }
Ok("".into()) HttpResponse::NoContent()
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -55,17 +56,16 @@ struct DeleteQueueQuery {
id: Option<u32>, id: Option<u32>,
} }
pub async fn delete_queue(req: tide::Request<()>) -> tide::Result { #[delete("/queue")]
let query: DeleteQueueQuery = req.query()?; pub async fn delete_queue(query: web::Query<DeleteQueueQuery>) -> impl Responder {
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
if let Some(id) = query.id { if let Some(id) = query.id {
mpd.command(&format!("deleteid {id}")).await?; mpd.command(&format!("deleteid {id}")).await.unwrap();
} else { } else {
mpd.command("clear").await?; mpd.command("clear").await.unwrap();
} }
Ok("".into()) HttpResponse::NoContent()
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -74,10 +74,11 @@ struct UpdateQueueBody {
to: u32, to: u32,
} }
pub async fn post_queue_move(mut req: tide::Request<()>) -> tide::Result { #[post("/queue/move")]
let body: UpdateQueueBody = req.body_json().await?; pub async fn post_queue_move(body: web::Json<UpdateQueueBody>) -> impl Responder {
let mut mpd = mpd::get_instance().await; let mut mpd = mpd::get_instance().await;
mpd.command(&format!("move {} {}", body.from, body.to)) mpd.command(&format!("move {} {}", body.from, body.to))
.await?; .await
Ok("".into()) .unwrap();
HttpResponse::NoContent()
} }

View File

@ -1,19 +1,33 @@
use crate::mpd; use std::time::Duration;
pub async fn sse(_req: tide::Request<()>, sender: tide::sse::Sender) -> tide::Result<()> { use actix_web::{get, Responder};
// Update everything on connect use actix_web_lab::sse;
sender.send("playlist", "", None).await?;
sender.send("player", "", None).await?;
let mut mpd = mpd::Mpd::new(); use crate::mpd::Mpd;
#[get("/idle")]
pub async fn idle() -> impl Responder {
let mut mpd = Mpd::new();
mpd.connect().await.unwrap(); mpd.connect().await.unwrap();
loop { const SYSTEMS: &[&str] = &["playlist", "player", "database", "options"];
let systems = mpd
.idle(&["playlist", "player", "database", "options"]) let (tx, rx) = tokio::sync::mpsc::channel(10);
.await?; for system in SYSTEMS {
for system in systems { _ = tx
sender.send(&system, "", None).await?; .send(sse::Data::new("").event(system.to_owned()).into())
} .await;
} }
}
actix_web::rt::spawn(async move {
loop {
let systems = mpd.idle(SYSTEMS).await.unwrap();
for system in systems {
_ = tx.send(sse::Data::new("").event(system).into()).await;
}
}
});
sse::Sse::from_infallible_receiver(rx).with_retry_duration(Duration::from_secs(10))
}

View File

@ -24,7 +24,7 @@
</script> </script>
</head> </head>
<body hx-ext="sse" sse-connect="/sse"> <body hx-ext="sse" sse-connect="/idle">
<div <div
class="browser" class="browser"
hx-trigger="load,sse:database" hx-trigger="load,sse:database"