global event loop

This commit is contained in:
Devaev Maxim 2019-04-09 08:04:36 +03:00
parent a6028c46a4
commit 60849efa72
7 changed files with 41 additions and 63 deletions

View File

@ -20,8 +20,6 @@
# ========================================================================== #
import asyncio
from typing import List
from typing import Optional
@ -46,7 +44,6 @@ def main(argv: Optional[List[str]]=None) -> None:
config = init("kvmd", description="The main Pi-KVM daemon", argv=argv)[2].kvmd
with gpio.bcm():
# pylint: disable=protected-access
loop = asyncio.get_event_loop()
Server(
auth_manager=AuthManager(
internal_users=config.auth.internal_users,
@ -55,14 +52,12 @@ def main(argv: Optional[List[str]]=None) -> None:
internal=config.auth.internal._unpack(),
external=(config.auth.external._unpack() if config.auth.external_type else {}),
),
info_manager=InfoManager(loop=loop, **config.info._unpack()),
log_reader=LogReader(loop=loop),
info_manager=InfoManager(**config.info._unpack()),
log_reader=LogReader(),
hid=Hid(**config.hid._unpack()),
atx=Atx(**config.atx._unpack()),
msd=MassStorageDevice(loop=loop, **config.msd._unpack()),
streamer=Streamer(loop=loop, **config.streamer._unpack()),
loop=loop,
msd=MassStorageDevice(**config.msd._unpack()),
streamer=Streamer(**config.streamer._unpack()),
).run(**config.server._unpack())
get_logger().info("Bye-bye")

View File

@ -37,19 +37,16 @@ class InfoManager:
self,
meta_path: str,
extras_path: str,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__meta_path = meta_path
self.__extras_path = extras_path
self.__loop = loop
async def get_meta(self) -> Dict:
return (await self.__loop.run_in_executor(None, load_yaml_file, self.__meta_path))
return (await asyncio.get_running_loop().run_in_executor(None, load_yaml_file, self.__meta_path))
async def get_extras(self) -> Dict:
return (await self.__loop.run_in_executor(None, self.__sync_get_extras))
return (await asyncio.get_running_loop().run_in_executor(None, self.__sync_get_extras))
def __sync_get_extras(self) -> Dict:
try:

View File

@ -32,9 +32,6 @@ import systemd.journal
# =====
class LogReader:
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self.__loop = loop
async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[Dict, None]:
reader = systemd.journal.Reader()
reader.this_boot()

View File

@ -206,8 +206,6 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
reset_delay: float,
write_meta: bool,
chunk_size: int,
loop: asyncio.AbstractEventLoop,
) -> None:
self._enabled = enabled
@ -226,8 +224,6 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
self.__write_meta = write_meta
self.chunk_size = chunk_size
self.__loop = loop
self.__device_info: Optional[_MassStorageDeviceInfo] = None
self.__saved_device_info: Optional[_MassStorageDeviceInfo] = None
self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError)
@ -241,7 +237,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
logger.info("Using %r as mass-storage device", self._device_path)
try:
logger.info("Enabled image metadata writing")
loop.run_until_complete(self.connect_to_kvm(no_delay=True))
asyncio.get_event_loop().run_until_complete(self.connect_to_kvm(no_delay=True))
except Exception as err:
if isinstance(err, MsdError):
log = logger.error
@ -366,10 +362,10 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
assert self.__device_file
await self.__device_file.write(data)
await self.__device_file.flush()
await self.__loop.run_in_executor(None, os.fsync, self.__device_file.fileno())
await asyncio.get_running_loop().run_in_executor(None, os.fsync, self.__device_file.fileno())
async def __load_device_info(self) -> None:
device_info = await self.__loop.run_in_executor(None, _explore_device, self._device_path)
device_info = await asyncio.get_running_loop().run_in_executor(None, _explore_device, self._device_path)
if not device_info:
raise MsdError("Can't explore device %r" % (self._device_path))
self.__device_info = self.__saved_device_info = device_info

View File

@ -217,8 +217,6 @@ class Server: # pylint: disable=too-many-instance-attributes
atx: Atx,
msd: MassStorageDevice,
streamer: Streamer,
loop: asyncio.AbstractEventLoop,
) -> None:
self._auth_manager = auth_manager
@ -230,8 +228,6 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__msd = msd
self.__streamer = streamer
self.__loop = loop
self.__heartbeat: Optional[float] = None # Assigned in run() for consistance
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock()
@ -257,23 +253,6 @@ class Server: # pylint: disable=too-many-instance-attributes
setproctitle.setproctitle("[main] " + setproctitle.getproctitle())
self.__heartbeat = heartbeat
app = aiohttp.web.Application(loop=self.__loop)
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
for name in dir(self):
method = getattr(self, name)
if inspect.ismethod(method):
if getattr(method, _ATTR_SYSTEM_TASK, False):
self.__system_tasks.append(self.__loop.create_task(method()))
elif getattr(method, _ATTR_EXPOSED, False):
# router = app.router
router = getattr(app, "router") # FIXME: Dirty hack to avoid pylint crash
router.add_route(
getattr(method, _ATTR_EXPOSED_METHOD),
getattr(method, _ATTR_EXPOSED_PATH),
method,
)
assert port or unix_path
if unix_path:
@ -289,7 +268,7 @@ class Server: # pylint: disable=too-many-instance-attributes
socket_kwargs = {"host": host, "port": port}
aiohttp.web.run_app(
app=app,
app=self.__make_app(),
access_log_format=access_log_format,
print=self.__run_app_print,
**socket_kwargs,
@ -513,7 +492,27 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__reset_streamer = True
return _json()
# =====
# ===== SYSTEM STUFF
async def __make_app(self) -> aiohttp.web.Application:
app = aiohttp.web.Application()
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
for name in dir(self):
method = getattr(self, name)
if inspect.ismethod(method):
if getattr(method, _ATTR_SYSTEM_TASK, False):
self.__system_tasks.append(asyncio.create_task(method()))
elif getattr(method, _ATTR_EXPOSED, False):
# router = app.router
router = getattr(app, "router") # FIXME: Dirty hack to avoid pylint crash
router.add_route(
getattr(method, _ATTR_EXPOSED_METHOD),
getattr(method, _ATTR_EXPOSED_PATH),
method,
)
return app
def __run_app_print(self, text: str) -> None:
logger = get_logger()

View File

@ -61,8 +61,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes
timeout: float,
cmd: List[str],
loop: asyncio.AbstractEventLoop,
) -> None:
self.__cap_pin = (gpio.set_output(cap_pin) if cap_pin >= 0 else -1)
@ -87,8 +85,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__cmd = cmd
self.__loop = loop
self.__streamer_task: Optional[asyncio.Task] = None
self.__http_session: Optional[aiohttp.ClientSession] = None
@ -174,7 +170,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __inner_start(self) -> None:
assert not self.__streamer_task
await self.__set_hw_enabled(True)
self.__streamer_task = self.__loop.create_task(self.__run_streamer())
self.__streamer_task = asyncio.get_running_loop().create_task(self.__run_streamer())
async def __inner_stop(self) -> None:
assert self.__streamer_task

View File

@ -30,8 +30,7 @@ from kvmd.aioregion import AioExclusiveRegion
# =====
@pytest.mark.asyncio
async def test_ok__access_one(event_loop: asyncio.AbstractEventLoop) -> None:
_ = event_loop
async def test_ok__access_one() -> None:
region = AioExclusiveRegion(RegionIsBusyError)
async def func() -> None:
@ -48,8 +47,7 @@ async def test_ok__access_one(event_loop: asyncio.AbstractEventLoop) -> None:
@pytest.mark.asyncio
async def test_fail__access_one(event_loop: asyncio.AbstractEventLoop) -> None:
_ = event_loop
async def test_fail__access_one() -> None:
region = AioExclusiveRegion(RegionIsBusyError)
async def func() -> None:
@ -69,19 +67,19 @@ async def test_fail__access_one(event_loop: asyncio.AbstractEventLoop) -> None:
# =====
@pytest.mark.asyncio
async def test_ok__access_two(event_loop: asyncio.AbstractEventLoop) -> None:
async def test_ok__access_two() -> None:
region = AioExclusiveRegion(RegionIsBusyError)
async def func1() -> None:
with region:
await asyncio.sleep(1, loop=event_loop)
await asyncio.sleep(1)
print("done func1()")
async def func2() -> None:
await asyncio.sleep(2)
print("waiking up func2()")
with region:
await asyncio.sleep(1, loop=event_loop)
await asyncio.sleep(1)
print("done func2()")
await asyncio.gather(func1(), func2())
@ -92,21 +90,21 @@ async def test_ok__access_two(event_loop: asyncio.AbstractEventLoop) -> None:
@pytest.mark.asyncio
async def test_fail__access_two(event_loop: asyncio.AbstractEventLoop) -> None:
async def test_fail__access_two() -> None:
region = AioExclusiveRegion(RegionIsBusyError)
async def func1() -> None:
with region:
await asyncio.sleep(2, loop=event_loop)
await asyncio.sleep(2)
print("done func1()")
async def func2() -> None:
await asyncio.sleep(1)
with region:
await asyncio.sleep(1, loop=event_loop)
await asyncio.sleep(1)
print("done func2()")
results = await asyncio.gather(func1(), func2(), loop=event_loop, return_exceptions=True)
results = await asyncio.gather(func1(), func2(), return_exceptions=True)
assert results[0] is None
assert type(results[1]) == RegionIsBusyError # pylint: disable=unidiomatic-typecheck