pikvm/pikvm#1501: Switch: Option to disable HDMI dummy plug

This commit is contained in:
Maxim Devaev 2025-05-03 03:54:05 +03:00
parent 69d254d80e
commit 7d7edb1c03
10 changed files with 139 additions and 17 deletions

View File

@ -79,6 +79,7 @@ class SwitchApi:
param: validator(req.query.get(param))
for (param, validator) in [
("edid_id", (lambda arg: valid_switch_edid_id(arg, allow_default=True))),
("dummy", valid_bool),
("name", valid_switch_port_name),
("atx_click_power_delay", valid_switch_atx_click_delay),
("atx_click_power_long_delay", valid_switch_atx_click_delay),

View File

@ -32,6 +32,7 @@ from .lib import Inotify
from .types import Edid
from .types import Edids
from .types import Dummies
from .types import Color
from .types import Colors
from .types import PortNames
@ -68,6 +69,7 @@ class SwitchUnknownEdidError(SwitchOperationError):
# =====
class Switch: # pylint: disable=too-many-public-methods
__X_EDIDS = "edids"
__X_DUMMIES = "dummies"
__X_COLORS = "colors"
__X_PORT_NAMES = "port_names"
__X_ATX_CP_DELAYS = "atx_cp_delays"
@ -75,7 +77,7 @@ class Switch: # pylint: disable=too-many-public-methods
__X_ATX_CR_DELAYS = "atx_cr_delays"
__X_ALL = frozenset([
__X_EDIDS, __X_COLORS, __X_PORT_NAMES,
__X_EDIDS, __X_DUMMIES, __X_COLORS, __X_PORT_NAMES,
__X_ATX_CP_DELAYS, __X_ATX_CPL_DELAYS, __X_ATX_CR_DELAYS,
])
@ -105,6 +107,12 @@ class Switch: # pylint: disable=too-many-public-methods
if save:
self.__save_notifier.notify()
def __x_set_dummies(self, dummies: Dummies, save: bool=True) -> None:
self.__chain.set_dummies(dummies)
self.__cache.set_dummies(dummies)
if save:
self.__save_notifier.notify()
def __x_set_colors(self, colors: Colors, save: bool=True) -> None:
self.__chain.set_colors(colors)
self.__cache.set_colors(colors)
@ -236,6 +244,7 @@ class Switch: # pylint: disable=too-many-public-methods
self,
port: int,
edid_id: (str | None)=None,
dummy: (bool | None)=None,
name: (str | None)=None,
atx_click_power_delay: (float | None)=None,
atx_click_power_long_delay: (float | None)=None,
@ -250,15 +259,16 @@ class Switch: # pylint: disable=too-many-public-methods
edids.assign(port, edid_id)
self.__x_set_edids(edids)
for (key, value) in [
(self.__X_PORT_NAMES, name),
(self.__X_ATX_CP_DELAYS, atx_click_power_delay),
(self.__X_ATX_CPL_DELAYS, atx_click_power_long_delay),
(self.__X_ATX_CR_DELAYS, atx_click_reset_delay),
for (reset, key, value) in [
(None, self.__X_DUMMIES, dummy), # None can't be used now
("", self.__X_PORT_NAMES, name),
(0, self.__X_ATX_CP_DELAYS, atx_click_power_delay),
(0, self.__X_ATX_CPL_DELAYS, atx_click_power_long_delay),
(0, self.__X_ATX_CR_DELAYS, atx_click_reset_delay),
]:
if value is not None:
new = getattr(self.__cache, f"get_{key}")()
new[port] = (value or None) # None == reset to default
new[port] = (None if value == reset else value) # Value or reset default
getattr(self, f"_Switch__x_set_{key}")(new)
# =====
@ -375,7 +385,7 @@ class Switch: # pylint: disable=too-many-public-methods
prevs = dict.fromkeys(self.__X_ALL)
while True:
await self.__save_notifier.wait()
while (await self.__save_notifier.wait(5)):
while not (await self.__save_notifier.wait(5)):
pass
while True:
try:

View File

@ -34,6 +34,7 @@ from .lib import aiotools
from .lib import aioproc
from .types import Edids
from .types import Dummies
from .types import Colors
from .proto import Response
@ -80,6 +81,11 @@ class _CmdSetEdids(_BaseCmd):
edids: Edids
@dataclasses.dataclass(frozen=True)
class _CmdSetDummies(_BaseCmd):
dummies: Dummies
@dataclasses.dataclass(frozen=True)
class _CmdSetColors(_BaseCmd):
colors: Colors
@ -189,7 +195,7 @@ class Chain: # pylint: disable=too-many-instance-attributes
self.__actual = False
self.__edids = Edids()
self.__dummies = Dummies({})
self.__colors = Colors()
self.__units: list[_UnitContext] = []
@ -225,6 +231,9 @@ class Chain: # pylint: disable=too-many-instance-attributes
def set_edids(self, edids: Edids) -> None:
self.__queue_cmd(_CmdSetEdids(edids)) # Will be copied because of multiprocessing.Queue()
def set_dummies(self, dummies: Dummies) -> None:
self.__queue_cmd(_CmdSetDummies(dummies))
def set_colors(self, colors: Colors) -> None:
self.__queue_cmd(_CmdSetColors(colors))
@ -348,6 +357,9 @@ class Chain: # pylint: disable=too-many-instance-attributes
case _CmdSetEdids():
self.__edids = cmd.edids
case _CmdSetDummies():
self.__dummies = cmd.dummies
case _CmdSetColors():
self.__colors = cmd.colors
@ -373,7 +385,7 @@ class Chain: # pylint: disable=too-many-instance-attributes
def __adjust_quirks(self) -> None:
for (unit, ctx) in enumerate(self.__units):
if ctx.state is not None and (ctx.state.version.sw_dev or ctx.state.version.sw >= 7):
if ctx.state is not None and ctx.state.version.is_fresh(7):
ignore_hpd = (unit == 0 and self.__ignore_hpd_on_top)
if ctx.state.quirks.ignore_hpd != ignore_hpd:
get_logger().info("Applying quirk ignore_hpd=%s to [%d] ...",
@ -403,6 +415,7 @@ class Chain: # pylint: disable=too-many-instance-attributes
self.__ensure_config_port(unit, ctx)
if self.__actual:
self.__ensure_config_edids(unit, ctx)
self.__ensure_config_dummies(unit, ctx)
self.__ensure_config_colors(unit, ctx)
def __ensure_config_port(self, unit: int, ctx: _UnitContext) -> None:
@ -429,6 +442,19 @@ class Chain: # pylint: disable=too-many-instance-attributes
ctx.changing_rid = self.__device.request_set_edid(unit, ch, edid)
break # Busy globally
def __ensure_config_dummies(self, unit: int, ctx: _UnitContext) -> None:
assert ctx.state is not None
if ctx.state.version.is_fresh(8) and ctx.can_be_changed():
for ch in range(4):
port = self.get_virtual_port(unit, ch)
dummy = self.__dummies[port]
if ctx.state.video_dummies[ch] != dummy:
get_logger().info("Changing dummy flag on port %d on [%d:%d]: %d -> %d ...",
port, unit, ch,
ctx.state.video_dummies[ch], dummy)
ctx.changing_rid = self.__device.request_set_dummy(unit, ch, dummy)
break # Busy globally (actually not but it can be changed in the firmware)
def __ensure_config_colors(self, unit: int, ctx: _UnitContext) -> None:
assert self.__actual
assert ctx.state is not None

View File

@ -41,6 +41,7 @@ from .proto import BodySetBeacon
from .proto import BodyAtxClick
from .proto import BodySetEdid
from .proto import BodyClearEdid
from .proto import BodySetDummy
from .proto import BodySetColors
from .proto import BodySetQuirks
@ -164,6 +165,9 @@ class Device:
return self.__send_request(Header.SET_EDID, unit, BodySetEdid(ch, edid))
return self.__send_request(Header.CLEAR_EDID, unit, BodyClearEdid(ch))
def request_set_dummy(self, unit: int, ch: int, on: bool) -> int:
return self.__send_request(Header.SET_DUMMY, unit, BodySetDummy(ch, on))
def request_set_colors(self, unit: int, ch: int, colors: Colors) -> int:
return self.__send_request(Header.SET_COLORS, unit, BodySetColors(ch, colors))

View File

@ -61,6 +61,7 @@ class Header(Packable, Unpackable):
CLEAR_EDID = 10
SET_COLORS = 12
SET_QUIRKS = 13
SET_DUMMY = 14
__struct = struct.Struct("<BHBB")
@ -96,6 +97,9 @@ class UnitVersion:
sw: int
sw_dev: bool
def is_fresh(self, version: int) -> bool:
return (self.sw_dev or (self.sw >= version))
@dataclasses.dataclass(frozen=True)
class UnitFlags:
@ -121,11 +125,12 @@ class UnitState(Unpackable): # pylint: disable=too-many-instance-attributes
video_hpd: tuple[bool, bool, bool, bool, bool]
video_edid: tuple[bool, bool, bool, bool]
video_crc: tuple[int, int, int, int]
video_dummies: tuple[bool, bool, bool, bool]
usb_5v_sens: tuple[bool, bool, bool, bool]
atx_busy: tuple[bool, bool, bool, bool]
quirks: UnitQuirks
__struct = struct.Struct("<HHHBBHHHHHHBBBHHHHBxBB29x")
__struct = struct.Struct("<HHHBBHHHHHHBBBHHHHBxBBB28x")
def compare_edid(self, ch: int, edid: Optional["Edid"]) -> bool:
if edid is None:
@ -142,7 +147,7 @@ class UnitState(Unpackable): # pylint: disable=too-many-instance-attributes
sw_version, hw_version, flags, ch,
beacons, nc0, nc1, nc2, nc3, nc4, nc5,
video_5v_sens, video_hpd, video_edid, vc0, vc1, vc2, vc3,
usb_5v_sens, atx_busy, quirks,
usb_5v_sens, atx_busy, quirks, video_dummies,
) = cls.__struct.unpack_from(data, offset=offset)
return UnitState(
version=UnitVersion(
@ -163,6 +168,7 @@ class UnitState(Unpackable): # pylint: disable=too-many-instance-attributes
video_hpd=cls.__make_flags5(video_hpd),
video_edid=cls.__make_flags4(video_edid),
video_crc=(vc0, vc1, vc2, vc3),
video_dummies=cls.__make_flags4(video_dummies),
usb_5v_sens=cls.__make_flags4(usb_5v_sens),
atx_busy=cls.__make_flags4(atx_busy),
quirks=UnitQuirks(ignore_hpd=bool(quirks & 0x01)),
@ -270,6 +276,18 @@ class BodyClearEdid(Packable):
return self.ch.to_bytes()
@dataclasses.dataclass(frozen=True)
class BodySetDummy(Packable):
ch: int
on: bool
def __post_init__(self) -> None:
assert 0 <= self.ch <= 3
def pack(self) -> bytes:
return self.ch.to_bytes() + self.on.to_bytes()
@dataclasses.dataclass(frozen=True)
class BodySetColors(Packable):
ch: int

View File

@ -27,6 +27,7 @@ import time
from typing import AsyncGenerator
from .types import Edids
from .types import Dummies
from .types import Color
from .types import Colors
from .types import PortNames
@ -48,8 +49,8 @@ class _UnitInfo:
# =====
class StateCache: # pylint: disable=too-many-instance-attributes
__FW_VERSION = 7
class StateCache: # pylint: disable=too-many-instance-attributes,too-many-public-methods
__FW_VERSION = 8
__FULL = 0xFFFF
__SUMMARY = 0x01
@ -62,6 +63,7 @@ class StateCache: # pylint: disable=too-many-instance-attributes
def __init__(self) -> None:
self.__edids = Edids()
self.__dummies = Dummies({})
self.__colors = Colors()
self.__port_names = PortNames({})
self.__atx_cp_delays = AtxClickPowerDelays({})
@ -77,6 +79,9 @@ class StateCache: # pylint: disable=too-many-instance-attributes
def get_edids(self) -> Edids:
return self.__edids.copy()
def get_dummies(self) -> Dummies:
return self.__dummies.copy()
def get_colors(self) -> Colors:
return self.__colors
@ -226,6 +231,9 @@ class StateCache: # pylint: disable=too-many-instance-attributes
"reset": self.__atx_cr_delays[port],
},
},
"video": {
"dummy": self.__dummies[port],
},
})
if x_edids:
state["edids"]["used"].append(self.__edids.get_id_for_port(port))
@ -327,6 +335,12 @@ class StateCache: # pylint: disable=too-many-instance-attributes
if changed:
self.__bump_state(self.__EDIDS)
def set_dummies(self, dummies: Dummies) -> None:
changed = (not self.__dummies.compare_on_ports(dummies, self.__get_ports()))
self.__dummies = dummies.copy()
if changed:
self.__bump_state(self.__FULL)
def set_colors(self, colors: Colors) -> None:
changed = (self.__colors != colors)
self.__colors = colors

View File

@ -39,6 +39,7 @@ from .lib import get_logger
from .types import Edid
from .types import Edids
from .types import Dummies
from .types import Color
from .types import Colors
from .types import PortNames
@ -52,6 +53,8 @@ class StorageContext:
__F_EDIDS_ALL = "edids_all.json"
__F_EDIDS_PORT = "edids_port.json"
__F_DUMMIES = "dummies.json"
__F_COLORS = "colors.json"
__F_PORT_NAMES = "port_names.json"
@ -74,6 +77,9 @@ class StorageContext:
})
await self.__write_json_keyvals(self.__F_EDIDS_PORT, edids.port)
async def write_dummies(self, dummies: Dummies) -> None:
await self.__write_json_keyvals(self.__F_DUMMIES, dummies.kvs)
async def write_colors(self, colors: Colors) -> None:
await self.__write_json_keyvals(self.__F_COLORS, {
role: {
@ -116,6 +122,10 @@ class StorageContext:
port_edids = await self.__read_json_keyvals_int(self.__F_EDIDS_PORT)
return Edids(all_edids, port_edids)
async def read_dummies(self) -> Dummies:
kvs = await self.__read_json_keyvals_int(self.__F_DUMMIES)
return Dummies({key: bool(value) for (key, value) in kvs.items()})
async def read_colors(self) -> Colors:
raw = await self.__read_json_keyvals(self.__F_COLORS)
return Colors(**{ # type: ignore

View File

@ -281,6 +281,19 @@ class _PortsDict(Generic[_T]):
else:
self.kvs[port] = value
def __eq__(self, other: "_PortsDict[_T]") -> bool:
if not isinstance(other, self.__class__):
return False
return (self.kvs == other.kvs)
class Dummies(_PortsDict[bool]):
def __init__(self, kvs: dict[int, bool]) -> None:
super().__init__(True, kvs)
def copy(self) -> "Dummies":
return Dummies(self.kvs)
class PortNames(_PortsDict[str]):
def __init__(self, kvs: dict[int, str]) -> None:

Binary file not shown.

View File

@ -459,7 +459,7 @@ export function Switch() {
let create_content = function(el_parent) {
let html = `
<table>
<table style="width: 100%">
<tr>
<td>Port name:</td>
<td><input
@ -473,9 +473,33 @@ export function Switch() {
<td><select id="__switch-port-edid-selector" style="width: 100%"></select></td>
</tr>
</table>
<hr>
<table>
`;
let fw = model.units[model.ports[port].unit].firmware;
if (fw.devbuild || fw.version >= 8) {
html += `
<hr>
<table style="width: 100%">
<tr>
<td>Simulate display on inactive port:</td>
<td align="right">
<div class="switch-box">
<input
type="checkbox" id="__switch-port-dummy-switch"
${model.ports[port].video.dummy ? 'checked' : ''}
/>
<label for="__switch-port-dummy-switch">
<span class="switch-inner"></span>
<span class="switch"></span>
</label>
</div>
</td>
</tr>
</table>
`;
}
html += "<hr><table style=\"width: 100%\">";
for (let kv of Object.entries(atx_actions)) {
html += `
<tr>
@ -491,6 +515,7 @@ export function Switch() {
`;
}
html += "</table>";
el_parent.innerHTML = html;
let el_selector = $("__switch-port-edid-selector");
@ -521,6 +546,7 @@ export function Switch() {
let params = {
"port": port,
"edid_id": $("__switch-port-edid-selector").value,
"dummy": $("__switch-port-dummy-switch").checked,
"name": $("__switch-port-name-input").value,
};
for (let action of Object.keys(atx_actions)) {