feat(web): 改为通过 WebSocket 推送 ttyd 状态并清理轮询与冗余接口

This commit is contained in:
mofeng-git
2026-03-27 10:49:04 +08:00
parent e20136a5ab
commit 6bcb54bd22
15 changed files with 119 additions and 84 deletions

View File

@@ -7,7 +7,7 @@ pub mod types;
pub use types::{ pub use types::{
AtxDeviceInfo, AudioDeviceInfo, ClientStats, HidDeviceInfo, MsdDeviceInfo, SystemEvent, AtxDeviceInfo, AudioDeviceInfo, ClientStats, HidDeviceInfo, MsdDeviceInfo, SystemEvent,
VideoDeviceInfo, TtydDeviceInfo, VideoDeviceInfo,
}; };
use tokio::sync::broadcast; use tokio::sync::broadcast;

View File

@@ -100,6 +100,15 @@ pub struct AudioDeviceInfo {
pub error: Option<String>, pub error: Option<String>,
} }
/// ttyd status information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtydDeviceInfo {
/// Whether ttyd binary is available
pub available: bool,
/// Whether ttyd is currently running
pub running: bool,
}
/// Per-client statistics /// Per-client statistics
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientStats { pub struct ClientStats {
@@ -325,6 +334,8 @@ pub enum SystemEvent {
atx: Option<AtxDeviceInfo>, atx: Option<AtxDeviceInfo>,
/// Audio device information (None if audio not enabled) /// Audio device information (None if audio not enabled)
audio: Option<AudioDeviceInfo>, audio: Option<AudioDeviceInfo>,
/// ttyd status information
ttyd: TtydDeviceInfo,
}, },
/// WebSocket error notification (for connection-level errors like lag) /// WebSocket error notification (for connection-level errors like lag)

View File

@@ -10,6 +10,7 @@ use tokio::process::{Child, Command};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use super::types::*; use super::types::*;
use crate::events::EventBus;
/// Maximum number of log lines to keep per extension /// Maximum number of log lines to keep per extension
const LOG_BUFFER_SIZE: usize = 200; const LOG_BUFFER_SIZE: usize = 200;
@@ -31,6 +32,7 @@ pub struct ExtensionManager {
processes: RwLock<HashMap<ExtensionId, ExtensionProcess>>, processes: RwLock<HashMap<ExtensionId, ExtensionProcess>>,
/// Cached availability status (checked once at startup) /// Cached availability status (checked once at startup)
availability: HashMap<ExtensionId, bool>, availability: HashMap<ExtensionId, bool>,
event_bus: RwLock<Option<Arc<EventBus>>>,
} }
impl Default for ExtensionManager { impl Default for ExtensionManager {
@@ -51,6 +53,22 @@ impl ExtensionManager {
Self { Self {
processes: RwLock::new(HashMap::new()), processes: RwLock::new(HashMap::new()),
availability, availability,
event_bus: RwLock::new(None),
}
}
/// Set event bus for ttyd status notifications.
pub async fn set_event_bus(&self, event_bus: Arc<EventBus>) {
*self.event_bus.write().await = Some(event_bus);
}
async fn mark_ttyd_status_dirty(&self, id: ExtensionId) {
if id != ExtensionId::Ttyd {
return;
}
if let Some(ref event_bus) = *self.event_bus.read().await {
event_bus.mark_device_info_dirty();
} }
} }
@@ -65,17 +83,38 @@ impl ExtensionManager {
return ExtensionStatus::Unavailable; return ExtensionStatus::Unavailable;
} }
let processes = self.processes.read().await; let mut processes = self.processes.write().await;
match processes.get(&id) { let exited = {
Some(proc) => { let Some(proc) = processes.get_mut(&id) else {
if let Some(pid) = proc.child.id() { return ExtensionStatus::Stopped;
ExtensionStatus::Running { pid } };
} else {
ExtensionStatus::Stopped match proc.child.try_wait() {
} Ok(Some(status)) => {
tracing::info!("Extension {} exited with status {}", id, status);
true
} }
Ok(None) => {
return match proc.child.id() {
Some(pid) => ExtensionStatus::Running { pid },
None => ExtensionStatus::Stopped, None => ExtensionStatus::Stopped,
};
} }
Err(e) => {
tracing::warn!("Failed to query status for {}: {}", id, e);
return match proc.child.id() {
Some(pid) => ExtensionStatus::Running { pid },
None => ExtensionStatus::Stopped,
};
}
}
};
if exited {
processes.remove(&id);
}
ExtensionStatus::Stopped
} }
/// Start an extension with the given configuration /// Start an extension with the given configuration
@@ -134,6 +173,8 @@ impl ExtensionManager {
let mut processes = self.processes.write().await; let mut processes = self.processes.write().await;
processes.insert(id, ExtensionProcess { child, logs }); processes.insert(id, ExtensionProcess { child, logs });
drop(processes);
self.mark_ttyd_status_dirty(id).await;
Ok(()) Ok(())
} }
@@ -146,6 +187,8 @@ impl ExtensionManager {
if let Err(e) = proc.child.kill().await { if let Err(e) = proc.child.kill().await {
tracing::warn!("Failed to kill {}: {}", id, e); tracing::warn!("Failed to kill {}: {}", id, e);
} }
drop(processes);
self.mark_ttyd_status_dirty(id).await;
} }
Ok(()) Ok(())
} }

View File

@@ -1371,8 +1371,7 @@ mod tests {
// Test keyboard packet (8 bytes data) // Test keyboard packet (8 bytes data)
let data = [0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]; // 'A' key let data = [0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]; // 'A' key
let packet = let packet = Ch9329Backend::build_packet(DEFAULT_ADDR, cmd::SEND_KB_GENERAL_DATA, &data);
Ch9329Backend::build_packet(DEFAULT_ADDR, cmd::SEND_KB_GENERAL_DATA, &data);
assert_eq!(packet[0], 0x57); // Header assert_eq!(packet[0], 0x57); // Header
assert_eq!(packet[1], 0xAB); // Header assert_eq!(packet[1], 0xAB); // Header

View File

@@ -199,7 +199,12 @@ pub fn encode_keyboard_event(event: &KeyboardEvent) -> Vec<u8> {
let modifiers = event.modifiers.to_hid_byte(); let modifiers = event.modifiers.to_hid_byte();
vec![MSG_KEYBOARD, event_type, event.key.to_hid_usage(), modifiers] vec![
MSG_KEYBOARD,
event_type,
event.key.to_hid_usage(),
modifiers,
]
} }
/// Encode a mouse event to binary format (for sending to client if needed) /// Encode a mouse event to binary format (for sending to client if needed)

View File

@@ -576,6 +576,8 @@ async fn main() -> anyhow::Result<()> {
data_dir.clone(), data_dir.clone(),
); );
extensions.set_event_bus(events.clone()).await;
// Start RustDesk service if enabled // Start RustDesk service if enabled
if let Some(ref service) = rustdesk { if let Some(ref service) = rustdesk {
if let Err(e) = service.start().await { if let Err(e) = service.start().await {

View File

@@ -7,9 +7,9 @@ use crate::auth::{SessionStore, UserStore};
use crate::config::ConfigStore; use crate::config::ConfigStore;
use crate::events::{ use crate::events::{
AtxDeviceInfo, AudioDeviceInfo, EventBus, HidDeviceInfo, MsdDeviceInfo, SystemEvent, AtxDeviceInfo, AudioDeviceInfo, EventBus, HidDeviceInfo, MsdDeviceInfo, SystemEvent,
VideoDeviceInfo, TtydDeviceInfo, VideoDeviceInfo,
}; };
use crate::extensions::ExtensionManager; use crate::extensions::{ExtensionId, ExtensionManager};
use crate::hid::HidController; use crate::hid::HidController;
use crate::msd::MsdController; use crate::msd::MsdController;
use crate::otg::OtgService; use crate::otg::OtgService;
@@ -157,12 +157,13 @@ impl AppState {
/// Uses tokio::join! to collect all device info in parallel for better performance. /// Uses tokio::join! to collect all device info in parallel for better performance.
pub async fn get_device_info(&self) -> SystemEvent { pub async fn get_device_info(&self) -> SystemEvent {
// Collect all device info in parallel // Collect all device info in parallel
let (video, hid, msd, atx, audio) = tokio::join!( let (video, hid, msd, atx, audio, ttyd) = tokio::join!(
self.collect_video_info(), self.collect_video_info(),
self.collect_hid_info(), self.collect_hid_info(),
self.collect_msd_info(), self.collect_msd_info(),
self.collect_atx_info(), self.collect_atx_info(),
self.collect_audio_info(), self.collect_audio_info(),
self.collect_ttyd_info(),
); );
SystemEvent::DeviceInfo { SystemEvent::DeviceInfo {
@@ -171,6 +172,7 @@ impl AppState {
msd, msd,
atx, atx,
audio, audio,
ttyd,
} }
} }
@@ -262,4 +264,14 @@ impl AppState {
error: status.error, error: status.error,
}) })
} }
/// Collect ttyd status information
async fn collect_ttyd_info(&self) -> TtydDeviceInfo {
let status = self.extensions.status(ExtensionId::Ttyd).await;
TtydDeviceInfo {
available: self.extensions.check_available(ExtensionId::Ttyd),
running: status.is_running(),
}
}
} }

View File

@@ -196,7 +196,10 @@ fn log_encoding_error(
if throttler.should_log(&key) { if throttler.should_log(&key) {
let suppressed = suppressed_errors.remove(&key).unwrap_or(0); let suppressed = suppressed_errors.remove(&key).unwrap_or(0);
if suppressed > 0 { if suppressed > 0 {
error!("Encoding failed: {} (suppressed {} repeats)", err, suppressed); error!(
"Encoding failed: {} (suppressed {} repeats)",
err, suppressed
);
} else { } else {
error!("Encoding failed: {}", err); error!("Encoding failed: {}", err);
} }

View File

@@ -159,7 +159,9 @@ impl MjpegDecoderKind {
} }
} }
pub(super) fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result<EncoderThreadState> { pub(super) fn build_encoder_state(
config: &SharedVideoPipelineConfig,
) -> Result<EncoderThreadState> {
let registry = EncoderRegistry::global(); let registry = EncoderRegistry::global();
let get_codec_name = let get_codec_name =
@@ -408,8 +410,10 @@ pub(super) fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result<
backend, codec_name backend, codec_name
); );
} }
let encoder = let encoder = VP8Encoder::with_codec(
VP8Encoder::with_codec(VP8Config::low_latency(config.resolution, config.bitrate_kbps()), &codec_name)?; VP8Config::low_latency(config.resolution, config.bitrate_kbps()),
&codec_name,
)?;
info!("Created VP8 encoder: {}", encoder.codec_name()); info!("Created VP8 encoder: {}", encoder.codec_name());
Box::new(VP8EncoderWrapper(encoder)) Box::new(VP8EncoderWrapper(encoder))
} }
@@ -421,8 +425,10 @@ pub(super) fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result<
backend, codec_name backend, codec_name
); );
} }
let encoder = let encoder = VP9Encoder::with_codec(
VP9Encoder::with_codec(VP9Config::low_latency(config.resolution, config.bitrate_kbps()), &codec_name)?; VP9Config::low_latency(config.resolution, config.bitrate_kbps()),
&codec_name,
)?;
info!("Created VP9 encoder: {}", encoder.codec_name()); info!("Created VP9 encoder: {}", encoder.codec_name());
Box::new(VP9EncoderWrapper(encoder)) Box::new(VP9EncoderWrapper(encoder))
} }
@@ -505,7 +511,10 @@ pub(super) fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result<
}) })
} }
fn h264_direct_input_format(codec_name: &str, input_format: PixelFormat) -> Option<H264InputFormat> { fn h264_direct_input_format(
codec_name: &str,
input_format: PixelFormat,
) -> Option<H264InputFormat> {
if codec_name.contains("rkmpp") { if codec_name.contains("rkmpp") {
match input_format { match input_format {
PixelFormat::Yuyv => Some(H264InputFormat::Yuyv422), PixelFormat::Yuyv => Some(H264InputFormat::Yuyv422),
@@ -531,7 +540,10 @@ fn h264_direct_input_format(codec_name: &str, input_format: PixelFormat) -> Opti
} }
} }
fn h265_direct_input_format(codec_name: &str, input_format: PixelFormat) -> Option<H265InputFormat> { fn h265_direct_input_format(
codec_name: &str,
input_format: PixelFormat,
) -> Option<H265InputFormat> {
if codec_name.contains("rkmpp") { if codec_name.contains("rkmpp") {
match input_format { match input_format {
PixelFormat::Yuyv => Some(H265InputFormat::Yuyv422), PixelFormat::Yuyv => Some(H265InputFormat::Yuyv422),

View File

@@ -4,7 +4,7 @@ use axum::{
extract::{Path, Query, State}, extract::{Path, Query, State},
Json, Json,
}; };
use serde::{Deserialize, Serialize}; use serde::Deserialize;
use std::sync::Arc; use std::sync::Arc;
use typeshare::typeshare; use typeshare::typeshare;
@@ -324,27 +324,3 @@ pub async fn update_easytier_config(
Ok(Json(new_config.extensions.easytier.clone())) Ok(Json(new_config.extensions.easytier.clone()))
} }
// ============================================================================
// Ttyd status for console (simplified)
// ============================================================================
/// Simple ttyd status for console view
#[typeshare]
#[derive(Debug, Serialize)]
pub struct TtydStatus {
pub available: bool,
pub running: bool,
}
/// Get ttyd status for console view
/// GET /api/extensions/ttyd/status
pub async fn get_ttyd_status(State(state): State<Arc<AppState>>) -> Json<TtydStatus> {
let mgr = &state.extensions;
let status = mgr.status(ExtensionId::Ttyd).await;
Json(TtydStatus {
available: mgr.check_available(ExtensionId::Ttyd),
running: status.is_running(),
})
}

View File

@@ -196,10 +196,6 @@ pub fn create_router(state: Arc<AppState>) -> Router {
"/extensions/ttyd/config", "/extensions/ttyd/config",
patch(handlers::extensions::update_ttyd_config), patch(handlers::extensions::update_ttyd_config),
) )
.route(
"/extensions/ttyd/status",
get(handlers::extensions::get_ttyd_status),
)
.route( .route(
"/extensions/gostc/config", "/extensions/gostc/config",
patch(handlers::extensions::update_gostc_config), patch(handlers::extensions::update_gostc_config),

View File

@@ -30,7 +30,6 @@ import type {
GostcConfigUpdate, GostcConfigUpdate,
EasytierConfig, EasytierConfig,
EasytierConfigUpdate, EasytierConfigUpdate,
TtydStatus,
} from '@/types/generated' } from '@/types/generated'
import { request } from './request' import { request } from './request'
@@ -236,11 +235,6 @@ export const extensionsApi = {
logs: (id: string, lines = 100) => logs: (id: string, lines = 100) =>
request<ExtensionLogs>(`/extensions/${id}/logs?lines=${lines}`), request<ExtensionLogs>(`/extensions/${id}/logs?lines=${lines}`),
/**
* 获取 ttyd 状态(简化版,用于控制台)
*/
getTtydStatus: () => request<TtydStatus>('/extensions/ttyd/status'),
/** /**
* 更新 ttyd 配置 * 更新 ttyd 配置
*/ */

View File

@@ -118,12 +118,18 @@ export interface AudioDeviceInfo {
error: string | null error: string | null
} }
export interface TtydDeviceInfo {
available: boolean
running: boolean
}
export interface DeviceInfoEvent { export interface DeviceInfoEvent {
video: VideoDeviceInfo video: VideoDeviceInfo
hid: HidDeviceInfo hid: HidDeviceInfo
msd: MsdDeviceInfo | null msd: MsdDeviceInfo | null
atx: AtxDeviceInfo | null atx: AtxDeviceInfo | null
audio: AudioDeviceInfo | null audio: AudioDeviceInfo | null
ttyd: TtydDeviceInfo
} }
export const useSystemStore = defineStore('system', () => { export const useSystemStore = defineStore('system', () => {

View File

@@ -667,12 +667,6 @@ export interface TtydConfigUpdate {
shell?: string; shell?: string;
} }
/** Simple ttyd status for console view */
export interface TtydStatus {
available: boolean;
running: boolean;
}
export interface VideoConfigUpdate { export interface VideoConfigUpdate {
device?: string; device?: string;
format?: string; format?: string;

View File

@@ -11,7 +11,7 @@ import { useHidWebSocket } from '@/composables/useHidWebSocket'
import { useWebRTC } from '@/composables/useWebRTC' import { useWebRTC } from '@/composables/useWebRTC'
import { useVideoSession } from '@/composables/useVideoSession' import { useVideoSession } from '@/composables/useVideoSession'
import { getUnifiedAudio } from '@/composables/useUnifiedAudio' import { getUnifiedAudio } from '@/composables/useUnifiedAudio'
import { streamApi, hidApi, atxApi, extensionsApi, atxConfigApi, authApi } from '@/api' import { streamApi, hidApi, atxApi, atxConfigApi, authApi } from '@/api'
import { CanonicalKey } from '@/types/generated' import { CanonicalKey } from '@/types/generated'
import type { HidKeyboardEvent, HidMouseEvent } from '@/types/hid' import type { HidKeyboardEvent, HidMouseEvent } from '@/types/hid'
import { keyboardEventToCanonicalKey, updateModifierMaskForKey } from '@/lib/keyboardMappings' import { keyboardEventToCanonicalKey, updateModifierMaskForKey } from '@/lib/keyboardMappings'
@@ -162,7 +162,6 @@ const changingPassword = ref(false)
// ttyd (web terminal) state // ttyd (web terminal) state
const ttydStatus = ref<{ available: boolean; running: boolean } | null>(null) const ttydStatus = ref<{ available: boolean; running: boolean } | null>(null)
const showTerminalDialog = ref(false) const showTerminalDialog = ref(false)
let ttydPollInterval: ReturnType<typeof setInterval> | null = null
// Theme // Theme
const isDark = ref(document.documentElement.classList.contains('dark')) const isDark = ref(document.documentElement.classList.contains('dark'))
@@ -965,6 +964,7 @@ function handleDeviceInfo(data: any) {
const prevAudioStreaming = systemStore.audio?.streaming ?? false const prevAudioStreaming = systemStore.audio?.streaming ?? false
const prevAudioDevice = systemStore.audio?.device ?? null const prevAudioDevice = systemStore.audio?.device ?? null
systemStore.updateFromDeviceInfo(data) systemStore.updateFromDeviceInfo(data)
ttydStatus.value = data.ttyd ?? null
const nextAudioStreaming = systemStore.audio?.streaming ?? false const nextAudioStreaming = systemStore.audio?.streaming ?? false
const nextAudioDevice = systemStore.audio?.device ?? null const nextAudioDevice = systemStore.audio?.device ?? null
@@ -1484,14 +1484,6 @@ async function handleChangePassword() {
} }
// ttyd (web terminal) functions // ttyd (web terminal) functions
async function fetchTtydStatus() {
try {
ttydStatus.value = await extensionsApi.getTtydStatus()
} catch {
ttydStatus.value = null
}
}
function openTerminal() { function openTerminal() {
if (!ttydStatus.value?.running) return if (!ttydStatus.value?.running) return
showTerminalDialog.value = true showTerminalDialog.value = true
@@ -2112,10 +2104,6 @@ onMounted(async () => {
document.documentElement.classList.add('dark') document.documentElement.classList.add('dark')
} }
// Fetch ttyd status initially and poll every 10 seconds
fetchTtydStatus()
ttydPollInterval = setInterval(fetchTtydStatus, 10000)
// Note: Video mode is now synced from server via device_info event // Note: Video mode is now synced from server via device_info event
// The handleDeviceInfo function will automatically switch to the server's mode // The handleDeviceInfo function will automatically switch to the server's mode
// localStorage preference is only used when server mode matches // localStorage preference is only used when server mode matches
@@ -2142,12 +2130,6 @@ onUnmounted(() => {
mouseFlushTimer = null mouseFlushTimer = null
} }
// Clear ttyd poll interval
if (ttydPollInterval) {
clearInterval(ttydPollInterval)
ttydPollInterval = null
}
// Clear all timers // Clear all timers
if (retryTimeoutId !== null) { if (retryTimeoutId !== null) {
clearTimeout(retryTimeoutId) clearTimeout(retryTimeoutId)