libc module

This commit is contained in:
Maxim Devaev 2021-12-30 05:44:42 +03:00
parent 885c14f9e4
commit f609e857b1
2 changed files with 63 additions and 30 deletions

View File

@ -26,16 +26,11 @@ import sys
import os import os
import asyncio import asyncio
import ctypes import ctypes
import ctypes.util
import struct import struct
import dataclasses import dataclasses
import types import types
import errno import errno
from ctypes import c_int
from ctypes import c_uint32
from ctypes import c_char_p
from typing import Tuple from typing import Tuple
from typing import List from typing import List
from typing import Dict from typing import Dict
@ -45,28 +40,7 @@ from typing import Optional
from .logging import get_logger from .logging import get_logger
from . import libc
# =====
def _load_libc() -> ctypes.CDLL:
path = ctypes.util.find_library("c")
if not path:
raise RuntimeError("Where is libc?")
assert path
lib = ctypes.CDLL(path)
for (name, restype, argtypes) in [
("inotify_init", c_int, []),
("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]),
("inotify_rm_watch", c_int, [c_int, c_uint32]),
]:
func = getattr(lib, name)
if not func:
raise RuntimeError(f"Where is libc.{name}?")
setattr(func, "restype", restype)
setattr(func, "argtypes", argtypes)
return lib
_libc = _load_libc()
# ===== # =====
@ -225,7 +199,7 @@ class Inotify:
path = os.path.normpath(path) path = os.path.normpath(path)
assert path not in self.__wd_by_path, path assert path not in self.__wd_by_path, path
get_logger().info("Watching for %s", path) get_logger().info("Watching for %s", path)
wd = _inotify_check(_libc.inotify_add_watch(self.__fd, _fs_encode(path), mask)) wd = _inotify_check(libc.inotify_add_watch(self.__fd, _fs_encode(path), mask))
self.__wd_by_path[path] = wd self.__wd_by_path[path] = wd
self.__path_by_wd[wd] = path self.__path_by_wd[wd] = path
@ -303,7 +277,7 @@ class Inotify:
def __enter__(self) -> "Inotify": def __enter__(self) -> "Inotify":
assert self.__fd < 0 assert self.__fd < 0
self.__fd = _inotify_check(_libc.inotify_init()) self.__fd = _inotify_check(libc.inotify_init())
asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events) asyncio.get_event_loop().add_reader(self.__fd, self.__read_and_queue_events)
return self return self
@ -317,7 +291,7 @@ class Inotify:
if self.__fd >= 0: if self.__fd >= 0:
asyncio.get_event_loop().remove_reader(self.__fd) asyncio.get_event_loop().remove_reader(self.__fd)
for wd in list(self.__wd_by_path.values()): for wd in list(self.__wd_by_path.values()):
_libc.inotify_rm_watch(self.__fd, wd) libc.inotify_rm_watch(self.__fd, wd)
try: try:
os.close(self.__fd) os.close(self.__fd)
except Exception: except Exception:

59
kvmd/libc.py Normal file
View File

@ -0,0 +1,59 @@
# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
# #
# This source file is partially based on python-watchdog module. #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import ctypes
import ctypes.util
from ctypes import c_int
from ctypes import c_uint32
from ctypes import c_char_p
# =====
def _load_libc() -> ctypes.CDLL:
path = ctypes.util.find_library("c")
if not path:
raise RuntimeError("Where is libc?")
assert path
lib = ctypes.CDLL(path)
for (name, restype, argtypes) in [
("inotify_init", c_int, []),
("inotify_add_watch", c_int, [c_int, c_char_p, c_uint32]),
("inotify_rm_watch", c_int, [c_int, c_uint32]),
]:
func = getattr(lib, name)
if not func:
raise RuntimeError(f"Where is libc.{name}?")
setattr(func, "restype", restype)
setattr(func, "argtypes", argtypes)
return lib
_libc = _load_libc()
# =====
inotify_init = _libc.inotify_init
inotify_add_watch = _libc.inotify_add_watch
inotify_rm_watch = _libc.inotify_rm_watch