Merge remote-tracking branch 'origin/meconopsis' into celestia-wayland

This commit is contained in:
Kevin Alberts 2025-07-25 13:56:53 +02:00
commit 5a0041e7d5
10 changed files with 745 additions and 1187 deletions

View file

@ -7,62 +7,19 @@ import cairocffi
import iwlib
import netifaces
import psutil
import six
import unicodedata
from libqtile import bar, pangocffi
from libqtile import bar, qtile
from libqtile.log_utils import logger
from libqtile.command.base import expose_command
from libqtile.widget import base
from libqtile.widget.base import ORIENTATION_HORIZONTAL
from libqtile.widget.battery import default_icon_path, load_battery, BatteryState
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.currentlayout import CurrentLayoutIcon
from libqtile.widget.graph import _Graph
from libqtile.widget.tasklist import TaskList
from libqtile.widget.wlan import get_status
from libqtile.backend.x11.window import Window
from libqtile.widget.groupbox import GroupBox
from libqtile.command.base import expose_command
from kuro.utils.general import notify, BUTTON_LEFT, BUTTON_MIDDLE, BUTTON_RIGHT, BUTTON_DOWN, BUTTON_UP, BUTTON_MUTE, \
call_process
class CheckUpdatesYay(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYay, self).__init__(**config)
# Override command and output with yay command
self.cmd = "yay -Qu".split()
self.status_cmd = "yay -Qu --color never".split()
self.update_cmd = "sudo yay".split()
self.subtr = 0
def _check_updates(self):
#subprocess.check_output(self.update_cmd)
res = super(CheckUpdatesYay, self)._check_updates()
return res
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class DualPaneTextboxBase(base._Widget):
"""
Base class for widgets that are two boxes next to each other both containing text.
@ -267,364 +224,6 @@ class DualPaneTextboxBase(base._Widget):
return d
class MediaWidget(base.InLoopPollText):
"""Media Status Widget"""
class Status:
OFFLINE = 0
PLAYING = 1
PAUSED = 2
STOPPED = 3
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('off_text', '', 'The pattern for the text if no players are found.'),
('on_text_play', '{}', 'The pattern for the text if music is playing.'),
('on_text_pause', '{}', 'The pattern for the text if music is paused.'),
('on_text_stop', '{}', 'The pattern for the text if music is stopped.'),
('update_interval', 1, 'The update interval.'),
('max_chars_per_player', 50, 'Maximum characters of text per player.'),
('ignore_players', '', 'Comma-separated list of players to ignore.')
]
player_icons = {
'spotify': '',
'vlc': '',
'firefox': '',
'mpv': '',
}
custom_player_data = {
'firefox': {
'showing': False,
'title': '',
'state': Status.STOPPED,
}
}
image_urls = {}
current_image_url = None
player_to_control = None
def __init__(self, **config):
super(MediaWidget, self).__init__(**config)
self.add_defaults(MediaWidget.defaults)
self.surfaces = {}
self.player_to_control = None
def _player_to_control(self):
info = self._get_info()
players = {}
for player in info.keys():
if player not in self.custom_player_data.keys():
if info[player][0] in [MediaWidget.Status.PLAYING, MediaWidget.Status.PAUSED]:
players[player] = info[player]
if self.player_to_control is not None and self.player_to_control not in players.keys():
self.player_to_control = None
if self.player_to_control is not None:
players = {self.player_to_control: players[self.player_to_control]}
if len(players.keys()) == 1:
player = list(players.keys())[0]
self.player_to_control = player
return player
elif len(players) == 0:
notify("MediaWidget", "Nothing to control!")
else:
notify("MediaWidget", "Multiple players to control, I don't know what you want to do!")
return None
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
player = self._player_to_control()
if player is not None:
command = ["playerctl", "-i", self.ignore_players, "-p", player, "play-pause"]
_ = self.call_process(command)
notify("MediaWidget", "Toggled {}".format(player))
if button == BUTTON_RIGHT:
player = self._player_to_control()
if player is not None:
command = ["playerctl", "-i", self.ignore_players, "-p", player, "next"]
_ = self.call_process(command)
if button == BUTTON_MIDDLE:
# Jump to the screen that the player is on
# clients = list(self.bar.qtile.windows_map.values())
# logger.warning("{}")
pass
@expose_command()
def update_custom_player(self, player_name, data):
# Update firefox player
if player_name.startswith("firefox"):
if data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED
self.custom_player_data['firefox']['title'] = data['title']
elif data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = False
self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE
self.custom_player_data['firefox']['title'] = data['title']
def _get_players(self):
players = []
# Playerctl players
try:
result = self.call_process(["playerctl", "-i", self.ignore_players, "-l"])
except subprocess.CalledProcessError:
result = None
if result:
players.extend([x for x in result.split("\n") if x])
# Custom players - Firefox
if self.custom_player_data['firefox']['showing']:
players.append('firefox')
if players:
return players
else:
return None
def _get_info(self):
players = self._get_players()
if not players:
return {}
else:
result = {}
for player in players:
if player in self.custom_player_data.keys():
# Custom player -- Firefox
if player == "firefox":
result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']]
# Other custom players -- generic attempt with error catching
else:
try:
result[player] = [self.custom_player_data[player]['state'],
self.custom_player_data[player]['title']]
except KeyError:
pass
else:
# PlayerCtl player
command = ["playerctl", "-i", self.ignore_players, "-p", player, "status"]
cmd_result = self.call_process(command).strip()
text = "Unknown"
if cmd_result in ["Playing", "Paused"]:
try:
artist = self.call_process(['playerctl', "-i", self.ignore_players, '-p', player, 'metadata', 'artist']).strip()
except subprocess.CalledProcessError:
artist = None
try:
title = self.call_process(['playerctl', "-i", self.ignore_players, '-p', player, 'metadata', 'title']).strip()
except subprocess.CalledProcessError:
title = None
if artist and title:
text = "{} - {}".format(artist, title)
elif artist:
text = artist
elif title:
text = title
if cmd_result == "Playing":
result[player] = [MediaWidget.Status.PLAYING, text]
elif cmd_result == "Paused":
result[player] = [MediaWidget.Status.PAUSED, text]
elif cmd_result == "Stopped":
result[player] = [MediaWidget.Status.STOPPED, ""]
return result
def _get_formatted_text(self, status):
if status[0] == MediaWidget.Status.PLAYING:
res = self.on_text_play.format(status[1])
elif status[0] == MediaWidget.Status.PAUSED:
res = self.on_text_pause.format(status[1])
elif status[0] == MediaWidget.Status.STOPPED:
res = self.on_text_stop.format(status[1])
else:
res = "Unknown"
res = pangocffi.markup_escape_text(res)
res = unicodedata.normalize('NFKD', res)
if len(res) > self.max_chars_per_player:
res = res[:self.max_chars_per_player] + "..."
return res
def draw(self):
super(MediaWidget, self).draw()
def poll(self):
text = []
status = self._get_info()
if not status:
return self.off_text
else:
for player in status.keys():
# Shorten firefox.instance[0-9]+ to just firefox for icon finding
if player.startswith("firefox"):
player_icon = "firefox"
else:
player_icon = player
icon = self.player_icons.get(player_icon, player_icon)
text.append("{} {}".format(icon, self._get_formatted_text(status[player])))
return " | ".join(text) if text else self.off_text
class AudioVisualizerWidget(_Graph):
"""Display Audio Visualization graph"""
orientations = base.ORIENTATION_HORIZONTAL
fixed_upper_bound = True
defaults = [
("graph_color", "FFFFFF.0", "Graph color"),
("fill_color", "FFFFFF.0", "Fill color for linefill graph"),
("border_color", "FFFFFF.0", "Widget border color"),
("border_width", 0, "Widget border width"),
("line_width", 0, "Line width"),
]
def __init__(self, **config):
_Graph.__init__(self, **config)
self.add_defaults(AudioVisualizerWidget.defaults)
self.client = None
self.screen = None
self.old_position = None
def set_client(self, c, s):
self.client = c
self.screen = s
def update_graph(self):
if self.client is not None:
viz_info = self.info()
pos_x = viz_info['offset'] + self.margin_x + self.screen.x
pos_y = 0 + self.margin_y + self.screen.y
if self.old_position != (pos_x, pos_y):
self.old_position = (pos_x, pos_y)
# Check if a window on this screen is full-screen
fullscreen = False
for window in self.screen.group.windows:
if isinstance(window, Window):
if window.fullscreen:
fullscreen = True
break
logger.debug("Repositioning {} {} to {}x{}".format(self.client, self.client.window.wid, pos_x, pos_y))
self.client.reposition(pos_x, pos_y, above=not fullscreen)
self.draw()
def draw(self):
self.drawer.clear(self.background or self.bar.background)
self.drawer.draw(offsetx=self.offset, width=self.width)
class KuroCurrentLayoutIcon(CurrentLayoutIcon):
def _get_layout_names(self):
names = list(super(KuroCurrentLayoutIcon, self)._get_layout_names())
from kuro.utils import layouts as kuro_layouts
from libqtile.layout.base import Layout
klayouts = [
layout_class_name.lower()
for layout_class, layout_class_name
in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts))
if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout)
]
names.extend(klayouts)
return set(names)
class KuroTaskList(TaskList):
defaults = [
(
'txt_pinned',
'P ',
'Text representation of the pinned window state. '
'e.g., "P " or "\U0001F5D7 "'
),
(
'markup_pinned',
None,
'Text markup of the pinned window state. Supports pangomarkup with markup=True.'
'e.g., "{}" or "<span underline="low">{}</span>"'
),
]
def __init__(self, *args, **kwargs):
super(KuroTaskList, self).__init__(*args, **kwargs)
self.add_defaults(KuroTaskList.defaults)
def get_taskname(self, window):
"""
Get display name for given window.
Depending on its state minimized, maximized and floating
appropriate characters are prepended.
"""
state = ''
markup_str = self.markup_normal
# Enforce markup and new string format behaviour when
# at least one markup_* option is used.
# Mixing non markup and markup may cause problems.
if self.markup_minimized or self.markup_maximized\
or self.markup_floating or self.markup_focused or self.markup_pinned:
enforce_markup = True
else:
enforce_markup = False
if window is None:
pass
elif hasattr(window, "is_static_window") and window.is_static_window:
state = self.txt_pinned
markup_str = self.markup_pinned
elif window.minimized:
state = self.txt_minimized
markup_str = self.markup_minimized
elif window.maximized:
state = self.txt_maximized
markup_str = self.markup_maximized
elif window.floating:
state = self.txt_floating
markup_str = self.markup_floating
elif window is window.group.current_window:
markup_str = self.markup_focused
window_name = window.name if window and window.name else "?"
# Emulate default widget behavior if markup_str is None
if enforce_markup and markup_str is None:
markup_str = "%s{}" % (state)
if markup_str is not None:
self.markup = True
window_name = pangocffi.markup_escape_text(window_name)
return markup_str.format(window_name)
return "%s%s" % (state, window_name)
class GPUStatusWidget(base._TextBox):
"""Displays the currently used GPU."""
@ -655,7 +254,11 @@ class GPUStatusWidget(base._TextBox):
self.icons.update(self.custom_icons)
def _get_info(self):
output = self.call_process(self.check_command, shell=True)
try:
output = self.call_process(self.check_command, shell=True)
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {self.check_command} - {e}")
output = None
mode = "nvidia" if "nvidia" in output else "intel" if "intel" in output else "unknown"
return {'error': False, 'mode': mode}
@ -731,14 +334,16 @@ class GPUStatusWidget(base._TextBox):
if button == BUTTON_LEFT:
try:
next_gpu = self.call_process(self.next_command, shell=True).split(":")[1].strip()
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {self.next_command} - {e}")
except IndexError:
next_gpu = "Unknown"
if self.current_status == "Unknown":
notify("GPU Status", "The currently used GPU is <b>unknown</b>.\n\nAfter the next login it will be the <b>{}</b> GPU.".format(next_gpu),
notify(None, "GPU Status", "The currently used GPU is <b>unknown</b>.\n\nAfter the next login it will be the <b>{}</b> GPU.".format(next_gpu),
image=os.path.join(self.theme_path, "gpu-unknown.png"))
else:
notify("GPU Status", "The system is currently running on the <b>{}</b> GPU. Press the middle mouse "
notify(None, "GPU Status", "The system is currently running on the <b>{}</b> GPU. Press the middle mouse "
"button on this icon to switch GPUs.\n\nAfter the next login it will be the <b>{}</b> GPU.".format(
self.current_status, next_gpu
),
@ -747,38 +352,25 @@ class GPUStatusWidget(base._TextBox):
if button == BUTTON_MIDDLE:
command = ["optimus-manager", "--no-confirm", "--switch", "auto"]
output = self.call_process(command)
try:
output = self.call_process(command)
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {command} - {e}")
output = ""
if "nvidia" in output:
notify("GPU Switched", "The GPU has been switched from Intel to NVidia.\n"
notify(None, "GPU Switched", "The GPU has been switched from Intel to NVidia.\n"
"Please log out and log back in to apply the changes to the session.",
image=os.path.join(self.theme_path, "gpu-nvidia.png"))
elif "intel" in output:
notify("GPU Switched", "The GPU has been switched from NVidia to Intel.\n"
notify(None, "GPU Switched", "The GPU has been switched from NVidia to Intel.\n"
"Please log out and log back in to apply the changes to the session.",
image=os.path.join(self.theme_path, "gpu-intel.png"))
else:
notify("GPU Switch Error", "I could not determine if the GPU was switched successfully.\n"
notify(None, "GPU Switch Error", "I could not determine if the GPU was switched successfully.\n"
"Please log out and log back in to clear up the inconsistency.",
image=os.path.join(self.theme_path, "gpu-unknown.png"))
class TextSpacerWidget(base._TextBox):
"""Displays a text separator"""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('spacer', None, 'The character/text to use as separator. Default "|" if None.'),
('color', "#ffffff", "Color of the text."),
]
def __init__(self, **config):
super(TextSpacerWidget, self).__init__("Separator", bar.CALCULATED, **config)
self.add_defaults(TextSpacerWidget.defaults)
self.text = self.spacer or "|"
def draw(self):
base._TextBox.draw(self)
class ThermalSensorWidget(DualPaneTextboxBase):
defaults = [
('show_tag', False, 'Show tag sensor'),
@ -816,7 +408,11 @@ class ThermalSensorWidget(DualPaneTextboxBase):
self.timeout_add(self.update_interval, self.timer_setup)
def _update_values(self):
sensors_out = self.call_process(self.get_command())
try:
sensors_out = self.call_process(self.get_command())
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {self.get_command()} - {e}")
return
temperature_values = {}
for name, temp, symbol in self.sensors_temp.findall(sensors_out):
name = name.strip()
@ -846,7 +442,7 @@ class ThermalSensorWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
notify("Temperature Information", "\n".join(
notify(None, "Temperature Information", "\n".join(
"{}: {}{}".format(name, *values) for name, values in self.values.items()
))
@ -899,7 +495,7 @@ class CPUInfoWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
total = sum([self.cpu_old[0], self.cpu_old[1], self.cpu_old[2], self.cpu_old[3]])
notify("CPU Information", "user: {} %\nnice: {} %\nsys: {} %\nidle: {} %\ntotal: {} %".format(
notify(None, "CPU Information", "user: {} %\nnice: {} %\nsys: {} %\nidle: {} %\ntotal: {} %".format(
math.ceil((self.cpu_old[0] / total) * 100),
math.ceil((self.cpu_old[1] / total) * 100),
math.ceil((self.cpu_old[2] / total) * 100),
@ -952,7 +548,7 @@ class MemoryInfoWidget(DualPaneTextboxBase):
val['SwapUsed'] = swap.used // 1024 // 1024
if button == BUTTON_LEFT:
notify("Memory Information", "Memory: {}MB / {}MB\n {}%\nSwap: {}MB / {}MB\n {}%".format(
notify(None, "Memory Information", "Memory: {}MB / {}MB\n {}%\nSwap: {}MB / {}MB\n {}%".format(
val['MemUsed'], val['MemTotal'],
math.ceil((mem.used / mem.total) * 100),
val['SwapUsed'], val['SwapTotal'],
@ -1008,7 +604,7 @@ class DiskIOInfoWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
notify("Disk IO Information",
notify(None, "Disk IO Information",
"Time that there were IO requests queued for /dev/{}: {} ms".format(self.hdd_device, self.io))
@ -1058,7 +654,7 @@ class NetworkInfoWidget(DualPaneTextboxBase):
status = iwlib.get_iwconfig(self.wireless_interface)
self.wireless_ips = netifaces.ifaddresses(self.wireless_interface)
disconnected = essid is None
percent = math.ceil((quality / 70) * 100)
percent = math.ceil(((quality or 0) / 70) * 100)
self.wireless_quality = quality
self.wireless_signal = percent
self.wireless_name = essid
@ -1077,7 +673,12 @@ class NetworkInfoWidget(DualPaneTextboxBase):
self.wired_ipv4 = self.wired_ips.get(netifaces.AF_INET, [{'addr': ""}])[0]['addr']
self.wired_ipv6 = self.wired_ips.get(netifaces.AF_INET6, [{'addr': ""}])[0]['addr']
self.wired_mac = self.wired_ips.get(netifaces.AF_LINK, [{'addr': ""}])[0]['addr']
eth_status = call_process(["ip", "link", "show", "{}".format(self.wired_interface)])
command = ["ip", "link", "show", "{}".format(self.wired_interface)]
try:
eth_status = call_process(command)
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {command} - {e}")
return
m = self.wired_up_regex.search(eth_status)
if m:
self.wired_connected = "UP" in m.group(1)
@ -1135,9 +736,9 @@ class NetworkInfoWidget(DualPaneTextboxBase):
wired_text = "<b>Wired: Not connected</b>"
if wifi_text:
notify(title, "{}\n\n{}".format(wifi_text, wired_text))
notify(None, title, "{}\n\n{}".format(wifi_text, wired_text))
else:
notify(title, "\n{}".format(wired_text))
notify(None, title, "\n{}".format(wired_text))
class BatteryInfoWidget(DualPaneTextboxBase):
@ -1213,7 +814,7 @@ class BatteryInfoWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8')
notify("Battery Status", output)
notify(None, "Battery Status", output)
class VolumeInfoWidget(DualPaneTextboxBase):
@ -1248,7 +849,8 @@ class VolumeInfoWidget(DualPaneTextboxBase):
else:
cmd = self.status_cmd
mixer_out = self.call_process(cmd.split(" "))
except subprocess.CalledProcessError:
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {cmd} - {e}")
return -1
try:
return int(mixer_out)
@ -1292,7 +894,7 @@ class VolumeInfoWidget(DualPaneTextboxBase):
output = subprocess.check_output(cmd.split(" ")).decode('utf-8')
sink = "Sink {}\n".format(self.pulse_sink) if self.pulse_sink else ""
notify("Volume Status", sink+output)
notify(None, "Volume Status", sink+output)
elif button == BUTTON_RIGHT:
if "{sink}" in self.volume_app:
@ -1343,3 +945,16 @@ class VolumeInfoWidget(DualPaneTextboxBase):
def run_app(self):
# Emulate button press.
self.button_press(0, 0, BUTTON_RIGHT)
class KuroGroupBox(GroupBox):
@property
def length(self):
try:
return super(KuroGroupBox, self).length
except AttributeError:
return 1
@length.setter
def length(self, length):
logger.warning(f"Setting groupbox length to {length}")