mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 09:01:54 +08:00
ocr
This commit is contained in:
@@ -454,7 +454,8 @@ def _get_config_scheme() -> Dict:
|
||||
},
|
||||
|
||||
"ocr": {
|
||||
"langs": Option(["eng"], type=valid_string_list, unpack_as="default_langs"),
|
||||
"langs": Option(["eng"], type=valid_string_list, unpack_as="default_langs"),
|
||||
"tessdata": Option("/usr/share/tessdata", type=valid_stripped_string_not_empty, unpack_as="data_dir_path")
|
||||
},
|
||||
|
||||
"snapshot": {
|
||||
|
||||
@@ -63,7 +63,7 @@ class StreamerApi:
|
||||
)
|
||||
if snapshot:
|
||||
if valid_bool(request.query.get("ocr", "false")):
|
||||
langs = await self.__ocr.get_available_langs()
|
||||
langs = self.__ocr.get_available_langs()
|
||||
return Response(
|
||||
body=(await self.__ocr.recognize(
|
||||
data=snapshot.data,
|
||||
@@ -107,8 +107,8 @@ class StreamerApi:
|
||||
default: List[str] = []
|
||||
available: List[str] = []
|
||||
if enabled:
|
||||
default = await self.__ocr.get_default_langs()
|
||||
available = await self.__ocr.get_available_langs()
|
||||
default = self.__ocr.get_default_langs()
|
||||
available = self.__ocr.get_available_langs()
|
||||
return {
|
||||
"ocr": {
|
||||
"enabled": enabled,
|
||||
|
||||
@@ -32,7 +32,6 @@ from typing import List
|
||||
from typing import Dict
|
||||
from typing import Set
|
||||
from typing import Callable
|
||||
from typing import Awaitable
|
||||
from typing import Coroutine
|
||||
from typing import AsyncGenerator
|
||||
from typing import Optional
|
||||
@@ -264,16 +263,27 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
|
||||
await self.__register_ws_client(client)
|
||||
|
||||
try:
|
||||
await self.__send_events_aws(client.ws, [
|
||||
stage1 = [
|
||||
("gpio_model_state", self.__user_gpio.get_model()),
|
||||
("hid_keymaps_state", self.__hid_api.get_keymaps()),
|
||||
("streamer_ocr_state", self.__streamer_api.get_ocr()),
|
||||
])
|
||||
await self.__send_events_aws(client.ws, [
|
||||
]
|
||||
stage2 = [
|
||||
(comp.event_type, comp.get_state())
|
||||
for comp in self.__components
|
||||
if comp.get_state
|
||||
])
|
||||
]
|
||||
stages = stage1 + stage2
|
||||
events = dict(zip(
|
||||
map(operator.itemgetter(0), stages),
|
||||
await asyncio.gather(*map(operator.itemgetter(1), stages)),
|
||||
))
|
||||
for stage in [stage1, stage2]:
|
||||
await asyncio.gather(*[
|
||||
self.__send_event(client.ws, event_type, events.pop(event_type))
|
||||
for (event_type, _) in stage
|
||||
])
|
||||
|
||||
await self.__send_event(client.ws, "loop", {})
|
||||
|
||||
async for msg in client.ws:
|
||||
@@ -391,15 +401,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
|
||||
logger.exception("Cleanup error on %s", comp.name)
|
||||
logger.info("On-Cleanup complete")
|
||||
|
||||
async def __send_events_aws(self, ws: aiohttp.web.WebSocketResponse, sources: List[Tuple[str, Awaitable]]) -> None:
|
||||
await asyncio.gather(*[
|
||||
self.__send_event(ws, event_type, state)
|
||||
for (event_type, state) in zip(
|
||||
map(operator.itemgetter(0), sources),
|
||||
await asyncio.gather(*map(operator.itemgetter(1), sources)),
|
||||
)
|
||||
])
|
||||
|
||||
async def __send_event(self, ws: aiohttp.web.WebSocketResponse, event_type: str, event: Optional[Dict]) -> None:
|
||||
await ws.send_str(json.dumps({
|
||||
"event_type": event_type,
|
||||
|
||||
@@ -20,6 +20,8 @@
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import os
|
||||
import stat
|
||||
import io
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
@@ -69,7 +71,6 @@ def _load_libtesseract() -> Optional[ctypes.CDLL]:
|
||||
("TessBaseAPISetImage", None, [POINTER(_TessBaseAPI), c_void_p, c_int, c_int, c_int, c_int]),
|
||||
("TessBaseAPIGetUTF8Text", POINTER(c_char), [POINTER(_TessBaseAPI)]),
|
||||
("TessBaseAPISetVariable", c_bool, [POINTER(_TessBaseAPI), c_char_p, c_char_p]),
|
||||
("TessBaseAPIGetAvailableLanguagesAsVector", POINTER(POINTER(c_char)), [POINTER(_TessBaseAPI)]),
|
||||
]:
|
||||
func = getattr(lib, name)
|
||||
if not func:
|
||||
@@ -86,12 +87,12 @@ _libtess = _load_libtesseract()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _tess_api(langs: List[str]) -> Generator[_TessBaseAPI, None, None]:
|
||||
def _tess_api(data_dir_path: str, langs: List[str]) -> Generator[_TessBaseAPI, None, None]:
|
||||
if not _libtess:
|
||||
raise OcrError("Tesseract is not available")
|
||||
api = _libtess.TessBaseAPICreate()
|
||||
try:
|
||||
if _libtess.TessBaseAPIInit3(api, None, "+".join(langs).encode()) != 0:
|
||||
if _libtess.TessBaseAPIInit3(api, data_dir_path.encode(), "+".join(langs).encode()) != 0:
|
||||
raise OcrError("Can't initialize Tesseract")
|
||||
if not _libtess.TessBaseAPISetVariable(api, b"debug_file", b"/dev/null"):
|
||||
raise OcrError("Can't set debug_file=/dev/null")
|
||||
@@ -100,35 +101,32 @@ def _tess_api(langs: List[str]) -> Generator[_TessBaseAPI, None, None]:
|
||||
_libtess.TessBaseAPIDelete(api)
|
||||
|
||||
|
||||
_LANG_SUFFIX = ".traineddata"
|
||||
|
||||
|
||||
# =====
|
||||
class TesseractOcr:
|
||||
def __init__(self, default_langs: List[str]) -> None:
|
||||
def __init__(self, data_dir_path: str, default_langs: List[str]) -> None:
|
||||
self.__data_dir_path = data_dir_path
|
||||
self.__default_langs = default_langs
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return bool(_libtess)
|
||||
|
||||
async def get_default_langs(self) -> List[str]:
|
||||
def get_default_langs(self) -> List[str]:
|
||||
return list(self.__default_langs)
|
||||
|
||||
async def get_available_langs(self) -> List[str]:
|
||||
return (await aiotools.run_async(self.__inner_get_available_langs))
|
||||
|
||||
def __inner_get_available_langs(self) -> List[str]:
|
||||
with _tess_api(["osd"]) as api:
|
||||
assert _libtess
|
||||
langs: Set[str] = set()
|
||||
langs_ptr = _libtess.TessBaseAPIGetAvailableLanguagesAsVector(api)
|
||||
if langs_ptr is not None:
|
||||
index = 0
|
||||
while langs_ptr[index]:
|
||||
lang = ctypes.cast(langs_ptr[index], c_char_p).value
|
||||
if lang is not None:
|
||||
langs.add(lang.decode())
|
||||
libc.free(langs_ptr[index])
|
||||
index += 1
|
||||
libc.free(langs_ptr)
|
||||
return sorted(langs)
|
||||
def get_available_langs(self) -> List[str]:
|
||||
# Это быстрее чем, инициализация либы и TessBaseAPIGetAvailableLanguagesAsVector()
|
||||
langs: Set[str] = set()
|
||||
for lang_name in os.listdir(self.__data_dir_path):
|
||||
if lang_name.endswith(_LANG_SUFFIX):
|
||||
path = os.path.join(self.__data_dir_path, lang_name)
|
||||
if os.access(path, os.R_OK) and stat.S_ISREG(os.stat(path).st_mode):
|
||||
lang = lang_name[:-len(_LANG_SUFFIX)]
|
||||
if lang:
|
||||
langs.add(lang)
|
||||
return sorted(langs)
|
||||
|
||||
async def recognize(self, data: bytes, langs: List[str], left: int, top: int, right: int, bottom: int) -> str:
|
||||
if not langs:
|
||||
@@ -136,7 +134,7 @@ class TesseractOcr:
|
||||
return (await aiotools.run_async(self.__inner_recognize, data, langs, left, top, right, bottom))
|
||||
|
||||
def __inner_recognize(self, data: bytes, langs: List[str], left: int, top: int, right: int, bottom: int) -> str:
|
||||
with _tess_api(langs) as api:
|
||||
with _tess_api(self.__data_dir_path, langs) as api:
|
||||
assert _libtess
|
||||
with io.BytesIO(data) as bio:
|
||||
image = PilImage.open(bio)
|
||||
|
||||
Reference in New Issue
Block a user