mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-13 01:30:31 +08:00
refactoring
This commit is contained in:
parent
17412be3fe
commit
686d6f7c2c
@ -104,78 +104,69 @@ def _check_config(config: Section) -> None:
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def _create_serial(gadget_path: str, config_path: str) -> None:
|
class _GadgetConfig:
|
||||||
func_path = join(gadget_path, "functions/acm.usb0")
|
def __init__(self, gadget_path: str, config_path: str) -> None:
|
||||||
_mkdir(func_path)
|
self.__gadget_path = gadget_path
|
||||||
_symlink(func_path, join(config_path, "acm.usb0"))
|
self.__config_path = config_path
|
||||||
|
|
||||||
|
def create_serial(self) -> None:
|
||||||
|
func_path = join(self.__gadget_path, "functions/acm.usb0")
|
||||||
|
_mkdir(func_path)
|
||||||
|
_symlink(func_path, join(self.__config_path, "acm.usb0"))
|
||||||
|
|
||||||
def _create_ethernet(gadget_path: str, config_path: str, driver: str, host_mac: str, kvm_mac: str) -> None:
|
def create_ethernet(self, driver: str, host_mac: str, kvm_mac: str) -> None:
|
||||||
if host_mac and kvm_mac and host_mac == kvm_mac:
|
if host_mac and kvm_mac and host_mac == kvm_mac:
|
||||||
raise RuntimeError("Ethernet host_mac should not be equal to kvm_mac")
|
raise RuntimeError("Ethernet host_mac should not be equal to kvm_mac")
|
||||||
real_driver = driver
|
real_driver = driver
|
||||||
if driver == "rndis5":
|
if driver == "rndis5":
|
||||||
real_driver = "rndis"
|
real_driver = "rndis"
|
||||||
func_path = join(gadget_path, f"functions/{real_driver}.usb0")
|
func_path = join(self.__gadget_path, f"functions/{real_driver}.usb0")
|
||||||
_mkdir(func_path)
|
_mkdir(func_path)
|
||||||
if host_mac:
|
if host_mac:
|
||||||
_write(join(func_path, "host_addr"), host_mac)
|
_write(join(func_path, "host_addr"), host_mac)
|
||||||
if kvm_mac:
|
if kvm_mac:
|
||||||
_write(join(func_path, "dev_addr"), kvm_mac)
|
_write(join(func_path, "dev_addr"), kvm_mac)
|
||||||
if driver in ["ncm", "rndis"]:
|
if driver in ["ncm", "rndis"]:
|
||||||
_write(join(gadget_path, "os_desc/use"), "1")
|
_write(join(self.__gadget_path, "os_desc/use"), "1")
|
||||||
_write(join(gadget_path, "os_desc/b_vendor_code"), "0xCD")
|
_write(join(self.__gadget_path, "os_desc/b_vendor_code"), "0xCD")
|
||||||
_write(join(gadget_path, "os_desc/qw_sign"), "MSFT100")
|
_write(join(self.__gadget_path, "os_desc/qw_sign"), "MSFT100")
|
||||||
if driver == "ncm":
|
if driver == "ncm":
|
||||||
_write(join(func_path, "os_desc/interface.ncm/compatible_id"), "WINNCM")
|
_write(join(func_path, "os_desc/interface.ncm/compatible_id"), "WINNCM")
|
||||||
elif driver == "rndis":
|
elif driver == "rndis":
|
||||||
# On Windows 7 and later, the RNDIS 5.1 driver would be used by default,
|
# On Windows 7 and later, the RNDIS 5.1 driver would be used by default,
|
||||||
# but it does not work very well. The RNDIS 6.0 driver works better.
|
# but it does not work very well. The RNDIS 6.0 driver works better.
|
||||||
# In order to get this driver to load automatically, we have to use
|
# In order to get this driver to load automatically, we have to use
|
||||||
# a Microsoft-specific extension of USB.
|
# a Microsoft-specific extension of USB.
|
||||||
_write(join(func_path, "os_desc/interface.rndis/compatible_id"), "RNDIS")
|
_write(join(func_path, "os_desc/interface.rndis/compatible_id"), "RNDIS")
|
||||||
_write(join(func_path, "os_desc/interface.rndis/sub_compatible_id"), "5162001")
|
_write(join(func_path, "os_desc/interface.rndis/sub_compatible_id"), "5162001")
|
||||||
_symlink(config_path, join(gadget_path, "os_desc/c.1"))
|
_symlink(self.__config_path, join(self.__gadget_path, "os_desc/c.1"))
|
||||||
_symlink(func_path, join(config_path, f"{real_driver}.usb0"))
|
_symlink(func_path, join(self.__config_path, f"{real_driver}.usb0"))
|
||||||
|
|
||||||
|
def create_hid(self, instance: int, remote_wakeup: bool, hid: Hid) -> None:
|
||||||
|
func_path = join(self.__gadget_path, f"functions/hid.usb{instance}")
|
||||||
|
_mkdir(func_path)
|
||||||
|
_write(join(func_path, "no_out_endpoint"), "1", optional=True)
|
||||||
|
if remote_wakeup:
|
||||||
|
_write(join(func_path, "wakeup_on_write"), "1", optional=True)
|
||||||
|
_write(join(func_path, "protocol"), str(hid.protocol))
|
||||||
|
_write(join(func_path, "subclass"), str(hid.subclass))
|
||||||
|
_write(join(func_path, "report_length"), str(hid.report_length))
|
||||||
|
_write_bytes(join(func_path, "report_desc"), hid.report_descriptor)
|
||||||
|
_symlink(func_path, join(self.__config_path, f"hid.usb{instance}"))
|
||||||
|
|
||||||
def _create_hid(gadget_path: str, config_path: str, instance: int, remote_wakeup: bool, hid: Hid) -> None:
|
def create_msd(self, instance: int, user: str, stall: bool, cdrom: bool, rw: bool, removable: bool, fua: bool) -> None:
|
||||||
func_path = join(gadget_path, f"functions/hid.usb{instance}")
|
func_path = join(self.__gadget_path, f"functions/mass_storage.usb{instance}")
|
||||||
_mkdir(func_path)
|
_mkdir(func_path)
|
||||||
_write(join(func_path, "no_out_endpoint"), "1", optional=True)
|
_write(join(func_path, "stall"), str(int(stall)))
|
||||||
if remote_wakeup:
|
_write(join(func_path, "lun.0/cdrom"), str(int(cdrom)))
|
||||||
_write(join(func_path, "wakeup_on_write"), "1", optional=True)
|
_write(join(func_path, "lun.0/ro"), str(int(not rw)))
|
||||||
_write(join(func_path, "protocol"), str(hid.protocol))
|
_write(join(func_path, "lun.0/removable"), str(int(removable)))
|
||||||
_write(join(func_path, "subclass"), str(hid.subclass))
|
_write(join(func_path, "lun.0/nofua"), str(int(not fua)))
|
||||||
_write(join(func_path, "report_length"), str(hid.report_length))
|
if user != "root":
|
||||||
_write_bytes(join(func_path, "report_desc"), hid.report_descriptor)
|
_chown(join(func_path, "lun.0/cdrom"), user)
|
||||||
_symlink(func_path, join(config_path, f"hid.usb{instance}"))
|
_chown(join(func_path, "lun.0/ro"), user)
|
||||||
|
_chown(join(func_path, "lun.0/file"), user)
|
||||||
|
_symlink(func_path, join(self.__config_path, f"mass_storage.usb{instance}"))
|
||||||
def _create_msd(
|
|
||||||
gadget_path: str,
|
|
||||||
config_path: str,
|
|
||||||
instance: int,
|
|
||||||
user: str,
|
|
||||||
stall: bool,
|
|
||||||
cdrom: bool,
|
|
||||||
rw: bool,
|
|
||||||
removable: bool,
|
|
||||||
fua: bool,
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
func_path = join(gadget_path, f"functions/mass_storage.usb{instance}")
|
|
||||||
_mkdir(func_path)
|
|
||||||
_write(join(func_path, "stall"), str(int(stall)))
|
|
||||||
_write(join(func_path, "lun.0/cdrom"), str(int(cdrom)))
|
|
||||||
_write(join(func_path, "lun.0/ro"), str(int(not rw)))
|
|
||||||
_write(join(func_path, "lun.0/removable"), str(int(removable)))
|
|
||||||
_write(join(func_path, "lun.0/nofua"), str(int(not fua)))
|
|
||||||
if user != "root":
|
|
||||||
_chown(join(func_path, "lun.0/cdrom"), user)
|
|
||||||
_chown(join(func_path, "lun.0/ro"), user)
|
|
||||||
_chown(join(func_path, "lun.0/file"), user)
|
|
||||||
_symlink(func_path, join(config_path, f"mass_storage.usb{instance}"))
|
|
||||||
|
|
||||||
|
|
||||||
def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
|
def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
|
||||||
@ -221,36 +212,38 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
|
|||||||
# XXX: Should we use MaxPower=100 with Remote Wakeup?
|
# XXX: Should we use MaxPower=100 with Remote Wakeup?
|
||||||
_write(join(config_path, "bmAttributes"), "0xA0")
|
_write(join(config_path, "bmAttributes"), "0xA0")
|
||||||
|
|
||||||
|
gc = _GadgetConfig(gadget_path, config_path)
|
||||||
|
|
||||||
if config.otg.devices.serial.enabled:
|
if config.otg.devices.serial.enabled:
|
||||||
logger.info("===== Serial =====")
|
logger.info("===== Serial =====")
|
||||||
_create_serial(gadget_path, config_path)
|
gc.create_serial()
|
||||||
|
|
||||||
if config.otg.devices.ethernet.enabled:
|
if config.otg.devices.ethernet.enabled:
|
||||||
logger.info("===== Ethernet =====")
|
logger.info("===== Ethernet =====")
|
||||||
_create_ethernet(gadget_path, config_path, **config.otg.devices.ethernet._unpack(ignore=["enabled"]))
|
gc.create_ethernet(**config.otg.devices.ethernet._unpack(ignore=["enabled"]))
|
||||||
|
|
||||||
if config.kvmd.hid.type == "otg":
|
if config.kvmd.hid.type == "otg":
|
||||||
logger.info("===== HID-Keyboard =====")
|
logger.info("===== HID-Keyboard =====")
|
||||||
_create_hid(gadget_path, config_path, 0, config.otg.remote_wakeup, make_keyboard_hid())
|
gc.create_hid(0, config.otg.remote_wakeup, make_keyboard_hid())
|
||||||
logger.info("===== HID-Mouse =====")
|
logger.info("===== HID-Mouse =====")
|
||||||
_create_hid(gadget_path, config_path, 1, config.otg.remote_wakeup, make_mouse_hid(
|
gc.create_hid(1, config.otg.remote_wakeup, make_mouse_hid(
|
||||||
absolute=config.kvmd.hid.mouse.absolute,
|
absolute=config.kvmd.hid.mouse.absolute,
|
||||||
horizontal_wheel=config.kvmd.hid.mouse.horizontal_wheel,
|
horizontal_wheel=config.kvmd.hid.mouse.horizontal_wheel,
|
||||||
))
|
))
|
||||||
if config.kvmd.hid.mouse_alt.device:
|
if config.kvmd.hid.mouse_alt.device:
|
||||||
logger.info("===== HID-Mouse-Alt =====")
|
logger.info("===== HID-Mouse-Alt =====")
|
||||||
_create_hid(gadget_path, config_path, 2, config.otg.remote_wakeup, make_mouse_hid(
|
gc.create_hid(2, config.otg.remote_wakeup, make_mouse_hid(
|
||||||
absolute=(not config.kvmd.hid.mouse.absolute),
|
absolute=(not config.kvmd.hid.mouse.absolute),
|
||||||
horizontal_wheel=config.kvmd.hid.mouse_alt.horizontal_wheel,
|
horizontal_wheel=config.kvmd.hid.mouse_alt.horizontal_wheel,
|
||||||
))
|
))
|
||||||
|
|
||||||
if config.kvmd.msd.type == "otg":
|
if config.kvmd.msd.type == "otg":
|
||||||
logger.info("===== MSD =====")
|
logger.info("===== MSD =====")
|
||||||
_create_msd(gadget_path, config_path, 0, config.otg.user, **config.otg.devices.msd.default._unpack())
|
gc.create_msd(0, config.otg.user, **config.otg.devices.msd.default._unpack())
|
||||||
if config.otg.devices.drives.enabled:
|
if config.otg.devices.drives.enabled:
|
||||||
for instance in range(config.otg.devices.drives.count):
|
for instance in range(config.otg.devices.drives.count):
|
||||||
logger.info("===== MSD Extra: %d =====", config.otg.devices.drives.count)
|
logger.info("===== MSD Extra: %d =====", config.otg.devices.drives.count)
|
||||||
_create_msd(gadget_path, config_path, instance + 1, "root", **config.otg.devices.drives.default._unpack())
|
gc.create_msd(instance + 1, "root", **config.otg.devices.drives.default._unpack())
|
||||||
|
|
||||||
logger.info("===== Preparing complete =====")
|
logger.info("===== Preparing complete =====")
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user