oled: sensors class

This commit is contained in:
Maxim Devaev 2024-09-12 17:05:35 +03:00
parent 489601bb96
commit 445e2e04e2
2 changed files with 138 additions and 78 deletions

View File

@ -3,7 +3,7 @@
# # # #
# KVMD-OLED - A small OLED daemon for PiKVM. # # KVMD-OLED - A small OLED daemon for PiKVM. #
# # # #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> # # Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# # # #
# This program is free software: you can redistribute it and/or modify # # This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by # # it under the terms of the GNU General Public License as published by #
@ -23,15 +23,11 @@
import sys import sys
import os import os
import socket
import signal import signal
import itertools import itertools
import logging import logging
import datetime
import time import time
import netifaces
import psutil
import usb.core import usb.core
from luma.core import cmdline as luma_cmdline from luma.core import cmdline as luma_cmdline
@ -41,76 +37,13 @@ from luma.core.render import canvas as luma_canvas
from PIL import Image from PIL import Image
from PIL import ImageFont from PIL import ImageFont
from .sensors import Sensors
# ===== # =====
_logger = logging.getLogger("oled") _logger = logging.getLogger("oled")
# =====
def _get_ip() -> tuple[str, str]:
try:
gws = netifaces.gateways()
if "default" in gws:
for proto in [socket.AF_INET, socket.AF_INET6]:
if proto in gws["default"]:
iface = gws["default"][proto][1]
addrs = netifaces.ifaddresses(iface)
return (iface, addrs[proto][0]["addr"])
for iface in netifaces.interfaces():
if not iface.startswith(("lo", "docker")):
addrs = netifaces.ifaddresses(iface)
for proto in [socket.AF_INET, socket.AF_INET6]:
if proto in addrs:
return (iface, addrs[proto][0]["addr"])
except Exception:
# _logger.exception("Can't get iface/IP")
pass
return ("<no-iface>", "<no-ip>")
def _get_uptime() -> str:
uptime = datetime.timedelta(seconds=int(time.time() - psutil.boot_time()))
pl = {"days": uptime.days}
(pl["hours"], rem) = divmod(uptime.seconds, 3600)
(pl["mins"], pl["secs"]) = divmod(rem, 60)
return "{days}d {hours}h {mins}m".format(**pl)
def _get_temp(fahrenheit: bool) -> str:
try:
with open("/sys/class/thermal/thermal_zone0/temp") as temp_file:
temp = int((temp_file.read().strip())) / 1000
if fahrenheit:
temp = temp * 9 / 5 + 32
return f"{temp:.1f}\u00b0F"
return f"{temp:.1f}\u00b0C"
except Exception:
# _logger.exception("Can't read temp")
return "<no-temp>"
def _get_cpu() -> str:
st = psutil.cpu_times_percent()
user = st.user - st.guest
nice = st.nice - st.guest_nice
idle_all = st.idle + st.iowait
system_all = st.system + st.irq + st.softirq
virtual = st.guest + st.guest_nice
total = max(1, user + nice + system_all + idle_all + st.steal + virtual)
percent = int(
st.nice / total * 100
+ st.user / total * 100
+ system_all / total * 100
+ (st.steal + st.guest) / total * 100
)
return f"{percent}%"
def _get_mem() -> str:
return f"{int(psutil.virtual_memory().percent)}%"
# ===== # =====
class Screen: class Screen:
def __init__( def __init__(
@ -248,21 +181,20 @@ def main() -> None: # pylint: disable=too-many-locals,too-many-branches,too-man
swim += 1 swim += 1
time.sleep(0.5) time.sleep(0.5)
sensors = Sensors(options.fahrenheit)
if device.height >= 64: if device.height >= 64:
while stop_reason is None: while stop_reason is None:
(iface, ip) = _get_ip() text = "{fqdn}\n{ip}\niface: {iface}\ntemp: {temp}\ncpu: {cpu} mem: {mem}\n(__hb__) {uptime}"
text = f"{socket.getfqdn()}\n{ip}\niface: {iface}\ntemp: {_get_temp(options.fahrenheit)}" draw(sensors.render(text))
text += f"\ncpu: {_get_cpu()} mem: {_get_mem()}\n(__hb__) {_get_uptime()}"
draw(text)
else: else:
summary = True summary = True
while stop_reason is None: while stop_reason is None:
if summary: if summary:
text = f"{socket.getfqdn()}\n(__hb__) {_get_uptime()}\ntemp: {_get_temp(options.fahrenheit)}" text = "{fqdn}\n(__hb__) {uptime}\ntemp: {temp}"
else: else:
(iface, ip) = _get_ip() text = "{ip}\n(__hb__) iface: {iface}\ncpu: {cpu} mem: {mem}"
text = "%s\n(__hb__) iface: %s\ncpu: %s mem: %s" % (ip, iface, _get_cpu(), _get_mem()) draw(sensors.render(text))
draw(text)
summary = (not summary) summary = (not summary)
if stop_reason is not None: if stop_reason is not None:

128
kvmd/apps/oled/sensors.py Normal file
View File

@ -0,0 +1,128 @@
#!/usr/bin/env python3
# ========================================================================== #
# #
# KVMD-OLED - A small OLED daemon for PiKVM. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import socket
import functools
import datetime
import time
import netifaces
import psutil
# =====
class Sensors:
def __init__(self, fahrenheit: bool) -> None:
self.__fahrenheit = fahrenheit
self.__sensors = {
"fqdn": socket.getfqdn,
"iface": self.__get_iface,
"ip": self.__get_ip,
"uptime": self.__get_uptime,
"temp": self.__get_temp,
"cpu": self.__get_cpu,
"mem": self.__get_mem,
}
def render(self, text: str) -> str:
return text.format_map(self)
def __getitem__(self, key: str) -> str:
return self.__sensors[key]() # type: ignore
# =====
def __get_iface(self) -> str:
print("get_iface")
return self.__get_netconf(round(time.monotonic() / 0.3))[0]
def __get_ip(self) -> str:
print("get_ip")
return self.__get_netconf(round(time.monotonic() / 0.3))[1]
@functools.lru_cache(maxsize=1)
def __get_netconf(self, ts: int) -> tuple[str, str]:
_ = ts
try:
gws = netifaces.gateways()
if "default" in gws:
for proto in [socket.AF_INET, socket.AF_INET6]:
if proto in gws["default"]:
iface = gws["default"][proto][1]
addrs = netifaces.ifaddresses(iface)
return (iface, addrs[proto][0]["addr"])
for iface in netifaces.interfaces():
if not iface.startswith(("lo", "docker")):
addrs = netifaces.ifaddresses(iface)
for proto in [socket.AF_INET, socket.AF_INET6]:
if proto in addrs:
return (iface, addrs[proto][0]["addr"])
except Exception:
# _logger.exception("Can't get iface/IP")
pass
return ("<no-iface>", "<no-ip>")
# =====
def __get_uptime(self) -> str:
uptime = datetime.timedelta(seconds=int(time.time() - psutil.boot_time()))
pl = {"days": uptime.days}
(pl["hours"], rem) = divmod(uptime.seconds, 3600)
(pl["mins"], pl["secs"]) = divmod(rem, 60)
return "{days}d {hours}h {mins}m".format(**pl)
# =====
def __get_temp(self) -> str:
try:
with open("/sys/class/thermal/thermal_zone0/temp") as file:
temp = int(file.read().strip()) / 1000
if self.__fahrenheit:
temp = temp * 9 / 5 + 32
return f"{temp:.1f}\u00b0F"
return f"{temp:.1f}\u00b0C"
except Exception:
# _logger.exception("Can't read temp")
return "<no-temp>"
# =====
def __get_cpu(self) -> str:
st = psutil.cpu_times_percent()
user = st.user - st.guest
nice = st.nice - st.guest_nice
idle_all = st.idle + st.iowait
system_all = st.system + st.irq + st.softirq
virtual = st.guest + st.guest_nice
total = max(1, user + nice + system_all + idle_all + st.steal + virtual)
percent = int(
st.nice / total * 100
+ st.user / total * 100
+ system_all / total * 100
+ (st.steal + st.guest) / total * 100
)
return f"{percent}%"
def __get_mem(self) -> str:
return f"{int(psutil.virtual_memory().percent)}%"