256 lines
9.5 KiB
Rust
256 lines
9.5 KiB
Rust
//! IPC client wrapper for the launcher UI
|
|
|
|
use anyhow::{Context, Result};
|
|
use shepherd_api::{Command, ReasonCode, Response, ResponsePayload, ResponseResult};
|
|
use shepherd_ipc::IpcClient;
|
|
use shepherd_util::EntryId;
|
|
use std::path::Path;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tokio::time::sleep;
|
|
use tracing::{error, info, warn};
|
|
|
|
use crate::state::{LauncherState, SharedState};
|
|
|
|
/// Messages from UI to client task
|
|
#[derive(Debug)]
|
|
#[allow(dead_code)]
|
|
pub enum ClientCommand {
|
|
/// Request to launch an entry
|
|
Launch(EntryId),
|
|
/// Request to stop current session
|
|
StopCurrent,
|
|
/// Request fresh state
|
|
RefreshState,
|
|
/// Shutdown the client
|
|
Shutdown,
|
|
}
|
|
|
|
/// Client connection manager
|
|
pub struct ServiceClient {
|
|
socket_path: std::path::PathBuf,
|
|
state: SharedState,
|
|
command_rx: mpsc::UnboundedReceiver<ClientCommand>,
|
|
}
|
|
|
|
impl ServiceClient {
|
|
pub fn new(
|
|
socket_path: impl AsRef<Path>,
|
|
state: SharedState,
|
|
command_rx: mpsc::UnboundedReceiver<ClientCommand>,
|
|
) -> Self {
|
|
Self {
|
|
socket_path: socket_path.as_ref().to_path_buf(),
|
|
state,
|
|
command_rx,
|
|
}
|
|
}
|
|
|
|
/// Run the client connection loop
|
|
pub async fn run(mut self) {
|
|
loop {
|
|
match self.connect_and_run().await {
|
|
Ok(()) => {
|
|
info!("Client loop exited normally");
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
error!(error = %e, "Connection error");
|
|
self.state.set(LauncherState::Disconnected);
|
|
|
|
// Wait before reconnecting
|
|
sleep(Duration::from_secs(2)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn connect_and_run(&mut self) -> Result<()> {
|
|
self.state.set(LauncherState::Connecting);
|
|
|
|
info!(path = %self.socket_path.display(), "Connecting to shepherdd");
|
|
|
|
let mut client = IpcClient::connect(&self.socket_path)
|
|
.await
|
|
.context("Failed to connect to shepherdd")?;
|
|
|
|
info!("Connected to shepherdd");
|
|
|
|
// Get initial state (includes entries)
|
|
info!("Sending GetState command");
|
|
let response = client.send(Command::GetState).await?;
|
|
info!("Got GetState response");
|
|
self.handle_response(response)?;
|
|
|
|
// Note: ListEntries is not needed since GetState includes entries in the snapshot
|
|
|
|
// Now consume client for event stream (this will send SubscribeEvents internally)
|
|
info!("Subscribing to events");
|
|
let mut events = client.subscribe().await?;
|
|
info!("Subscribed to events, entering event loop");
|
|
|
|
// Main event loop
|
|
loop {
|
|
tokio::select! {
|
|
// Handle commands from UI
|
|
Some(cmd) = self.command_rx.recv() => {
|
|
match cmd {
|
|
ClientCommand::Shutdown => {
|
|
info!("Shutdown requested");
|
|
return Ok(());
|
|
}
|
|
ClientCommand::Launch(_entry_id) => {
|
|
// We can't send commands after subscribing since client is consumed
|
|
// Need to reconnect for commands
|
|
warn!("Launch command received but cannot send after subscribe");
|
|
// For now, trigger a reconnect
|
|
return Ok(());
|
|
}
|
|
ClientCommand::StopCurrent => {
|
|
warn!("Stop command received but cannot send after subscribe");
|
|
return Ok(());
|
|
}
|
|
ClientCommand::RefreshState => {
|
|
// Trigger reconnect to refresh
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle events from shepherdd
|
|
event_result = events.next() => {
|
|
match event_result {
|
|
Ok(event) => {
|
|
info!(event = ?event, "Received event from shepherdd (client.rs)");
|
|
self.state.handle_event(event);
|
|
}
|
|
Err(e) => {
|
|
error!(error = %e, "Event stream error");
|
|
return Err(e.into());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn handle_response(&self, response: Response) -> Result<()> {
|
|
match response.result {
|
|
ResponseResult::Ok(payload) => {
|
|
match payload {
|
|
ResponsePayload::State(snapshot) => {
|
|
if let Some(session) = snapshot.current_session {
|
|
let now = shepherd_util::now();
|
|
// For unlimited sessions (deadline=None), time_remaining is None
|
|
let time_remaining = session.deadline.and_then(|d| {
|
|
if d > now {
|
|
(d - now).to_std().ok()
|
|
} else {
|
|
Some(Duration::ZERO)
|
|
}
|
|
});
|
|
self.state.set(LauncherState::SessionActive {
|
|
session_id: session.session_id,
|
|
entry_label: session.label,
|
|
time_remaining,
|
|
});
|
|
} else {
|
|
self.state.set(LauncherState::Idle {
|
|
entries: snapshot.entries,
|
|
});
|
|
}
|
|
}
|
|
ResponsePayload::Entries(entries) => {
|
|
// Only update if we're in idle state
|
|
if matches!(self.state.get(), LauncherState::Idle { .. } | LauncherState::Connecting) {
|
|
self.state.set(LauncherState::Idle { entries });
|
|
}
|
|
}
|
|
ResponsePayload::LaunchApproved { session_id, deadline } => {
|
|
let now = shepherd_util::now();
|
|
// For unlimited sessions (deadline=None), time_remaining is None
|
|
let time_remaining = deadline.and_then(|d| {
|
|
if d > now {
|
|
(d - now).to_std().ok()
|
|
} else {
|
|
Some(Duration::ZERO)
|
|
}
|
|
});
|
|
self.state.set(LauncherState::SessionActive {
|
|
session_id,
|
|
entry_label: "Starting...".into(),
|
|
time_remaining,
|
|
});
|
|
}
|
|
ResponsePayload::LaunchDenied { reasons } => {
|
|
let message = reasons
|
|
.iter()
|
|
.map(|r| reason_to_message(r))
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
self.state.set(LauncherState::Error { message });
|
|
}
|
|
_ => {}
|
|
}
|
|
Ok(())
|
|
}
|
|
ResponseResult::Err(e) => {
|
|
self.state.set(LauncherState::Error {
|
|
message: e.message,
|
|
});
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Separate command client for sending commands (not subscribed)
|
|
pub struct CommandClient {
|
|
socket_path: std::path::PathBuf,
|
|
}
|
|
|
|
impl CommandClient {
|
|
pub fn new(socket_path: impl AsRef<Path>) -> Self {
|
|
Self {
|
|
socket_path: socket_path.as_ref().to_path_buf(),
|
|
}
|
|
}
|
|
|
|
pub async fn launch(&self, entry_id: &EntryId) -> Result<Response> {
|
|
let mut client = IpcClient::connect(&self.socket_path).await?;
|
|
client.send(Command::Launch {
|
|
entry_id: entry_id.clone(),
|
|
}).await.map_err(Into::into)
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub async fn stop_current(&self) -> Result<Response> {
|
|
let mut client = IpcClient::connect(&self.socket_path).await?;
|
|
client.send(Command::StopCurrent {
|
|
mode: shepherd_api::StopMode::Graceful,
|
|
}).await.map_err(Into::into)
|
|
}
|
|
|
|
pub async fn get_state(&self) -> Result<Response> {
|
|
let mut client = IpcClient::connect(&self.socket_path).await?;
|
|
client.send(Command::GetState).await.map_err(Into::into)
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub async fn list_entries(&self) -> Result<Response> {
|
|
let mut client = IpcClient::connect(&self.socket_path).await?;
|
|
client.send(Command::ListEntries { at_time: None }).await.map_err(Into::into)
|
|
}
|
|
}
|
|
|
|
/// Convert a ReasonCode enum variant to a human-readable message
|
|
fn reason_to_message(reason: &ReasonCode) -> &'static str {
|
|
match reason {
|
|
ReasonCode::OutsideTimeWindow { .. } => "Outside allowed time window",
|
|
ReasonCode::QuotaExhausted { .. } => "Daily quota exhausted",
|
|
ReasonCode::CooldownActive { .. } => "Cooldown period active",
|
|
ReasonCode::SessionActive { .. } => "Another session is active",
|
|
ReasonCode::UnsupportedKind { .. } => "Entry type not supported",
|
|
ReasonCode::Disabled { .. } => "Entry disabled",
|
|
}
|
|
}
|