mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
typing fixes
This commit is contained in:
@@ -23,7 +23,6 @@
|
||||
import os
|
||||
import select
|
||||
import multiprocessing
|
||||
import multiprocessing.queues
|
||||
import queue
|
||||
import errno
|
||||
import time
|
||||
@@ -76,7 +75,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
|
||||
self.__noop = noop
|
||||
|
||||
self.__fd = -1
|
||||
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
|
||||
self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue()
|
||||
self.__state_flags = aiomulti.AioSharedFlags({"online": True, **initial_state}, notifier)
|
||||
self.__stop_event = multiprocessing.Event()
|
||||
|
||||
@@ -93,7 +92,7 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
|
||||
if self.__ensure_device(): # Check device and process reports if needed
|
||||
self.__read_all_reports()
|
||||
try:
|
||||
event: BaseEvent = self.__events_queue.get(timeout=0.1)
|
||||
event = self.__events_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
if not self.__udc.can_operate():
|
||||
self.__close_device()
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
|
||||
import os
|
||||
import multiprocessing
|
||||
import multiprocessing.queues
|
||||
import dataclasses
|
||||
import queue
|
||||
import struct
|
||||
@@ -225,7 +224,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
|
||||
|
||||
self.__gpio = _Gpio(reset_pin, reset_delay)
|
||||
|
||||
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
|
||||
self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue()
|
||||
|
||||
self.__notifier = aiomulti.AioProcessNotifier()
|
||||
self.__state_flags = aiomulti.AioSharedFlags({
|
||||
@@ -344,7 +343,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
|
||||
with self.__get_serial() as tty:
|
||||
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
|
||||
try:
|
||||
event: _BaseEvent = self.__events_queue.get(timeout=0.1)
|
||||
event = self.__events_queue.get(timeout=0.1)
|
||||
except queue.Empty:
|
||||
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user