diff --git a/kvmd/aiogp.py b/kvmd/aiogp.py index 3ee0ea2b..d6fd4a5f 100644 --- a/kvmd/aiogp.py +++ b/kvmd/aiogp.py @@ -24,7 +24,7 @@ import asyncio import threading import dataclasses -import gpiod +#import gpiod from . import aiotools @@ -79,46 +79,8 @@ class AioReader: # pylint: disable=too-many-instance-attributes assert self.__loop pins = sorted(self.__pins) - with gpiod.request_lines( - self.__path, - consumer=self.__consumer, - config={tuple(pins): gpiod.LineSettings(edge_detection=gpiod.line.Edge.BOTH)}, - ) as line_req: + - line_req.wait_edge_events(0.1) - self.__values = { - pin: _DebouncedValue( - initial=bool(value.value), - debounce=self.__pins[pin].debounce, - notifier=self.__notifier, - loop=self.__loop, - ) - for (pin, value) in zip(pins, line_req.get_values(pins)) - } - self.__loop.call_soon_threadsafe(self.__notifier.notify) - - while not self.__stop_event.is_set(): - if line_req.wait_edge_events(1): - new: dict[int, bool] = {} - for event in line_req.read_edge_events(): - (pin, value) = self.__parse_event(event) - new[pin] = value - for (pin, value) in new.items(): - self.__values[pin].set(value) - else: # Timeout - # XXX: Лимит был актуален для 1.6. Надо проверить, поменялось ли это в 2.x. - # Размер буфера ядра - 16 эвентов на линии. При превышении этого числа, - # новые эвенты потеряются. Это не баг, это фича, как мне объяснили в LKML. - # Штош. Будем с этим жить и синхронизировать состояния при таймауте. - for (pin, value) in zip(pins, line_req.get_values(pins)): - self.__values[pin].set(bool(value.value)) # type: ignore - - def __parse_event(self, event: gpiod.EdgeEvent) -> tuple[int, bool]: - if event.event_type == event.Type.RISING_EDGE: - return (event.line_offset, True) - elif event.event_type == event.Type.FALLING_EDGE: - return (event.line_offset, False) - raise RuntimeError(f"Invalid event {event} type: {event.type}") class _DebouncedValue: diff --git a/kvmd/aioproc.py b/kvmd/aioproc.py index 376df004..66c1d43f 100644 --- a/kvmd/aioproc.py +++ b/kvmd/aioproc.py @@ -21,6 +21,7 @@ import os +import platform import signal import asyncio import asyncio.subprocess @@ -38,11 +39,16 @@ async def run_process( env: (dict[str, str] | None)=None, ) -> asyncio.subprocess.Process: # pylint: disable=no-member + if platform.system() != 'Windows': + preexec_fn=os.setpgrp + else: + preexec_fn=None # 或者选择适合 Windows 的其他方式 + return (await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=(asyncio.subprocess.DEVNULL if err_to_null else asyncio.subprocess.STDOUT), - preexec_fn=os.setpgrp, + preexec_fn=preexec_fn, env=env, )) @@ -117,6 +123,6 @@ def rename_process(suffix: str, prefix: str="kvmd") -> None: def settle(name: str, suffix: str, prefix: str="kvmd") -> logging.Logger: logger = get_logger(1) logger.info("Started %s pid=%d", name, os.getpid()) - os.setpgrp() + #os.setpgrp() rename_process(suffix, prefix) return logger diff --git a/kvmd/aiotools.py b/kvmd/aiotools.py index a47c94c6..f2776770 100644 --- a/kvmd/aiotools.py +++ b/kvmd/aiotools.py @@ -27,6 +27,7 @@ import ssl import functools import types import typing +import platform from typing import Callable from typing import Awaitable @@ -56,8 +57,9 @@ def run(coro: Coroutine, final: (Coroutine | None)=None) -> None: raise SystemExit() loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, sigint_handler) - loop.add_signal_handler(signal.SIGTERM, sigterm_handler) + if platform.system() != 'Windows': + loop.add_signal_handler(signal.SIGINT, sigint_handler) + loop.add_signal_handler(signal.SIGTERM, sigterm_handler) main_task = loop.create_task(coro) try: diff --git a/kvmd/apps/__init__.py b/kvmd/apps/__init__.py index d9a2d97a..280f6125 100644 --- a/kvmd/apps/__init__.py +++ b/kvmd/apps/__init__.py @@ -181,12 +181,13 @@ def _init_config(config_path: str, override_options: list[str], **load_flags: bo _patch_raw(raw_config) config = make_config(raw_config, scheme) - if _patch_dynamic(raw_config, config, scheme, **load_flags): - config = make_config(raw_config, scheme) - - return config + except (ConfigError, UnknownPluginError) as ex: raise SystemExit(f"ConfigError: {ex}") + if _patch_dynamic(raw_config, config, scheme, **load_flags): + config = make_config(raw_config, scheme) + + return config def _patch_raw(raw_config: dict) -> None: # pylint: disable=too-many-branches diff --git a/kvmd/apps/kvmd/info/extras.py b/kvmd/apps/kvmd/info/extras.py index 1e69748c..6cae6574 100644 --- a/kvmd/apps/kvmd/info/extras.py +++ b/kvmd/apps/kvmd/info/extras.py @@ -23,6 +23,7 @@ import os import re import asyncio +import sys from typing import AsyncGenerator @@ -51,7 +52,7 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager): sui = sysunit.SystemdUnitInfo() await sui.open() except Exception as ex: - if not os.path.exists("/etc/kvmd/.docker_flag"): + if not os.path.exists("/etc/kvmd/.docker_flag") or not sys.platform.startswith('linux'): get_logger(0).error("Can't open systemd bus to get extras state: %s", tools.efmt(ex)) sui = None try: diff --git a/kvmd/apps/kvmd/info/hw.py b/kvmd/apps/kvmd/info/hw.py index 3c444760..db91e586 100644 --- a/kvmd/apps/kvmd/info/hw.py +++ b/kvmd/apps/kvmd/info/hw.py @@ -169,7 +169,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): + (st.steal + st.guest) / total * 100 ) except Exception as ex: - get_logger(0).error("Can't get CPU percent: %s", ex) + #get_logger(0).error("Can't get CPU percent: %s", ex) return None async def __get_mem(self) -> dict: @@ -218,7 +218,7 @@ class HwInfoSubmanager(BaseInfoSubmanager): async def __parse_vcgencmd(self, arg: str, parser: Callable[[str], _RetvalT]) -> (_RetvalT | None): cmd = [*self.__vcgencmd_cmd, arg] try: - text = (await aioproc.read_process(cmd, err_to_null=True))[1] + text = "throttled=0x0" except Exception: get_logger(0).exception("Error while executing: %s", tools.cmdfmt(cmd)) return None diff --git a/kvmd/apps/kvmd/info/system.py b/kvmd/apps/kvmd/info/system.py index d4a450de..de21b824 100644 --- a/kvmd/apps/kvmd/info/system.py +++ b/kvmd/apps/kvmd/info/system.py @@ -76,12 +76,14 @@ class SystemInfoSubmanager(BaseInfoSubmanager): except Exception: get_logger(0).exception("Can't get streamer info") else: - try: - for line in features_text.split("\n"): - (status, name) = map(str.strip, line.split(" ")) - features[name] = (status == "+") - except Exception: - get_logger(0).exception("Can't parse streamer features") + #try: + # print(features_text) + # for line in features_text.split("\n"): + # (status, name) = map(str.strip, line.split(" ")) + # features[name] = (status == "+") + #except Exception: + # get_logger(0).exception("Can't parse streamer features") + pass return { "app": os.path.basename(path), "version": version, diff --git a/kvmd/apps/kvmd/ocr.py b/kvmd/apps/kvmd/ocr.py index 367c0c80..13710d0c 100644 --- a/kvmd/apps/kvmd/ocr.py +++ b/kvmd/apps/kvmd/ocr.py @@ -78,7 +78,7 @@ def _load_libtesseract() -> (ctypes.CDLL | None): setattr(func, "argtypes", argtypes) return lib except Exception as ex: - warnings.warn(f"Can't load libtesseract: {ex}", RuntimeWarning) + #warnings.warn(f"Can't load libtesseract: {ex}", RuntimeWarning) return None diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 2efc4e1d..40427114 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -38,8 +38,10 @@ from aiohttp.web import StreamResponse import os from aiohttp import ClientConnectionError +from aiohttp import ClientPayloadError from aiohttp import ClientSession from aiohttp import UnixConnector +from aiohttp import ClientTimeout from urllib.parse import urlencode @@ -329,9 +331,11 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins socket_path = self.__streamer.get_path() query_string = urlencode(request.query) headers = request.headers.copy() + time_out = ClientTimeout(total=10) try: - async with ClientSession(connector=UnixConnector(path=socket_path)) as session: - backend_url = f'http://localhost/stream?{query_string}' if query_string else 'http://localhost/stream' + #async with ClientSession(connector=UnixConnector(path=socket_path)) as session: + async with ClientSession() as session: + backend_url = f'http://localhost:8000/stream?{query_string}' if query_string else 'http://localhost:8000/stream' async with session.get(backend_url, headers=headers) as resp: response = StreamResponse(status=resp.status, reason=resp.reason, headers=resp.headers) await response.prepare(request) @@ -341,7 +345,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins break await response.write(chunk) return response - except ClientConnectionError: + except (ClientConnectionError, ClientPayloadError, ConnectionResetError): return Response(status=500, text="Client connection was closed") diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py index 3422a9bf..0f117e4a 100644 --- a/kvmd/apps/kvmd/streamer.py +++ b/kvmd/apps/kvmd/streamer.py @@ -20,6 +20,7 @@ # ========================================================================== # +import platform import signal import asyncio import asyncio.subprocess @@ -302,7 +303,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__notifier.notify(self.__ST_STREAMER) get_logger(0).info("Installing SIGUSR2 streamer handler ...") - asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) + if platform.system() != 'Windows': + asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) prev: dict = {} while True: @@ -338,7 +340,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes session = self.__ensure_client_session() try: return (await session.get_state()) - except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError,TimeoutError,asyncio.CancelledError,asyncio.TimeoutError,asyncio.CancelledError): pass except Exception: get_logger().exception("Invalid streamer response from /state") diff --git a/kvmd/clients/__init__.py b/kvmd/clients/__init__.py index f0645fd2..e06bda14 100644 --- a/kvmd/clients/__init__.py +++ b/kvmd/clients/__init__.py @@ -75,12 +75,12 @@ class BaseHttpClient: def _make_http_session(self, headers: dict[str, str] | None = None) -> aiohttp.ClientSession: connector = None #这里临时使用 socket ,后期考虑是否使用 http 方式 - use_unix_socket = True + use_unix_socket = False if use_unix_socket: connector = aiohttp.UnixConnector(path=self.__unix_path) base_url = "http://localhost:0" # 继续使用 Unix 域套接字 else: - base_url = "http://127.0.0.1:8001" # 使用指定的 IP 和端口 + base_url = "http://127.0.0.1:8000" # 使用指定的 IP 和端口 #print("base_url:", base_url) return aiohttp.ClientSession( diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index 5369892e..5ea6af98 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -32,7 +32,7 @@ from typing import Generator from typing import AsyncGenerator import aiohttp -import ustreamer +#import ustreamer from PIL import Image as PilImage diff --git a/kvmd/keyboard/printer.py b/kvmd/keyboard/printer.py index efee6d44..07e75e75 100644 --- a/kvmd/keyboard/printer.py +++ b/kvmd/keyboard/printer.py @@ -30,29 +30,8 @@ from .mappings import WebModifiers # ===== -def _load_libxkbcommon() -> ctypes.CDLL: - path = ctypes.util.find_library("xkbcommon") - if not path: - raise RuntimeError("Where is libxkbcommon?") - assert path - lib = ctypes.CDLL(path) - for (name, restype, argtypes) in [ - ("xkb_utf32_to_keysym", ctypes.c_uint32, [ctypes.c_uint32]), - ]: - func = getattr(lib, name) - if not func: - raise RuntimeError(f"Where is libc.{name}?") - setattr(func, "restype", restype) - setattr(func, "argtypes", argtypes) - return lib -_libxkbcommon = _load_libxkbcommon() - - -def _ch_to_keysym(ch: str) -> int: - assert len(ch) == 1 - return _libxkbcommon.xkb_utf32_to_keysym(ord(ch)) # ===== @@ -84,10 +63,6 @@ def text_to_web_keys( # pylint: disable=too-many-branches ch = "--" if not ch.isprintable(): continue - try: - keys = symmap[_ch_to_keysym(ch)] - except Exception: - continue for (modifiers, key) in keys.items(): if modifiers & SymmapModifiers.CTRL: diff --git a/kvmd/libc.py b/kvmd/libc.py index 53b25733..f004acde 100644 --- a/kvmd/libc.py +++ b/kvmd/libc.py @@ -34,34 +34,4 @@ from ctypes import c_void_p # ===== def _load_libc() -> ctypes.CDLL: - path = ctypes.util.find_library("c") - if not path: - raise RuntimeError("Where is libc?") - assert path - lib = ctypes.CDLL(path) - for (name, restype, argtypes) in [ - ("inotify_init", c_int, []), - ("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]), - ("inotify_rm_watch", c_int, [c_int, c_uint32]), - ("renameat2", c_int, [c_int, c_char_p, c_int, c_char_p, c_uint]), - ("free", c_int, [c_void_p]), - ]: - func = getattr(lib, name) - if not func: - raise RuntimeError(f"Where is libc.{name}?") - setattr(func, "restype", restype) - setattr(func, "argtypes", argtypes) - return lib - - -_libc = _load_libc() - - -# ===== -get_errno = ctypes.get_errno - -inotify_init = _libc.inotify_init -inotify_add_watch = _libc.inotify_add_watch -inotify_rm_watch = _libc.inotify_rm_watch -renameat2 = _libc.renameat2 -free = _libc.free + pass diff --git a/kvmd/plugins/__init__.py b/kvmd/plugins/__init__.py index ce2567f1..17bd4622 100644 --- a/kvmd/plugins/__init__.py +++ b/kvmd/plugins/__init__.py @@ -52,8 +52,8 @@ def get_plugin_class(sub: str, name: str) -> type[BasePlugin]: assert name if name.startswith("_"): raise UnknownPluginError(f"Unknown plugin '{sub}/{name}'") - try: - module = importlib.import_module(f"kvmd.plugins.{sub}.{name}") - except ModuleNotFoundError: - raise UnknownPluginError(f"Unknown plugin '{sub}/{name}'") + #try: + module = importlib.import_module(f"kvmd.plugins.{sub}.{name}") + #except ModuleNotFoundError: + #raise UnknownPluginError(f"Unknown plugin '{sub}/{name}'") return getattr(module, "Plugin") diff --git a/kvmd/plugins/hid/ch9329/__init__.py b/kvmd/plugins/hid/ch9329/__init__.py index 1b235090..05459fa2 100644 --- a/kvmd/plugins/hid/ch9329/__init__.py +++ b/kvmd/plugins/hid/ch9329/__init__.py @@ -200,6 +200,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst while not self.__stop_event.is_set(): try: self.__hid_loop() + time.sleep(1) except Exception: logger.exception("Unexpected error in the run loop") time.sleep(1) @@ -222,6 +223,10 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst self.__process_cmd(conn, b"") else: self.__process_cmd(conn, cmd) + except KeyboardInterrupt: + get_logger(0).info("KeyboardInterrupt received, exiting HID loop.") + self.clear_events() + break except Exception: self.clear_events() get_logger(0).exception("Unexpected error in the HID loop") diff --git a/kvmd/plugins/ugpio/gpio.py b/kvmd/plugins/ugpio/gpio.py index 6cda826b..311acaf1 100644 --- a/kvmd/plugins/ugpio/gpio.py +++ b/kvmd/plugins/ugpio/gpio.py @@ -23,7 +23,7 @@ from typing import Callable from typing import Any -import gpiod +#import gpiod from ... import aiotools from ... import aiogp diff --git a/kvmd_data/etc/kvmd/override.yaml b/kvmd_data/etc/kvmd/override.yaml index 4ee138dd..40f04548 100644 --- a/kvmd_data/etc/kvmd/override.yaml +++ b/kvmd_data/etc/kvmd/override.yaml @@ -26,7 +26,7 @@ kvmd: hid: type: ch9329 - device: /dev/ttyUSB0 + device: COM7 speed: 115200 read_timeout: 0.3 @@ -53,7 +53,7 @@ kvmd: streamer: resolution: - default: 1920x1080 + default: 1280x720 forever: true @@ -64,32 +64,30 @@ kvmd: h264_bitrate: default: 8000 - cmd: - - "kvmd_data/usr/bin/ustreamer" - - "--device=/dev/video0" - - "--persistent" - - "--format=mjpeg" - - "--resolution={resolution}" - - "--desired-fps={desired_fps}" - - "--drop-same-frames=30" - - "--last-as-blank=0" - - "--unix={unix}" - - "--unix-rm" - - "--unix-mode=777" - - "--exit-on-parent-death" - - "--notify-parent" - - "--no-log-colors" - - "--jpeg-sink=kvmd::ustreamer::jpeg" - - "--jpeg-sink-mode=0660" - - "--slowdown" + pre_start_cmd: kvmd_data/win/true.exe + post_stop_cmd: kvmd_data/win/true.exe - unix: kvmd_data/run/kvmd/ustreamer.sock + cmd: + - "C:/Users/mofen/miniconda3/python.exe" + - "ustreamer-win/ustreamer-win.py" + - "--device=0" + - "--resolution={resolution}" + - "--fps={desired_fps}" + - "--quality=100" + + + unix: http://localhost:8000 ipmi: auth: file: kvmd_data/etc/kvmd/ipmipasswd +pst: + remount_cmd: + - "kvmd_data/win/true.exe" + + vnc: keymap: kvmd_data/usr/share/kvmd/keymaps/en-us mouse_output: usb @@ -110,10 +108,20 @@ vnc: otgnet: commands: + pre_start_cmd: + - "kvmd_data/win/true.exe" + post_stop_cmd: + - "kvmd_data/win/true.exe" post_start_cmd: - - "/bin/true" + - "kvmd_data/win/true.exe" pre_stop_cmd: - - "/bin/true" + - "kvmd_data/win/true.exe" + iface: + ip_cmd: + - "kvmd_data/win/true.exe" + firewall: + iptables_cmd: + - "kvmd_data/win/true.exe" nginx: http: @@ -123,4 +131,4 @@ nginx: janus: cmd: - - "/bin/true" \ No newline at end of file + - "kvmd_data/win/true.exe" \ No newline at end of file diff --git a/kvmd_data/usr/share/kvmd/web/kvm/index.html b/kvmd_data/usr/share/kvmd/web/kvm/index.html index f5799301..774a64bb 100644 --- a/kvmd_data/usr/share/kvmd/web/kvm/index.html +++ b/kvmd_data/usr/share/kvmd/web/kvm/index.html @@ -899,7 +899,7 @@
-
+
diff --git a/kvmd_data/usr/share/kvmd/web/kvm/window-stream.pug b/kvmd_data/usr/share/kvmd/web/kvm/window-stream.pug index 918260ab..ee75646c 100644 --- a/kvmd_data/usr/share/kvmd/web/kvm/window-stream.pug +++ b/kvmd_data/usr/share/kvmd/web/kvm/window-stream.pug @@ -13,7 +13,7 @@ div(id="stream-window" class="window window-resizable") div(id="stream-info") button(class="window-button-exit-full-tab") ▼ - div(id="stream-box" class="stream-box-offline") + div(id="stream-box" class="stream-box-online") img(id="stream-image" src=`${png_dir}/blank-stream.png`) video(id="stream-video" class="hidden" disablePictureInPicture="true" autoplay playsinline muted) div(id="stream-fullscreen-active") diff --git a/kvmd_data/win/true.exe b/kvmd_data/win/true.exe new file mode 100644 index 00000000..fc7475a8 Binary files /dev/null and b/kvmd_data/win/true.exe differ diff --git a/tools/list_devices.py b/tools/list_devices.py new file mode 100644 index 00000000..bb455dc6 --- /dev/null +++ b/tools/list_devices.py @@ -0,0 +1,46 @@ +import cv2 +import serial.tools.list_ports +import os +import sys + +# 隐藏 OpenCV 的错误输出 +def suppress_opencv_warnings(): + #cv2.utils.logging.setLogLevel(cv2.utils.logging.LOG_LEVEL_SILENT) + pass + +def list_video_devices(): + """列出可用的视频设备及其名称""" + video_devices = [] + for i in range(10): # 假设最多有10个视频设备 + cap = cv2.VideoCapture(i) + if cap.isOpened(): + device_name = cap.getBackendName() # 获取设备名称 + video_devices.append((i, device_name)) + cap.release() + return video_devices + +def list_serial_ports(): + """列出可用的串口设备""" + return [port.device for port in serial.tools.list_ports.comports()] + +def main(): + suppress_opencv_warnings() # 调用函数以隐藏 OpenCV 的错误输出 + + print("可用的视频设备索引及名称:") + video_devices = list_video_devices() + if video_devices: + for index, name in video_devices: + print(f"视频设备索引: {index}, 名称: {name}") + else: + print("未找到视频设备。") + + print("\n可用的串口设备:") + serial_ports = list_serial_ports() + if serial_ports: + for port in serial_ports: + print(f"串口设备: {port}") + else: + print("未找到串口设备。") + +if __name__ == "__main__": + main() diff --git a/ustreamer-win/mjpeg_stream.py b/ustreamer-win/mjpeg_stream.py new file mode 100644 index 00000000..b9b13529 --- /dev/null +++ b/ustreamer-win/mjpeg_stream.py @@ -0,0 +1,233 @@ +import asyncio +import threading +import time +import json +from collections import deque +from typing import List, Optional, Tuple, Union, Dict, Any + +import aiohttp +import cv2 +import logging +import numpy as np +from aiohttp import MultipartWriter, web +from aiohttp.web_runner import GracefulExit + +class MjpegStream: + """MJPEG video stream class for handling video frames and providing HTTP streaming service""" + + def __init__( + self, + name: str, + size: Optional[Tuple[int, int]] = None, + quality: int = 50, + fps: int = 30, + host: str = "localhost", + port: int = 8000, + device_name: str = "Unknown Camera", + log_requests: bool = True + ) -> None: + """ + Initialize MJPEG stream + + Args: + name: Stream name + size: Video size (width, height) + quality: JPEG compression quality (1-100) + fps: Target frame rate + host: Server host address + port: Server port + device_name: Camera device name + log_requests: Whether to log stream requests + """ + self.name = name.lower().replace(" ", "_") + self.size = size + self.quality = max(1, min(quality, 100)) + self.fps = fps + self._host = host + self._port = port + self._device_name = device_name + self.log_requests = log_requests + + # Video frame and synchronization + self._frame = np.zeros((320, 240, 1), dtype=np.uint8) + self._lock = asyncio.Lock() + self._byte_frame_window = deque(maxlen=30) + self._bandwidth_last_modified_time = time.time() + self._is_online = True + self._last_frame_time = time.time() + + + # 设置日志级别为ERROR,以隐藏HTTP请求日志 + if not self.log_requests: + logging.getLogger('aiohttp.access').setLevel(logging.ERROR) + + # Server setup + self._app = web.Application() + self._app.router.add_route("GET", f"/{self.name}", self._stream_handler) + self._app.router.add_route("GET", "/state", self._state_handler) + self._app.router.add_route("GET", "/", self._index_handler) + self._app.is_running = False + + def set_frame(self, frame: np.ndarray) -> None: + """Set the current video frame""" + self._frame = frame + self._last_frame_time = time.time() + self._is_online = True + + def get_bandwidth(self) -> float: + """Get current bandwidth usage (bytes/second)""" + if time.time() - self._bandwidth_last_modified_time >= 1: + self._byte_frame_window.clear() + return sum(self._byte_frame_window) + + async def _process_frame(self) -> Tuple[np.ndarray, Dict[str, str]]: + """Process video frame (resize and JPEG encode)""" + frame = cv2.resize( + self._frame, self.size or (self._frame.shape[1], self._frame.shape[0]) + ) + success, encoded = cv2.imencode( + ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality] + ) + if not success: + raise ValueError("Error encoding frame") + + self._byte_frame_window.append(len(encoded.tobytes())) + self._bandwidth_last_modified_time = time.time() + + # Add KVMD-compatible header information + headers = { + "X-UStreamer-Online": str(self._is_online).lower(), + "X-UStreamer-Width": str(frame.shape[1]), + "X-UStreamer-Height": str(frame.shape[0]), + "X-UStreamer-Name": self._device_name, + "X-Timestamp": str(int(time.time() * 1000)), + "Cache-Control": "no-store", + "Pragma": "no-cache", + "Expires": "0", + } + + return encoded, headers + + async def _stream_handler(self, request: web.Request) -> web.StreamResponse: + """Handle MJPEG stream requests""" + response = web.StreamResponse( + status=200, + reason="OK", + headers={"Content-Type": "multipart/x-mixed-replace;boundary=frame"} + ) + await response.prepare(request) + + if self.log_requests: + print(f"Stream request received: {request.path}") + + + while True: + await asyncio.sleep(1 / self.fps) + + # Check if the device is online + if time.time() - self._last_frame_time > 5: + self._is_online = False + + async with self._lock: + frame, headers = await self._process_frame() + + with MultipartWriter("image/jpeg", boundary="frame") as mpwriter: + part = mpwriter.append(frame.tobytes(), {"Content-Type": "image/jpeg"}) + for key, value in headers.items(): + part.headers[key] = value + try: + await mpwriter.write(response, close_boundary=False) + except (ConnectionResetError, ConnectionAbortedError): + return web.Response(status=499) + await response.write(b"\r\n") + + async def _state_handler(self, request: web.Request) -> web.Response: + """Handle /state requests and return device status information""" + state = { + "result": { + "instance_id": "", + "encoder": { + "type": "CPU", + "quality": self.quality + }, + "h264": { + "bitrate": 4875, + "gop": 60, + "online": self._is_online, + "fps": self.fps + }, + "sinks": { + "jpeg": { + "has_clients": False + }, + "h264": { + "has_clients": False + } + }, + "source": { + "resolution": { + "width": self.size[0] if self.size else self._frame.shape[1], + "height": self.size[1] if self.size else self._frame.shape[0] + }, + "online": self._is_online, + "desired_fps": self.fps, + "captured_fps": 0 # You can update this with actual captured fps if needed + }, + "stream": { + "queued_fps": 2, # Placeholder value, update as needed + "clients": 1, # Placeholder value, update as needed + "clients_stat": { + "70bf63a507f71e47": { + "fps": 2, # Placeholder value, update as needed + "extra_headers": False, + "advance_headers": True, + "dual_final_frames": False, + "zero_data": False, + "key": "tIR9TtuedKIzDYZa" # Placeholder key, update as needed + } + } + } + } + } + return web.Response( + text=json.dumps(state), + content_type="application/json" + ) + + async def _index_handler(self, _: web.Request) -> web.Response: + """Handle root path requests and display available streams""" + html = f""" +

Available Video Streams:

+ + """ + return web.Response(text=html, content_type="text/html") + + def start(self) -> None: + """Start the stream server""" + if not self._app.is_running: + threading.Thread(target=self._run_server, daemon=True).start() + self._app.is_running = True + print(f"\nVideo stream URL: http://{self._host}:{self._port}/{self.name}") + else: + print("\nServer is already running\n") + + def stop(self) -> None: + """Stop the stream server""" + if self._app.is_running: + self._app.is_running = False + print("\nStopping server...\n") + raise GracefulExit() + print("\nServer is not running\n") + + def _run_server(self) -> None: + """Run the server in a new thread""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + runner = web.AppRunner(self._app) + loop.run_until_complete(runner.setup()) + site = web.TCPSite(runner, self._host, self._port) + loop.run_until_complete(site.start()) + loop.run_forever() \ No newline at end of file diff --git a/ustreamer-win/ustreamer-win.py b/ustreamer-win/ustreamer-win.py new file mode 100644 index 00000000..2c894a54 --- /dev/null +++ b/ustreamer-win/ustreamer-win.py @@ -0,0 +1,197 @@ +import argparse +import cv2 +import logging +import platform +from mjpeg_stream import MjpegStream + +def configure_logging(): + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' + ) + return logging.getLogger(__name__) + +def get_windows_cameras(logger): + """Retrieve available camera devices on Windows system""" + from win32com.client import Dispatch + devices = [] + try: + wmi = Dispatch("WbemScripting.SWbemLocator") + service = wmi.ConnectServer(".", "root\\cimv2") + items = service.ExecQuery("SELECT * FROM Win32_PnPEntity WHERE (PNPClass = 'Image' OR PNPClass = 'Camera')") + + for item in items: + devices.append({ + 'name': item.Name, + 'device_id': item.DeviceID + }) + logger.info(f"Found camera device: {item.Name}") + except Exception as e: + logger.error(f"Error enumerating camera devices: {str(e)}") + return devices + +def test_camera(index, logger): + """Test if the camera is available""" + try: + cap = cv2.VideoCapture(index, cv2.CAP_DSHOW if platform.system() == "Windows" else cv2.CAP_ANY) + if cap.isOpened(): + ret, _ = cap.read() + cap.release() + return ret + except Exception as e: + logger.debug(f"Error testing camera {index}: {str(e)}") + return False + +def find_camera_by_name(camera_name, logger): + """Find device index by camera name""" + if platform.system() != "Windows": + logger.warning("Finding camera by name is only supported on Windows") + return None + + devices = get_windows_cameras(logger) + for device in devices: + if camera_name.lower() in device['name'].lower(): + # Try to find an available index + for i in range(5): # Usually no more than 5 devices + if test_camera(i, logger): + logger.info(f"Found matching camera '{device['name']}' at index {i}") + return i + return None + +def get_first_available_camera(logger): + """Get the first available camera""" + for i in range(5): + if test_camera(i, logger): + return i + return None + +def parse_arguments(): + parser = argparse.ArgumentParser(description='MJPEG Stream Demonstration') + device_group = parser.add_mutually_exclusive_group() + device_group.add_argument('--device', type=int, help='Camera device index') + device_group.add_argument('--device-name', type=str, help='Camera device name (only supported on Windows)') + parser.add_argument('--resolution', type=str, default='640x480', help='Video resolution (e.g., 640x480)') + parser.add_argument('--quality', type=int, default=100, help='JPEG quality (1-100)') + parser.add_argument('--fps', type=int, default=30, help='Target FPS') + parser.add_argument('--host', type=str, default='localhost', help='Server address') + parser.add_argument('--port', type=int, default=8000, help='Server port') + args = parser.parse_args() + + # Validate arguments + if args.quality < 1 or args.quality > 100: + raise ValueError("Quality must be between 1 and 100.") + if args.fps <= 0: + raise ValueError("FPS must be greater than 0.") + + # Parse resolution + try: + width, height = map(int, args.resolution.split('x')) + except ValueError: + raise ValueError("Resolution must be in the format WIDTHxHEIGHT (e.g., 640x480).") + + args.width = width + args.height = height + + return args + +def main(): + logger = configure_logging() + args = parse_arguments() + + # Determine which camera device to use + device_index = None + + if args.device_name: + if platform.system() != "Windows": + logger.error("Specifying camera by name is only supported on Windows") + return + device_index = find_camera_by_name(args.device_name, logger) + if device_index is None: + logger.error(f"No available camera found with a name containing '{args.device_name}'") + return + elif args.device is not None: + if test_camera(args.device, logger): + device_index = args.device + else: + logger.warning(f"The specified device index {args.device} is not available") + + if device_index is None: + device_index = get_first_available_camera(logger) + if device_index is None: + logger.error("No available camera devices were found") + return + logger.info(f"Using the first available camera device (index: {device_index})") + + # Initialize the camera + try: + cap = cv2.VideoCapture(device_index, cv2.CAP_DSHOW if platform.system() == "Windows" else cv2.CAP_ANY) + + if not cap.isOpened(): + logger.error(f"Unable to open camera {device_index}") + return + + # Set camera parameters + cap.set(cv2.CAP_PROP_FRAME_WIDTH, args.width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, args.height) + + # Verify camera settings + actual_width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) + actual_height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) + if actual_width != args.width or actual_height != args.height: + logger.warning(f"Actual resolution ({actual_width}x{actual_height}) does not match requested resolution ({args.width}x{args.height})") + + # Test if we can read frames + ret, _ = cap.read() + if not ret: + logger.error("Unable to read video frames from the camera") + cap.release() + return + + except Exception as e: + logger.error(f"Error initializing the camera: {str(e)}") + if 'cap' in locals(): + cap.release() + return + + # Create and start the video stream + try: + stream = MjpegStream( + name="stream", + size=(int(actual_width), int(actual_height)), # Use actual resolution + quality=args.quality, + fps=args.fps, + host=args.host, + port=args.port, + device_name=args.device_name or f"Camera {device_index}", # Add device name + log_requests=False # 设置为False以隐藏HTTP请求日志 + ) + stream.start() + logger.info(f"Video stream started: http://{args.host}:{args.port}/stream") + + while True: + ret, frame = cap.read() + if not ret: + logger.error("Unable to read video frames") + break + + stream.set_frame(frame) + + except KeyboardInterrupt: + logger.info("User interrupt") + except Exception as e: + logger.error(f"An error occurred: {str(e)}") + finally: + logger.info("Cleaning up resources...") + try: + stream.stop() + except Exception as e: + logger.error(f"Error stopping the video stream: {str(e)}") + try: + cap.release() + except Exception as e: + logger.error(f"Error releasing the camera: {str(e)}") + cv2.destroyAllWindows() + logger.info("Program has exited") + +if __name__ == "__main__": + main() \ No newline at end of file