otg msd and big refactoring

This commit is contained in:
Devaev Maxim
2019-10-29 02:16:12 +03:00
parent f6214191af
commit 10f8c2b335
15 changed files with 1300 additions and 356 deletions

335
kvmd/inotify.py Normal file
View File

@@ -0,0 +1,335 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This source file is partially based on python-watchdog module. #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import sys
import os
import asyncio
import asyncio.queues
import ctypes
import ctypes.util
import struct
import dataclasses
import types
import errno
from ctypes import c_int
from ctypes import c_char_p
from ctypes import c_uint32
from typing import Tuple
from typing import List
from typing import Dict
from typing import Type
from typing import Generator
from typing import Optional
from .logging import get_logger
# =====
def _load_libc() -> ctypes.CDLL:
try:
path = ctypes.util.find_library("c")
except (OSError, IOError, RuntimeError):
pass
else:
if path:
return ctypes.CDLL(path)
names = ["libc.so", "libc.so.6", "libc.so.0"]
for (index, name) in enumerate(names):
try:
return ctypes.CDLL(name)
except (OSError, IOError):
if index == len(names) - 1:
raise
raise RuntimeError("Where is libc?")
_libc = _load_libc()
def _get_libc_func(name: str, restype, argtypes=None): # type: ignore
return ctypes.CFUNCTYPE(restype, *(argtypes or []), use_errno=True)((name, _libc))
_inotify_init = _get_libc_func("inotify_init", c_int)
_inotify_add_watch = _get_libc_func("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32])
_inotify_rm_watch = _get_libc_func("inotify_rm_watch", c_int, [c_int, c_uint32])
# =====
_EVENT_HEAD_FMT = "iIII"
_EVENT_HEAD_SIZE = struct.calcsize(_EVENT_HEAD_FMT)
_EVENTS_BUFFER_LENGTH = 4096 * (_EVENT_HEAD_SIZE + 256) # count * (head + max_file_name_size + null_character)
_FS_FALLBACK_ENCODING = "utf-8"
_FS_ENCODING = (sys.getfilesystemencoding() or _FS_FALLBACK_ENCODING)
# =====
def _inotify_parsed_buffer(data: bytes) -> Generator[Tuple[int, int, int, bytes], None, None]:
offset = 0
while offset + _EVENT_HEAD_SIZE <= len(data):
(wd, mask, cookie, length) = struct.unpack_from("iIII", data, offset)
name = data[
offset + _EVENT_HEAD_SIZE # noqa: E203
:
offset + _EVENT_HEAD_SIZE + length
].rstrip(b"\0")
offset += _EVENT_HEAD_SIZE + length
if wd >= 0:
yield (wd, mask, cookie, name)
def _inotify_check(retval: int) -> int:
if retval < 0:
c_errno = ctypes.get_errno()
if c_errno == errno.ENOSPC: # pylint: disable=no-else-raise
raise OSError(c_errno, "Inotify watch limit reached")
elif c_errno == errno.EMFILE:
raise OSError(c_errno, "Inotify instance limit reached")
else:
raise OSError(c_errno, os.strerror(c_errno))
return retval
def _fs_encode(path: str) -> bytes:
try:
return path.encode(_FS_ENCODING, "strict")
except UnicodeEncodeError:
return path.encode(_FS_FALLBACK_ENCODING, "strict")
def _fs_decode(path: bytes) -> str:
try:
return path.decode(_FS_ENCODING, "strict")
except UnicodeDecodeError:
return path.decode(_FS_FALLBACK_ENCODING, "strict")
# =====
class InotifyMask:
# Userspace events
ACCESS = 0x00000001 # File was accessed
ATTRIB = 0x00000004 # Meta-data changed
CLOSE_WRITE = 0x00000008 # Writable file was closed
CLOSE_NOWRITE = 0x00000010 # Unwritable file closed
CREATE = 0x00000100 # Subfile was created
DELETE = 0x00000200 # Subfile was deleted
DELETE_SELF = 0x00000400 # Self was deleted
MODIFY = 0x00000002 # File was modified
MOVE_SELF = 0x00000800 # Self was moved
MOVED_FROM = 0x00000040 # File was moved from X
MOVED_TO = 0x00000080 # File was moved to Y
OPEN = 0x00000020 # File was opened
# Events sent by the kernel to a watch
IGNORED = 0x00008000 # File was ignored
ISDIR = 0x40000000 # Event occurred against directory
Q_OVERFLOW = 0x00004000 # Event queued overflowed
UNMOUNT = 0x00002000 # Backing file system was unmounted
# Helper userspace events
# CLOSE = CLOSE_WRITE | CLOSE_NOWRITE # Close
# MOVE = MOVED_FROM | MOVED_TO # Moves
# Helper for userspace events
# ALL_EVENTS = (
# ACCESS
# | ATTRIB
# | CLOSE_WRITE
# | CLOSE_NOWRITE
# | CREATE
# | DELETE
# | DELETE_SELF
# | MODIFY
# | MOVE_SELF
# | MOVED_FROM
# | MOVED_TO
# | OPEN
# )
# Helper for all modify events
ALL_MODIFY_EVENTS = (
CLOSE_WRITE
| CREATE
| DELETE
| DELETE_SELF
| MODIFY
| MOVE_SELF
| MOVED_FROM
| MOVED_TO
)
# Special flags for watch()
# DONT_FOLLOW = 0x02000000 # Don't follow a symbolic link
# EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects
# MASK_CREATE = 0x10000000 # Don't overwrite existent watchers (since 4.18)
# MASK_ADD = 0x20000000 # Add to the mask of an existing watch
# ONESHOT = 0x80000000 # Only send event once
# ONLYDIR = 0x01000000 # Only watch the path if it's a directory
@classmethod
def to_string(cls, mask: int) -> str:
flags: List[str] = []
for name in dir(cls):
if (
name[0].isupper()
and not name.startswith("ALL_")
and name not in ["CLOSE", "MOVE"]
and mask & getattr(cls, name)
):
flags.append(name)
return "|".join(flags)
@dataclasses.dataclass(frozen=True, repr=False)
class InotifyEvent:
wd: int
mask: int
cookie: int
name: str
path: str
def __repr__(self) -> str:
return (
f"<InotifyEvent: wd={self.wd}, mask={InotifyMask.to_string(self.mask)},"
f" cookie={self.cookie}, name={self.name}, path={self.path}>"
)
class Inotify:
def __init__(self) -> None:
self.__fd = -1
self.__wd_by_path: Dict[str, int] = {}
self.__path_by_wd: Dict[int, str] = {}
self.__moved: Dict[int, str] = {}
self.__events_queue: asyncio.queues.Queue = asyncio.Queue()
def watch(self, path: str, mask: int) -> None:
path = os.path.normpath(path)
assert path not in self.__wd_by_path, path
get_logger().info("Watching for %s: %s", path, InotifyMask.to_string(mask))
wd = _inotify_check(_inotify_add_watch(self.__fd, _fs_encode(path), mask))
self.__wd_by_path[path] = wd
self.__path_by_wd[wd] = path
# def unwatch(self, path: str) -> None:
# path = os.path.normpath(path)
# assert path in self.__wd_by_path, path
# get_logger().info("Unwatching %s", path)
# wd = self.__wd_by_path[path]
# _inotify_check(_inotify_rm_watch(self.__fd, wd))
# del self.__wd_by_path[path]
# del self.__path_by_wd[wd]
# def has_events(self) -> bool:
# return (not self.__events_queue.empty())
async def get_event(self, timeout: float) -> Optional[InotifyEvent]:
assert timeout > 0
try:
return (await asyncio.wait_for(self.__events_queue.get(), timeout=timeout))
except asyncio.TimeoutError:
return None
async def get_series(self, timeout: float) -> List[InotifyEvent]:
series: List[InotifyEvent] = []
event = await self.get_event(timeout)
if event:
series.append(event)
while event:
event = await self.get_event(timeout)
if event:
series.append(event)
return series
def __read_and_queue_events(self) -> None:
logger = get_logger()
for event in self.__read_parsed_events():
# XXX: Ни в коем случае не приводить self.__read_parsed_events() к списку.
# Он использует self.__wd_by_path и self.__path_by_wd, содержимое которых
# корректируется кодом ниже. В противном случае все сломается.
if event.mask & InotifyMask.MOVED_FROM:
self.__moved[event.cookie] = event.path # Save moved_from_path
elif event.mask & InotifyMask.MOVED_TO:
moved_from_path = self.__moved.pop(event.cookie, None)
if moved_from_path is not None:
wd = self.__wd_by_path.pop(moved_from_path, None)
if wd is not None:
self.__wd_by_path[event.path] = wd
self.__path_by_wd[wd] = event.path
if event.mask & InotifyMask.IGNORED:
ignored_path = self.__path_by_wd[event.wd]
if self.__wd_by_path[ignored_path] == event.wd:
logger.info("Unwatching %s because IGNORED was received", ignored_path)
del self.__wd_by_path[ignored_path]
continue
self.__events_queue.put_nowait(event)
def __read_parsed_events(self) -> Generator[InotifyEvent, None, None]:
for (wd, mask, cookie, name_bytes) in _inotify_parsed_buffer(self.__read_buffer()):
wd_path = self.__path_by_wd.get(wd, None)
if wd_path is not None:
name = _fs_decode(name_bytes)
path = (os.path.join(wd_path, name) if name else wd_path) # Avoid trailing slash
yield InotifyEvent(wd, mask, cookie, name, path)
def __read_buffer(self) -> bytes:
while True:
try:
return os.read(self.__fd, _EVENTS_BUFFER_LENGTH)
except OSError as err:
if err.errno == errno.EINTR:
pass
def __enter__(self) -> "Inotify":
assert self.__fd < 0
self.__fd = _inotify_check(_inotify_init())
asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events)
return self
def __exit__(
self,
_exc_type: Type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
if self.__fd >= 0:
asyncio.get_event_loop().remove_reader(self.__fd)
for wd in list(self.__wd_by_path.values()):
_inotify_rm_watch(self.__fd, wd)
try:
os.close(self.__fd)
except Exception:
pass