periodic snapshots

This commit is contained in:
Devaev Maxim 2020-06-08 04:51:48 +03:00
parent 04c3763e69
commit 241c787e10
5 changed files with 173 additions and 7 deletions

View File

@ -74,6 +74,8 @@ from ..validators.net import valid_ssl_ciphers
from ..validators.kvm import valid_stream_quality
from ..validators.kvm import valid_stream_fps
from ..validators.kvm import valid_hid_key
from ..validators.kvm import valid_hid_mouse_move
from ..validators.hw import valid_gpio_pin_optional
from ..validators.hw import valid_otg_gadget
@ -264,6 +266,18 @@ def _get_config_scheme() -> Dict:
"cmd": Option(["/bin/true"], type=valid_command),
},
"snapshot": {
"idle_interval": Option(0.0, type=valid_float_f0),
"live_interval": Option(0.0, type=valid_float_f0),
"wakeup_key": Option("", type=(lambda arg: (valid_hid_key(arg) if arg else ""))),
"wakeup_move": Option(0, type=valid_hid_mouse_move),
"online_delay": Option(5.0, type=valid_float_f0),
"retries": Option(10, type=valid_int_f1),
"retries_delay": Option(3.0, type=valid_float_f01),
},
},
"otg": {

View File

@ -38,6 +38,7 @@ from .info import InfoManager
from .logreader import LogReader
from .wol import WakeOnLan
from .streamer import Streamer
from .snapshoter import Snapshoter
from .server import KvmdServer
@ -63,6 +64,9 @@ def main(argv: Optional[List[str]]=None) -> None:
global_config = config
config = config.kvmd
hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
streamer = Streamer(**config.streamer._unpack())
KvmdServer(
auth_manager=AuthManager(
internal_type=config.auth.internal.type,
@ -76,10 +80,16 @@ def main(argv: Optional[List[str]]=None) -> None:
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])),
hid=hid,
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=Streamer(**config.streamer._unpack()),
streamer=streamer,
snapshoter=Snapshoter(
hid=hid,
streamer=streamer,
**config.snapshot._unpack(),
),
heartbeat=config.server.heartbeat,
sync_chunk_size=config.server.sync_chunk_size,

View File

@ -60,8 +60,9 @@ from ... import aioproc
from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .streamer import Streamer
from .wol import WakeOnLan
from .streamer import Streamer
from .snapshoter import Snapshoter
from .http import HttpError
from .http import HttpExposed
@ -117,6 +118,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
atx: BaseAtx,
msd: BaseMsd,
streamer: Streamer,
snapshoter: Snapshoter,
heartbeat: float,
sync_chunk_size: int,
@ -127,6 +129,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__auth_manager = auth_manager
self.__hid = hid
self.__streamer = streamer
self.__snapshoter = snapshoter # Not a component: No state or cleanup
self.__heartbeat = heartbeat
@ -239,6 +242,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
for component in self.__components:
if component.poll_state:
self.__run_system_task(self.__poll_state, component.event_type, component.poll_state())
self.__run_system_task(self.__stream_snapshoter)
for api in self.__apis:
for http_exposed in get_exposed_http(api):
@ -336,7 +340,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __stream_controller(self) -> None:
prev = False
while True:
cur = bool(self.__sockets)
cur = (bool(self.__sockets) or self.__snapshoter.snapshoting())
if not prev and cur:
await self.__streamer.ensure_start(init_restart=True)
elif prev and not cur:
@ -358,3 +362,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __poll_state(self, event_type: str, poller: AsyncGenerator[Dict, None]) -> None:
async for state in poller:
await self.__broadcast_event(event_type, state)
async def __stream_snapshoter(self) -> None:
await self.__snapshoter.run(
is_live=(lambda: bool(self.__sockets)),
notifier=self.__streamer_notifier,
)

View File

@ -0,0 +1,130 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
import time
from typing import Callable
from ...logging import get_logger
from ...plugins.hid import BaseHid
from ... import aiotools
from .streamer import Streamer
# =====
class Snapshoter: # pylint: disable=too-many-instance-attributes
def __init__(
self,
hid: BaseHid,
streamer: Streamer,
idle_interval: float,
live_interval: float,
wakeup_key: str,
wakeup_move: int,
online_delay: float,
retries: int,
retries_delay: float,
) -> None:
self.__hid = hid
self.__streamer = streamer
if idle_interval or live_interval:
self.__idle_interval = (idle_interval or live_interval)
self.__live_interval = (live_interval or idle_interval)
assert self.__idle_interval
assert self.__live_interval
else:
self.__idle_interval = self.__live_interval = 0.0
self.__wakeup_key = wakeup_key
self.__wakeup_move = wakeup_move
self.__online_delay = online_delay
self.__retries = retries
self.__retries_delay = retries_delay
self.__snapshoting = False
async def run(self, is_live: Callable[[], bool], notifier: aiotools.AioNotifier) -> None:
if self.__idle_interval or self.__live_interval:
get_logger(0).info("Running periodic stream snapshot: idle=%.2f; live=%.2f ...",
self.__idle_interval, self.__live_interval)
last_snapshot_ts = 0.0
while True:
live = is_live()
if last_snapshot_ts + (self.__live_interval if live else self.__idle_interval) < time.time():
await self.__make_snapshot(live, notifier)
last_snapshot_ts = time.time()
await asyncio.sleep(min(self.__idle_interval, self.__live_interval))
else:
await aiotools.wait_infinite()
def snapshoting(self) -> bool:
return self.__snapshoting
async def __make_snapshot(self, live: bool, notifier: aiotools.AioNotifier) -> None:
logger = get_logger(0)
logger.info("Time to make the new snapshot (%s)", ("live" if live else "idle"))
try:
self.__snapshoting = True
await notifier.notify()
if not live:
if self.__wakeup_key:
logger.info("Waking up using key %r ...", self.__wakeup_key)
self.__hid.send_key_events([
(self.__wakeup_key, True),
(self.__wakeup_key, False),
])
if self.__wakeup_move:
logger.info("Waking up using mouse move for %d units ...", self.__wakeup_move)
self.__hid.send_mouse_move_event(0, 0)
self.__hid.send_mouse_move_event(self.__wakeup_move, self.__wakeup_move)
if self.__online_delay:
logger.info("Waiting %.2f seconds for online ...", self.__online_delay)
await asyncio.sleep(self.__online_delay)
retries = self.__retries
while retries:
snapshot = await self.__streamer.make_snapshot(save=True, load=False, allow_offline=False)
if snapshot:
logger.info("New snapshot saved: %dx%d", snapshot.width, snapshot.height)
break
retries -= 1
await asyncio.sleep(self.__retries_delay)
else:
logger.error("Can't make snapshot after %d retries", self.__retries)
except Exception: # Этого вообще-то не должно случаться, апи внутри заэксцепчены, но мало ли
logger.exception("Unhandled exception while making snapshot")
finally:
self.__snapshoting = False
await notifier.notify()

View File

@ -249,6 +249,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
if load:
return self.__snapshot
else:
logger = get_logger()
session = self.__ensure_http_session()
try:
async with session.get(self.__make_url("snapshot")) as response:
@ -277,10 +278,11 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__snapshot = snapshot
await self.__state_notifier.notify()
return snapshot
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
pass
logger.error("Stream is offline, no signal or so")
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as err:
logger.error("Can't make snapshot: %s: %s", type(err).__name__, err)
except Exception:
get_logger().exception("Invalid streamer response from /snapshot")
logger.exception("Invalid streamer response from /snapshot")
return None
def remove_snapshot(self) -> None: