//! IPC client wrapper for the launcher UI use anyhow::{Context, Result}; use shepherd_api::{Command, Event, ReasonCode, Response, ResponsePayload, ResponseResult}; use shepherd_ipc::IpcClient; use shepherd_util::EntryId; use std::path::Path; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; use tracing::{debug, error, info, warn}; use crate::state::{LauncherState, SharedState}; /// Messages from UI to client task #[derive(Debug)] pub enum ClientCommand { /// Request to launch an entry Launch(EntryId), /// Request to stop current session StopCurrent, /// Request fresh state RefreshState, /// Shutdown the client Shutdown, } /// Client connection manager pub struct DaemonClient { socket_path: std::path::PathBuf, state: SharedState, command_rx: mpsc::UnboundedReceiver, } impl DaemonClient { pub fn new( socket_path: impl AsRef, state: SharedState, command_rx: mpsc::UnboundedReceiver, ) -> Self { Self { socket_path: socket_path.as_ref().to_path_buf(), state, command_rx, } } /// Run the client connection loop pub async fn run(mut self) { loop { match self.connect_and_run().await { Ok(()) => { info!("Client loop exited normally"); break; } Err(e) => { error!(error = %e, "Connection error"); self.state.set(LauncherState::Disconnected); // Wait before reconnecting sleep(Duration::from_secs(2)).await; } } } } async fn connect_and_run(&mut self) -> Result<()> { self.state.set(LauncherState::Connecting); info!(path = %self.socket_path.display(), "Connecting to daemon"); let mut client = IpcClient::connect(&self.socket_path) .await .context("Failed to connect to daemon")?; info!("Connected to daemon"); // Get initial state (includes entries) info!("Sending GetState command"); let response = client.send(Command::GetState).await?; info!("Got GetState response"); self.handle_response(response)?; // Note: ListEntries is not needed since GetState includes entries in the snapshot // Now consume client for event stream (this will send SubscribeEvents internally) info!("Subscribing to events"); let mut events = client.subscribe().await?; info!("Subscribed to events, entering event loop"); // Main event loop loop { tokio::select! { // Handle commands from UI Some(cmd) = self.command_rx.recv() => { match cmd { ClientCommand::Shutdown => { info!("Shutdown requested"); return Ok(()); } ClientCommand::Launch(entry_id) => { // We can't send commands after subscribing since client is consumed // Need to reconnect for commands warn!("Launch command received but cannot send after subscribe"); // For now, trigger a reconnect return Ok(()); } ClientCommand::StopCurrent => { warn!("Stop command received but cannot send after subscribe"); return Ok(()); } ClientCommand::RefreshState => { // Trigger reconnect to refresh return Ok(()); } } } // Handle events from daemon event_result = events.next() => { match event_result { Ok(event) => { info!(event = ?event, "Received event from daemon (client.rs)"); self.state.handle_event(event); } Err(e) => { error!(error = %e, "Event stream error"); return Err(e.into()); } } } } } } fn handle_response(&self, response: Response) -> Result<()> { match response.result { ResponseResult::Ok(payload) => { match payload { ResponsePayload::State(snapshot) => { if let Some(session) = snapshot.current_session { let now = chrono::Local::now(); let time_remaining = if session.deadline > now { (session.deadline - now).to_std().ok() } else { Some(Duration::ZERO) }; self.state.set(LauncherState::SessionActive { session_id: session.session_id, entry_label: session.label, time_remaining, }); } else { self.state.set(LauncherState::Idle { entries: snapshot.entries, }); } } ResponsePayload::Entries(entries) => { // Only update if we're in idle state if matches!(self.state.get(), LauncherState::Idle { .. } | LauncherState::Connecting) { self.state.set(LauncherState::Idle { entries }); } } ResponsePayload::LaunchApproved { session_id, deadline } => { let now = chrono::Local::now(); let time_remaining = if deadline > now { (deadline - now).to_std().ok() } else { Some(Duration::ZERO) }; self.state.set(LauncherState::SessionActive { session_id, entry_label: "Starting...".into(), time_remaining, }); } ResponsePayload::LaunchDenied { reasons } => { let message = reasons .iter() .map(|r| reason_to_message(r)) .collect::>() .join(", "); self.state.set(LauncherState::Error { message }); } _ => {} } Ok(()) } ResponseResult::Err(e) => { self.state.set(LauncherState::Error { message: e.message, }); Ok(()) } } } } /// Separate command client for sending commands (not subscribed) pub struct CommandClient { socket_path: std::path::PathBuf, } impl CommandClient { pub fn new(socket_path: impl AsRef) -> Self { Self { socket_path: socket_path.as_ref().to_path_buf(), } } pub async fn launch(&self, entry_id: &EntryId) -> Result { let mut client = IpcClient::connect(&self.socket_path).await?; client.send(Command::Launch { entry_id: entry_id.clone(), }).await.map_err(Into::into) } pub async fn stop_current(&self) -> Result { let mut client = IpcClient::connect(&self.socket_path).await?; client.send(Command::StopCurrent { mode: shepherd_api::StopMode::Graceful, }).await.map_err(Into::into) } pub async fn get_state(&self) -> Result { let mut client = IpcClient::connect(&self.socket_path).await?; client.send(Command::GetState).await.map_err(Into::into) } pub async fn list_entries(&self) -> Result { let mut client = IpcClient::connect(&self.socket_path).await?; client.send(Command::ListEntries { at_time: None }).await.map_err(Into::into) } } /// Convert a ReasonCode enum variant to a human-readable message fn reason_to_message(reason: &ReasonCode) -> &'static str { match reason { ReasonCode::OutsideTimeWindow { .. } => "Outside allowed time window", ReasonCode::QuotaExhausted { .. } => "Daily quota exhausted", ReasonCode::CooldownActive { .. } => "Cooldown period active", ReasonCode::SessionActive { .. } => "Another session is active", ReasonCode::UnsupportedKind { .. } => "Entry type not supported", ReasonCode::Disabled { .. } => "Entry disabled", } }