Fix child process monitoring

This commit is contained in:
Albert Armea 2025-12-27 11:28:15 -05:00
parent 2965afacae
commit 3b28bd7129
5 changed files with 123 additions and 23 deletions

View file

@ -66,7 +66,7 @@ impl LinuxHost {
}
for (pid, pgid, status) in exited {
debug!(pid = pid, status = ?status, "Process exited");
info!(pid = pid, pgid = pgid, status = ?status, "Process exited - sending HostEvent::Exited");
// We don't have the session_id here, so we use a placeholder
// The daemon should track the mapping

View file

@ -173,7 +173,74 @@ impl LauncherApp {
match client.launch(&entry_id).await {
Ok(response) => {
debug!(response = ?response, "Launch response");
// State will be updated via events
// Handle error responses from daemon
match response.result {
shepherd_api::ResponseResult::Ok(payload) => {
// Check what kind of success response we got
match payload {
shepherd_api::ResponsePayload::LaunchApproved { session_id, deadline } => {
info!(session_id = %session_id, "Launch approved, setting SessionActive");
let now = chrono::Local::now();
let time_remaining = if deadline > now {
(deadline - now).to_std().ok()
} else {
Some(std::time::Duration::ZERO)
};
state.set(LauncherState::SessionActive {
session_id,
entry_label: entry_id.to_string(),
time_remaining,
});
}
shepherd_api::ResponsePayload::LaunchDenied { reasons } => {
let message = reasons
.iter()
.map(|r| format!("{:?}", r))
.collect::<Vec<_>>()
.join(", ");
error!(message = %message, "Launch denied");
state.set(LauncherState::Error { message });
}
_ => {
// Other OK responses - events will update state
}
}
}
shepherd_api::ResponseResult::Err(err) => {
// Launch failed on server side - refresh state to recover
error!(error = %err.message, "Launch failed on server");
// Request fresh state from daemon to get back to correct state
match client.get_state().await {
Ok(state_resp) => {
if let shepherd_api::ResponseResult::Ok(
shepherd_api::ResponsePayload::State(snapshot)
) = state_resp.result {
if snapshot.current_session.is_some() {
// Session is still active somehow
debug!("Session still active after spawn failure");
} else {
// No session - return to idle with entries
state.set(LauncherState::Idle {
entries: snapshot.entries,
});
}
} else {
// Unexpected response, show error
state.set(LauncherState::Error {
message: format!("Launch failed: {}", err.message),
});
}
}
Err(e) => {
// Can't get state, show error
error!(error = %e, "Failed to get state after launch failure");
state.set(LauncherState::Error {
message: format!("Launch failed: {}", err.message),
});
}
}
}
}
}
Err(e) => {
error!(error = %e, "Launch failed");
@ -208,13 +275,17 @@ impl LauncherApp {
});
});
// Start daemon client in background
// Start daemon client in background thread (separate from GTK main loop)
// This ensures the tokio runtime is properly driven for event reception
let state_for_client = state.clone();
let socket_for_client = socket_path.clone();
runtime.spawn(async move {
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime for event loop");
rt.block_on(async move {
let client = DaemonClient::new(socket_for_client, state_for_client, command_rx);
client.run().await;
});
});
// Set up state change handler
let stack_weak = stack.downgrade();

View file

@ -75,22 +75,18 @@ impl DaemonClient {
info!("Connected to daemon");
// Get initial state
// Get initial state (includes entries)
info!("Sending GetState command");
let response = client.send(Command::GetState).await?;
info!("Got GetState response");
self.handle_response(response)?;
// Subscribe to events
let response = client.send(Command::SubscribeEvents).await?;
if let ResponseResult::Err(e) = response.result {
warn!(error = %e.message, "Failed to subscribe to events");
}
// Note: ListEntries is not needed since GetState includes entries in the snapshot
// Get entries list
let response = client.send(Command::ListEntries { at_time: None }).await?;
self.handle_response(response)?;
// Now consume client for event stream
// Now consume client for event stream (this will send SubscribeEvents internally)
info!("Subscribing to events");
let mut events = client.subscribe().await?;
info!("Subscribed to events, entering event loop");
// Main event loop
loop {
@ -124,7 +120,7 @@ impl DaemonClient {
event_result = events.next() => {
match event_result {
Ok(event) => {
debug!(event = ?event, "Received event");
info!(event = ?event, "Received event from daemon (client.rs)");
self.state.handle_event(event);
}
Err(e) => {

View file

@ -60,8 +60,10 @@ impl SharedState {
/// Update state from daemon event
pub fn handle_event(&self, event: Event) {
tracing::info!(event = ?event.payload, "Received event from daemon");
match event.payload {
EventPayload::StateChanged(snapshot) => {
tracing::info!(has_session = snapshot.current_session.is_some(), "Applying state snapshot");
self.apply_snapshot(snapshot);
}
EventPayload::SessionStarted {
@ -70,6 +72,7 @@ impl SharedState {
label,
deadline,
} => {
tracing::info!(session_id = %session_id, label = %label, "Session started event");
let now = chrono::Local::now();
let time_remaining = if deadline > now {
(deadline - now).to_std().ok()
@ -82,7 +85,8 @@ impl SharedState {
time_remaining,
});
}
EventPayload::SessionEnded { .. } => {
EventPayload::SessionEnded { session_id, entry_id, reason, .. } => {
tracing::info!(session_id = %session_id, entry_id = %entry_id, reason = ?reason, "Session ended event - setting Connecting");
// Will be followed by StateChanged, but set to connecting
// to ensure grid reloads
self.set(LauncherState::Connecting);

View file

@ -308,10 +308,10 @@ impl Daemon {
let now_mono = MonotonicInstant::now();
let now = Local::now();
debug!(
info!(
session_id = %handle.session_id,
status = ?status,
"Host process exited"
"Host process exited - will end session"
);
let core_event = {
@ -319,6 +319,8 @@ impl Daemon {
engine.notify_session_exited(status.code, now_mono, now)
};
info!(has_event = core_event.is_some(), "notify_session_exited result");
if let Some(event) = core_event {
if let CoreEvent::SessionEnded {
session_id,
@ -327,6 +329,13 @@ impl Daemon {
duration,
} = event
{
info!(
session_id = %session_id,
entry_id = %entry_id,
reason = ?reason,
duration_secs = duration.as_secs(),
"Broadcasting SessionEnded"
);
ipc.broadcast_event(Event::new(EventPayload::SessionEnded {
session_id,
entry_id,
@ -339,6 +348,7 @@ impl Daemon {
let engine = engine.lock().await;
engine.get_state()
};
info!("Broadcasting StateChanged");
ipc.broadcast_event(Event::new(EventPayload::StateChanged(state)));
}
}
@ -501,9 +511,28 @@ impl Daemon {
}
}
Err(e) => {
// Notify session ended with error
// Notify session ended with error and broadcast to subscribers
let mut eng = engine.lock().await;
eng.notify_session_exited(Some(-1), now_mono, now);
if let Some(event) = eng.notify_session_exited(Some(-1), now_mono, now) {
if let CoreEvent::SessionEnded {
session_id,
entry_id,
reason,
duration,
} = event
{
ipc.broadcast_event(Event::new(EventPayload::SessionEnded {
session_id,
entry_id,
reason,
duration,
}));
// Broadcast state change so clients return to idle
let state = eng.get_state();
ipc.broadcast_event(Event::new(EventPayload::StateChanged(state)));
}
}
Response::error(
request_id,