Meconopsis changes for Wayland and general refactoring/optimizations

This commit is contained in:
Kevin Alberts 2024-02-08 18:59:30 +01:00
parent 1e65025045
commit 2860100089
9 changed files with 697 additions and 1185 deletions

View file

@ -53,12 +53,12 @@ except ImportError as e:
try:
logger.info("Initializing theme...")
logger.warning("Initializing theme...")
# Initialize the Theme
Theme.initialize()
logger.info("Initialize done")
logger.warning("Initialize done")
logger.info("Hooking theme into callbacks...")
logger.warning("Hooking theme into callbacks...")
# Hook theme into all hooks we know of
hook.subscribe.startup_once(Theme.callback_startup_once)
hook.subscribe.startup(Theme.callback_startup)
@ -82,10 +82,11 @@ try:
hook.subscribe.selection_notify(Theme.callback_selection_notify)
hook.subscribe.selection_change(Theme.callback_selection_change)
hook.subscribe.screen_change(Theme.callback_screen_change)
hook.subscribe.screens_reconfigured(Theme.callback_screens_reconfigured)
hook.subscribe.current_screen_change(Theme.callback_current_screen_change)
logger.info("Hooking done")
logger.warning("Hooking done")
logger.info("Initializing theme variables")
logger.warning("Initializing theme variables")
# Initialize variables from theme
keys = Theme.keys
mouse = Theme.mouse
@ -104,7 +105,8 @@ try:
focus_on_window_activation = Theme.focus_on_window_activation
extensions = Theme.extensions
wmname = Theme.wmname
logger.info("Variable initialization done")
reconfigure_screens = Theme.reconfigure_screens
logger.warning("Variable initialization done")
except Exception as e:
Theme = None
Config = None

View file

@ -2,6 +2,9 @@ from libqtile import layout as libqtile_layout, layout, bar, widget
from libqtile.command import lazy
from libqtile.config import Key, Group, Screen, Drag, Click, Match
# Initialize logging
from libqtile.log_utils import logger
class BaseConfig:
@classmethod
@ -15,10 +18,11 @@ class BaseConfig:
class BaseTheme:
# Changing variables initialized by function
keys = None
mouse = None
groups = None
layouts = None
widget_defaults = None
screens = None
screens = []
qtile = None
# 'Static' variables
@ -41,6 +45,7 @@ class BaseTheme:
auto_fullscreen = True
focus_on_window_activation = "smart"
extensions = []
reconfigure_screens = True
# XXX: Gasp! We're lying here. In fact, nobody really uses or cares about this
# string besides java UI toolkits; you can see several discussions on the
@ -56,11 +61,17 @@ class BaseTheme:
wmname = "LG3D"
def initialize(self):
logger.info("Initializing widget defaults...")
self.widget_defaults = self.init_widget_defaults()
logger.info("Initializing keys...")
self.keys = self.init_keys()
logger.info("Initializing groups...")
self.groups = self.init_groups()
logger.info("Initializing layouts...")
self.layouts = self.init_layouts()
logger.info("Initializing screens...")
self.screens = self.init_screens()
logger.info("Initializing mouse...")
self.mouse = self.init_mouse()
def init_keys(self):
@ -95,6 +106,15 @@ class BaseTheme:
Key(["mod4"], "r", lazy.spawncmd()),
]
def init_mouse(self):
return [
Drag(["mod4"], "Button1", lazy.window.set_position_floating(),
start=lazy.window.get_position()),
Drag(["mod4"], "Button3", lazy.window.set_size_floating(),
start=lazy.window.get_size()),
Click(["mod4"], "Button2", lazy.window.bring_to_front())
]
def init_groups(self):
groups = [Group(i) for i in "asdfuiop"]
for i in groups:
@ -138,15 +158,6 @@ class BaseTheme:
),
]
def init_mouse(self):
return [
Drag(["mod4"], "Button1", lazy.window.set_position_floating(),
start=lazy.window.get_position()),
Drag(["mod4"], "Button3", lazy.window.set_size_floating(),
start=lazy.window.get_size()),
Click(["mod4"], "Button2", lazy.window.bring_to_front())
]
# Callbacks
def callback_startup_once(self, *args, **kwargs):
pass
@ -220,5 +231,8 @@ class BaseTheme:
def callback_screen_change(self, *args, **kwargs):
pass
def callback_screens_reconfigured(self, *args, **kwargs):
pass
def callback_current_screen_change(self, *args, **kwargs):
pass

View file

@ -14,22 +14,37 @@ class Config(BaseConfig):
inactive_light = "#777777"
inactive_dark = "#333333"
# Default Applications
app_terminal = "terminator"
app_launcher = "/home/kevin/bin/dmenu_wal.sh"
web_browser = "firefox"
file_manager = "thunar"
app_chat = "/usr/bin/rambox"
app_irc = "quasselclient"
app_mail = "thunderbird"
app_music = "spotify"
# Predefined commands
cmd_brightness_up = "sudo /usr/bin/xbacklight -inc 10"
cmd_brightness_down = "sudo /usr/bin/xbacklight -dec 10"
lock_command = "bash /home/kevin/bin/lock.sh"
visualizer_app = "glava"
cmd_screenshot = "xfce4-screenshooter -r -c -d 1"
cmd_alt_screenshot = "xfce4-screenshooter -w -c -d 0"
app_terminal = "terminator"
web_browser = "firefox"
file_manager = "thunar"
app_launcher = "ulauncher-toggle --no-window-shadow"
lock_command = "bash /home/kevin/bin/lock.sh"
# Autostart applications
apps_autostart_group = [
{'group': "", 'command': ["firefox"]},
{'group': "", 'command': ["terminator"]},
{'group': "", 'command': ["/usr/bin/rambox"]},
{'group': "", 'command': ["thunar"]},
{'group': "", 'command': ["thunderbird"]},
{'group': "", 'command': ["spotify"]},
]
apps_autostart = [
["ulauncher", "--hide-window", "--no-window-shadow"], # App launcher background daemon
["mako"], # Notification daemon
["kanshi"], # Display hotplug
["wl-paste", "--watch", "cliphist", "store"], # Clipboard manager
["/usr/lib/kdeconnectd"], # KDE Connect daemon
["kdeconnect-indicator"], # KDE Connect tray
["vorta"], # Vorta backup scheduler
]
# Keyboard commands
cmd_media_play = "playerctl -i kdeconnect play-pause"
cmd_media_next = "playerctl -i kdeconnect next"
@ -43,9 +58,10 @@ class Config(BaseConfig):
cmd_monitor_mode_day = "bash /home/kevin/bin/monitor_day.sh"
cmd_monitor_mode_night = "bash /home/kevin/bin/monitor_night.sh"
cmd_monitor_mode_alt = "bash /home/kevin/bin/monitor_gamenight.sh"
cmd_reconfigure_screens = "kanshictl reload"
# Commands
wallpaper_config_command = "/home/kevin/bin/wal-nitrogen-noupdate"
wallpaper_config_command = "/bin/true"
# Images
desktop_bg = "/home/kevin/Pictures/wallpapers/desktop.png"
@ -83,7 +99,8 @@ class Config(BaseConfig):
# Bar variables
bar_background = background
bar_opacity = 0.8
bar_rgba_opacity = "AA"
bar_opacity = 1.0
bar_hover_opacity = 1
# Groupbox variables
@ -104,11 +121,12 @@ class Config(BaseConfig):
tasklist_urgent_border = highlight
tasklist_font = "Noto Sans"
tasklist_fontsize = 11
tasklist_rounded = False
# Thermal indicator variables
thermal_threshold = 75
thermal_sensor = "Tdie"
thermal_chip = "zenpower-pci-00c3"
thermal_sensor = "Package id 0"
thermal_chip = "coretemp-isa-0000"
# CPU graph variables
cpu_graph_colour = '#ff0000'
@ -123,10 +141,11 @@ class Config(BaseConfig):
battery_theme_path = "/home/kevin/.config/qtile/kuro/resources/battery"
battery_update_delay = 5
# Wifi variables
wifi_interface = "wifi0"
# Network variables
wifi_interface = "wlp3s0"
wifi_theme_path = "/home/kevin/.config/qtile/kuro/resources/wifi"
wifi_update_interval = 5
wired_interface = "enp4s0"
# GPU variables
gpu_theme_path = "/home/kevin/.config/qtile/kuro/resources/gpu"
@ -135,8 +154,8 @@ class Config(BaseConfig):
volume_font = "Noto Sans"
volume_fontsize = 11
volume_theme_path = "/home/kevin/.config/qtile/kuro/resources/volume"
volume_pulse_sink = "alsa_output.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00.analog-stereo-output"
volume_pulse_sink2 = "alsa_output.pci-0000_0d_00.4.analog-stereo"
volume_pulse_sink = "alsa_output.usb-CSCTEK_USB_Audio_and_HID_A34004801402-00.analog-stereo"
volume_pulse_sink2 = None
volume_is_bluetooth_icon = False
volume_update_interval = 0.2
@ -161,9 +180,6 @@ class Config(BaseConfig):
laptop_screen_nvidia = "eDP-1-1"
laptop_screen_intel = "eDP1"
# Keyboard colors
do_keyboard_updates = False
# Show audio visualizer
show_audio_visualizer = False
kill_unnecessary_glava_processes = True
@ -177,9 +193,5 @@ class Config(BaseConfig):
# Show battery widget
show_battery_widget = False
# Audio control applications
# apps_audio = ["pavucontrol"]
apps_audio_afterstart = []
# Comma-separated list of ignored players in the media widget
media_ignore_players = "kdeconnect"

File diff suppressed because it is too large Load diff

View file

@ -2,20 +2,14 @@ import re
import subprocess
import traceback
from time import sleep
from typing import List
import notify2
import six
from dbus import DBusException
from libqtile import widget
from libqtile.backend.x11.window import Internal
from libqtile.bar import Bar
from notify2 import Notification, URGENCY_NORMAL
from libqtile.log_utils import logger
try:
notify2.init("QTileWM")
except DBusException as e:
logger.error("Could not initialize notify2: {}".format(e))
from libqtile import qtile
BUTTON_LEFT = 1
BUTTON_MIDDLE = 2
@ -38,40 +32,68 @@ def is_running(process):
def execute(process):
try:
if isinstance(process, list):
return subprocess.Popen(process)
elif isinstance(process, str):
return subprocess.Popen(process.split())
else:
pass
logger.info(f"Failed to execute_once")
except FileNotFoundError as e:
logger.error(f"Could not execute {process}, FileNotFoundError - {e}")
def execute_once(process):
logger.info(f"Attempting to execute_once: {process}")
if not is_running(process):
if isinstance(process, list):
return subprocess.Popen(process)
elif isinstance(process, str):
return subprocess.Popen(process.split())
else:
pass
return execute(process)
logger.info(f"Process was already running: {process}")
def start_in_group(theme, qtile, group: str, command: List[str], floating: bool = False,
intrusive: bool = False, dont_break: bool = False):
try:
proc = subprocess.Popen(command)
match_args = {"net_wm_pid": proc.pid}
rule_args = {
"float": floating,
"intrusive": intrusive,
"group": group,
"break_on_match": not dont_break,
}
rule_id = qtile.add_rule(match_args, rule_args)
theme.autostart_app_rules[proc.pid] = rule_id
return proc
except FileNotFoundError as e:
logger.error(f"Could not execute {process}, FileNotFoundError - {e}")
def start_in_group_once(theme, qtile, group: str, command: List[str], floating: bool = False,
intrusive: bool = False, dont_break: bool = False):
logger.info(f"Attempting to start_in_group_once: {command}")
if not is_running(command):
return start_in_group(theme=theme, qtile=qtile, group=group, command=command,
floating=floating, intrusive=intrusive, dont_break=dont_break)
logger.info(f"Process was already running: {command}")
def call_process(command, **kwargs):
"""
This method uses `subprocess.check_output` to run the given command
and return the string from stdout, which is decoded when using
Python 3.
Run the given command and return the string from stdout.
"""
output = subprocess.check_output(command, **kwargs)
if six.PY3:
output = output.decode()
return output
return subprocess.check_output(command, **kwargs).decode()
def get_screen_count():
try:
if qtile.core.name == "x11":
logger.info("Using xrandr to detect screen count")
output = subprocess.check_output("xrandr -q".split()).decode('utf-8')
output = [x for x in output.split("\n") if " connected" in x]
else:
logger.info("Using lsmon (wallutils) to detect screen count")
output = subprocess.check_output(["lsmon"]).decode('utf-8')
output = output.split("\n")
except subprocess.CalledProcessError:
return 1
@ -87,8 +109,21 @@ def bar_separator(config):
padding=config.get('padding_spacer', 4),
)
def init_notify(qtile):
if qtile and qtile.theme_instance and qtile.theme_instance.startup_completed:
try:
if not notify2.is_initted():
logger.warning("Initializing Notify2")
notify2.init("QTileWM")
except DBusException:
logger.error(f"Failed to initialize Notify2 (DBus error), retrying later.")
except Exception:
logger.error(f"Failed to initialize Notify2 (Generic error), retrying later.")
else:
logger.warning(f"Not initializing Notify2 yet, QTile startup not completed.")
def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
def notify(qtile, title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
if image is not None:
notification = Notification(
summary=title, message=content,
@ -101,13 +136,14 @@ def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
notification.set_timeout(timeout)
notification.set_urgency(urgency)
init_notify(qtile)
try:
try:
return notification.show()
except notify2.UninittedError:
logger.warning("Notify2 was uninitialized, initializing...")
notify2.init("qtile")
return notification.show()
except DBusException as e:
logger.warning("Notify2 is not initialized")
except Exception as e:
logger.warning("Showing notification failed: {}".format(e))
logger.warning(traceback.format_exc())
@ -125,6 +161,10 @@ def spawn_popup(qtile, x, y, text):
:return: The popup instance
:rtype: Internal
"""
if qtile.core.name == "x11":
from libqtile.backend.x11.window import Internal
else:
from libqtile.backend.wayland.window import Internal
popup = Internal.create(
qtile, x, y, 100, 100, opacity=1
)
@ -158,11 +198,11 @@ def display_wm_class(qtile):
window = qtile.currentWindow if qtile else None
if window:
wm_class = window.window.get_wm_class() or None
wm_class = window.get_wm_class() or None
name = window.name
if wm_class:
notify(title="WM_Class of {}".format(name),
notify(qtile=qtile, title="WM_Class of {}".format(name),
content="{}".format(wm_class),
urgency=notify2.URGENCY_CRITICAL)
@ -189,76 +229,3 @@ def bluetooth_audio_sink():
def bluetooth_audio_connected():
return bluetooth_audio_sink() != -1
class KuroTopBar(Bar):
def __init__(self, theme, widgets, size, **config):
self.theme = theme
super(KuroTopBar, self).__init__(widgets, size, **config)
def _configure(self, qtile, screen, *args, **kwargs):
super(KuroTopBar, self)._configure(qtile, screen)
self.window.handle_EnterNotify = self.handle_enter_notify
self.window.handle_LeaveNotify = self.handle_leave_notify
self.window.window.set_property("_NET_WM_NAME", "KuroTopBar")
self.window.update_name()
def draw(self):
if not self.widgets:
return
if not self._draw_queued:
self.future = self.qtile.call_soon(self._actual_draw)
self._draw_queued = True
def _actual_draw(self):
self._draw_queued = False
self._resize(self._length, self.widgets)
for i in self.widgets:
i.draw()
if self.widgets:
end = i.offset + i.length
if end < self._length:
if self.horizontal:
self.drawer.draw(offsetx=end, width=self._length - end)
else:
self.drawer.draw(offsety=end, height=self._length - end)
self.theme.update_visualizers()
def handle_enter_notify(self, e):
# self.theme.log_debug("Bar HandleEnterNotify")
#
# self.window.opacity = Config.get('bar_hover_opacity', 1.0)
# print("Bar Hover Enter")
#
# try:
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
# except IndexError:
# hovered_widget = None
#
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
#
# if hasattr(hovered_widget, "handle_hover_enter"):
# hovered_widget.handle_hover_enter(e)
self.draw()
def handle_leave_notify(self, e):
# self.theme.log_debug("Bar HandleLeaveNotify")
#
# self.window.opacity = Config.get('bar_opacity', 1.0)
# print("Bar Hover Leave")
#
# try:
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
# except IndexError:
# hovered_widget = None
#
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
#
# if hasattr(hovered_widget, "handle_hover_leave"):
# hovered_widget.handle_hover_leave(e)
self.draw()

View file

@ -55,7 +55,7 @@ def handle_focus_change(theme):
window = qtile.currentWindow if qtile else None
if window:
wm_class = window.window.get_wm_class() or None
wm_class = window.get_wm_class() or None
name = window.name
if wm_class:

View file

@ -32,29 +32,25 @@ class KuroFloating(Floating):
# 'sun-awt-X11-XWindowPeer' is a dropdown used in Java application,
# don't reposition it anywhere, let Java app to control it
cls = client.window.get_wm_class() or ''
is_java_dropdown = 'sun-awt-X11-XWindowPeer' in cls
cls = client.get_wm_class() or ""
is_java_dropdown = "sun-awt-X11-XWindowPeer" in cls
if is_java_dropdown:
client.paint_borders(bc, bw)
client.cmd_bring_to_front()
client.bring_to_front()
# alternatively, users may have asked us explicitly to leave the client alone
elif any(m.compare(client) for m in self.no_reposition_rules):
client.paint_borders(bc, bw)
client.cmd_bring_to_front()
client.bring_to_front()
else:
above = False
# We definitely have a screen here, so let's be sure we'll float on screen
try:
client.float_x
client.float_y
except AttributeError:
if client.float_x is None or client.float_y is None:
# this window hasn't been placed before, let's put it in a sensible spot
above = self.compute_client_position(client, screen_rect)
client.place(
client.x,
client.y,
@ -63,5 +59,6 @@ class KuroFloating(Floating):
bw,
bc,
above,
respect_hints=True,
)
client.unhide()

View file

@ -7,61 +7,18 @@ import cairocffi
import iwlib
import netifaces
import psutil
import six
import unicodedata
from libqtile import bar, pangocffi
from libqtile import bar, qtile
from libqtile.log_utils import logger
from libqtile.widget import base
from libqtile.widget.base import ORIENTATION_HORIZONTAL
from libqtile.widget.battery import default_icon_path, load_battery, BatteryState
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.currentlayout import CurrentLayoutIcon
from libqtile.widget.graph import _Graph
from libqtile.widget.tasklist import TaskList
from libqtile.widget.wlan import get_status
from libqtile.backend.x11.window import Window
from libqtile.command.base import expose_command
from kuro.utils.general import notify, BUTTON_LEFT, BUTTON_MIDDLE, BUTTON_RIGHT, BUTTON_DOWN, BUTTON_UP, BUTTON_MUTE, \
call_process
class CheckUpdatesYay(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYay, self).__init__(**config)
# Override command and output with yay command
self.cmd = "yay -Qu".split()
self.status_cmd = "yay -Qu --color never".split()
self.update_cmd = "sudo yay".split()
self.subtr = 0
def _check_updates(self):
#subprocess.check_output(self.update_cmd)
res = super(CheckUpdatesYay, self)._check_updates()
return res
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class DualPaneTextboxBase(base._Widget):
"""
Base class for widgets that are two boxes next to each other both containing text.
@ -242,7 +199,8 @@ class DualPaneTextboxBase(base._Widget):
self.bar.draw()
self.changed = False
def cmd_set_font(self, font=base.UNSPECIFIED, fontsize_left=base.UNSPECIFIED, fontsize_right=base.UNSPECIFIED, fontshadow=base.UNSPECIFIED):
@expose_command()
def set_font(self, font=base.UNSPECIFIED, fontsize_left=base.UNSPECIFIED, fontsize_right=base.UNSPECIFIED, fontshadow=base.UNSPECIFIED):
"""
Change the font used by this widget. If font is None, the current
font is used.
@ -265,363 +223,6 @@ class DualPaneTextboxBase(base._Widget):
return d
class MediaWidget(base.InLoopPollText):
"""Media Status Widget"""
class Status:
OFFLINE = 0
PLAYING = 1
PAUSED = 2
STOPPED = 3
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('off_text', '', 'The pattern for the text if no players are found.'),
('on_text_play', '{}', 'The pattern for the text if music is playing.'),
('on_text_pause', '{}', 'The pattern for the text if music is paused.'),
('on_text_stop', '{}', 'The pattern for the text if music is stopped.'),
('update_interval', 1, 'The update interval.'),
('max_chars_per_player', 50, 'Maximum characters of text per player.'),
('ignore_players', '', 'Comma-separated list of players to ignore.')
]
player_icons = {
'spotify': '',
'vlc': '',
'firefox': '',
'mpv': '',
}
custom_player_data = {
'firefox': {
'showing': False,
'title': '',
'state': Status.STOPPED,
}
}
image_urls = {}
current_image_url = None
player_to_control = None
def __init__(self, **config):
super(MediaWidget, self).__init__(**config)
self.add_defaults(MediaWidget.defaults)
self.surfaces = {}
self.player_to_control = None
def _player_to_control(self):
info = self._get_info()
players = {}
for player in info.keys():
if player not in self.custom_player_data.keys():
if info[player][0] in [MediaWidget.Status.PLAYING, MediaWidget.Status.PAUSED]:
players[player] = info[player]
if self.player_to_control is not None and self.player_to_control not in players.keys():
self.player_to_control = None
if self.player_to_control is not None:
players = {self.player_to_control: players[self.player_to_control]}
if len(players.keys()) == 1:
player = list(players.keys())[0]
self.player_to_control = player
return player
elif len(players) == 0:
notify("MediaWidget", "Nothing to control!")
else:
notify("MediaWidget", "Multiple players to control, I don't know what you want to do!")
return None
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
player = self._player_to_control()
if player is not None:
command = ["playerctl", "-i", self.ignore_players, "-p", player, "play-pause"]
_ = self.call_process(command)
notify("MediaWidget", "Toggled {}".format(player))
if button == BUTTON_RIGHT:
player = self._player_to_control()
if player is not None:
command = ["playerctl", "-i", self.ignore_players, "-p", player, "next"]
_ = self.call_process(command)
if button == BUTTON_MIDDLE:
# Jump to the screen that the player is on
# clients = list(self.bar.qtile.windows_map.values())
# logger.warning("{}")
pass
def cmd_update_custom_player(self, player_name, data):
# Update firefox player
if player_name.startswith("firefox"):
if data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED
self.custom_player_data['firefox']['title'] = data['title']
elif data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = False
self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE
self.custom_player_data['firefox']['title'] = data['title']
def _get_players(self):
players = []
# Playerctl players
try:
result = self.call_process(["playerctl", "-i", self.ignore_players, "-l"])
except subprocess.CalledProcessError:
result = None
if result:
players.extend([x for x in result.split("\n") if x])
# Custom players - Firefox
if self.custom_player_data['firefox']['showing']:
players.append('firefox')
if players:
return players
else:
return None
def _get_info(self):
players = self._get_players()
if not players:
return {}
else:
result = {}
for player in players:
if player in self.custom_player_data.keys():
# Custom player -- Firefox
if player == "firefox":
result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']]
# Other custom players -- generic attempt with error catching
else:
try:
result[player] = [self.custom_player_data[player]['state'],
self.custom_player_data[player]['title']]
except KeyError:
pass
else:
# PlayerCtl player
command = ["playerctl", "-i", self.ignore_players, "-p", player, "status"]
cmd_result = self.call_process(command).strip()
text = "Unknown"
if cmd_result in ["Playing", "Paused"]:
try:
artist = self.call_process(['playerctl', "-i", self.ignore_players, '-p', player, 'metadata', 'artist']).strip()
except subprocess.CalledProcessError:
artist = None
try:
title = self.call_process(['playerctl', "-i", self.ignore_players, '-p', player, 'metadata', 'title']).strip()
except subprocess.CalledProcessError:
title = None
if artist and title:
text = "{} - {}".format(artist, title)
elif artist:
text = artist
elif title:
text = title
if cmd_result == "Playing":
result[player] = [MediaWidget.Status.PLAYING, text]
elif cmd_result == "Paused":
result[player] = [MediaWidget.Status.PAUSED, text]
elif cmd_result == "Stopped":
result[player] = [MediaWidget.Status.STOPPED, ""]
return result
def _get_formatted_text(self, status):
if status[0] == MediaWidget.Status.PLAYING:
res = self.on_text_play.format(status[1])
elif status[0] == MediaWidget.Status.PAUSED:
res = self.on_text_pause.format(status[1])
elif status[0] == MediaWidget.Status.STOPPED:
res = self.on_text_stop.format(status[1])
else:
res = "Unknown"
res = pangocffi.markup_escape_text(res)
res = unicodedata.normalize('NFKD', res)
if len(res) > self.max_chars_per_player:
res = res[:self.max_chars_per_player] + "..."
return res
def draw(self):
super(MediaWidget, self).draw()
def poll(self):
text = []
status = self._get_info()
if not status:
return self.off_text
else:
for player in status.keys():
# Shorten firefox.instance[0-9]+ to just firefox for icon finding
if player.startswith("firefox"):
player_icon = "firefox"
else:
player_icon = player
icon = self.player_icons.get(player_icon, player_icon)
text.append("{} {}".format(icon, self._get_formatted_text(status[player])))
return " | ".join(text) if text else self.off_text
class AudioVisualizerWidget(_Graph):
"""Display Audio Visualization graph"""
orientations = base.ORIENTATION_HORIZONTAL
fixed_upper_bound = True
defaults = [
("graph_color", "FFFFFF.0", "Graph color"),
("fill_color", "FFFFFF.0", "Fill color for linefill graph"),
("border_color", "FFFFFF.0", "Widget border color"),
("border_width", 0, "Widget border width"),
("line_width", 0, "Line width"),
]
def __init__(self, **config):
_Graph.__init__(self, **config)
self.add_defaults(AudioVisualizerWidget.defaults)
self.client = None
self.screen = None
self.old_position = None
def set_client(self, c, s):
self.client = c
self.screen = s
def update_graph(self):
if self.client is not None:
viz_info = self.info()
pos_x = viz_info['offset'] + self.margin_x + self.screen.x
pos_y = 0 + self.margin_y + self.screen.y
if self.old_position != (pos_x, pos_y):
self.old_position = (pos_x, pos_y)
# Check if a window on this screen is full-screen
fullscreen = False
for window in self.screen.group.windows:
if isinstance(window, Window):
if window.fullscreen:
fullscreen = True
break
logger.debug("Repositioning {} {} to {}x{}".format(self.client, self.client.window.wid, pos_x, pos_y))
self.client.reposition(pos_x, pos_y, above=not fullscreen)
self.draw()
def draw(self):
self.drawer.clear(self.background or self.bar.background)
self.drawer.draw(offsetx=self.offset, width=self.width)
class KuroCurrentLayoutIcon(CurrentLayoutIcon):
def _get_layout_names(self):
names = super(KuroCurrentLayoutIcon, self)._get_layout_names()
from kuro.utils import layouts as kuro_layouts
from libqtile.layout.base import Layout
klayouts = [
layout_class_name.lower()
for layout_class, layout_class_name
in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts))
if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout)
]
names.extend(klayouts)
return list(set(names))
class KuroTaskList(TaskList):
defaults = [
(
'txt_pinned',
'P ',
'Text representation of the pinned window state. '
'e.g., "P " or "\U0001F5D7 "'
),
(
'markup_pinned',
None,
'Text markup of the pinned window state. Supports pangomarkup with markup=True.'
'e.g., "{}" or "<span underline="low">{}</span>"'
),
]
def __init__(self, *args, **kwargs):
super(KuroTaskList, self).__init__(*args, **kwargs)
self.add_defaults(KuroTaskList.defaults)
def get_taskname(self, window):
"""
Get display name for given window.
Depending on its state minimized, maximized and floating
appropriate characters are prepended.
"""
state = ''
markup_str = self.markup_normal
# Enforce markup and new string format behaviour when
# at least one markup_* option is used.
# Mixing non markup and markup may cause problems.
if self.markup_minimized or self.markup_maximized\
or self.markup_floating or self.markup_focused or self.markup_pinned:
enforce_markup = True
else:
enforce_markup = False
if window is None:
pass
elif hasattr(window, "is_static_window") and window.is_static_window:
state = self.txt_pinned
markup_str = self.markup_pinned
elif window.minimized:
state = self.txt_minimized
markup_str = self.markup_minimized
elif window.maximized:
state = self.txt_maximized
markup_str = self.markup_maximized
elif window.floating:
state = self.txt_floating
markup_str = self.markup_floating
elif window is window.group.current_window:
markup_str = self.markup_focused
window_name = window.name if window and window.name else "?"
# Emulate default widget behavior if markup_str is None
if enforce_markup and markup_str is None:
markup_str = "%s{}" % (state)
if markup_str is not None:
self.markup = True
window_name = pangocffi.markup_escape_text(window_name)
return markup_str.format(window_name)
return "%s%s" % (state, window_name)
class GPUStatusWidget(base._TextBox):
"""Displays the currently used GPU."""
@ -652,7 +253,11 @@ class GPUStatusWidget(base._TextBox):
self.icons.update(self.custom_icons)
def _get_info(self):
try:
output = self.call_process(self.check_command, shell=True)
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {self.check_command} - {e}")
output = None
mode = "nvidia" if "nvidia" in output else "intel" if "intel" in output else "unknown"
return {'error': False, 'mode': mode}
@ -728,14 +333,16 @@ class GPUStatusWidget(base._TextBox):
if button == BUTTON_LEFT:
try:
next_gpu = self.call_process(self.next_command, shell=True).split(":")[1].strip()
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {self.next_command} - {e}")
except IndexError:
next_gpu = "Unknown"
if self.current_status == "Unknown":
notify("GPU Status", "The currently used GPU is <b>unknown</b>.\n\nAfter the next login it will be the <b>{}</b> GPU.".format(next_gpu),
notify(None, "GPU Status", "The currently used GPU is <b>unknown</b>.\n\nAfter the next login it will be the <b>{}</b> GPU.".format(next_gpu),
image=os.path.join(self.theme_path, "gpu-unknown.png"))
else:
notify("GPU Status", "The system is currently running on the <b>{}</b> GPU. Press the middle mouse "
notify(None, "GPU Status", "The system is currently running on the <b>{}</b> GPU. Press the middle mouse "
"button on this icon to switch GPUs.\n\nAfter the next login it will be the <b>{}</b> GPU.".format(
self.current_status, next_gpu
),
@ -744,38 +351,25 @@ class GPUStatusWidget(base._TextBox):
if button == BUTTON_MIDDLE:
command = ["optimus-manager", "--no-confirm", "--switch", "auto"]
try:
output = self.call_process(command)
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {command} - {e}")
output = ""
if "nvidia" in output:
notify("GPU Switched", "The GPU has been switched from Intel to NVidia.\n"
notify(None, "GPU Switched", "The GPU has been switched from Intel to NVidia.\n"
"Please log out and log back in to apply the changes to the session.",
image=os.path.join(self.theme_path, "gpu-nvidia.png"))
elif "intel" in output:
notify("GPU Switched", "The GPU has been switched from NVidia to Intel.\n"
notify(None, "GPU Switched", "The GPU has been switched from NVidia to Intel.\n"
"Please log out and log back in to apply the changes to the session.",
image=os.path.join(self.theme_path, "gpu-intel.png"))
else:
notify("GPU Switch Error", "I could not determine if the GPU was switched successfully.\n"
notify(None, "GPU Switch Error", "I could not determine if the GPU was switched successfully.\n"
"Please log out and log back in to clear up the inconsistency.",
image=os.path.join(self.theme_path, "gpu-unknown.png"))
class TextSpacerWidget(base._TextBox):
"""Displays a text separator"""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('spacer', None, 'The character/text to use as separator. Default "|" if None.'),
('color', "#ffffff", "Color of the text."),
]
def __init__(self, **config):
super(TextSpacerWidget, self).__init__("Separator", bar.CALCULATED, **config)
self.add_defaults(TextSpacerWidget.defaults)
self.text = self.spacer or "|"
def draw(self):
base._TextBox.draw(self)
class ThermalSensorWidget(DualPaneTextboxBase):
defaults = [
('show_tag', False, 'Show tag sensor'),
@ -813,7 +407,11 @@ class ThermalSensorWidget(DualPaneTextboxBase):
self.timeout_add(self.update_interval, self.timer_setup)
def _update_values(self):
try:
sensors_out = self.call_process(self.get_command())
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {self.get_command()} - {e}")
return
temperature_values = {}
for name, temp, symbol in self.sensors_temp.findall(sensors_out):
name = name.strip()
@ -843,7 +441,7 @@ class ThermalSensorWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
notify("Temperature Information", "\n".join(
notify(None, "Temperature Information", "\n".join(
"{}: {}{}".format(name, *values) for name, values in self.values.items()
))
@ -896,7 +494,7 @@ class CPUInfoWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
total = sum([self.cpu_old[0], self.cpu_old[1], self.cpu_old[2], self.cpu_old[3]])
notify("CPU Information", "user: {} %\nnice: {} %\nsys: {} %\nidle: {} %\ntotal: {} %".format(
notify(None, "CPU Information", "user: {} %\nnice: {} %\nsys: {} %\nidle: {} %\ntotal: {} %".format(
math.ceil((self.cpu_old[0] / total) * 100),
math.ceil((self.cpu_old[1] / total) * 100),
math.ceil((self.cpu_old[2] / total) * 100),
@ -949,7 +547,7 @@ class MemoryInfoWidget(DualPaneTextboxBase):
val['SwapUsed'] = swap.used // 1024 // 1024
if button == BUTTON_LEFT:
notify("Memory Information", "Memory: {}MB / {}MB\n {}%\nSwap: {}MB / {}MB\n {}%".format(
notify(None, "Memory Information", "Memory: {}MB / {}MB\n {}%\nSwap: {}MB / {}MB\n {}%".format(
val['MemUsed'], val['MemTotal'],
math.ceil((mem.used / mem.total) * 100),
val['SwapUsed'], val['SwapTotal'],
@ -1005,7 +603,7 @@ class DiskIOInfoWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
notify("Disk IO Information",
notify(None, "Disk IO Information",
"Time that there were IO requests queued for /dev/{}: {} ms".format(self.hdd_device, self.io))
@ -1055,7 +653,7 @@ class NetworkInfoWidget(DualPaneTextboxBase):
status = iwlib.get_iwconfig(self.wireless_interface)
self.wireless_ips = netifaces.ifaddresses(self.wireless_interface)
disconnected = essid is None
percent = math.ceil((quality / 70) * 100)
percent = math.ceil(((quality or 0) / 70) * 100)
self.wireless_quality = quality
self.wireless_signal = percent
self.wireless_name = essid
@ -1074,7 +672,12 @@ class NetworkInfoWidget(DualPaneTextboxBase):
self.wired_ipv4 = self.wired_ips.get(netifaces.AF_INET, [{'addr': ""}])[0]['addr']
self.wired_ipv6 = self.wired_ips.get(netifaces.AF_INET6, [{'addr': ""}])[0]['addr']
self.wired_mac = self.wired_ips.get(netifaces.AF_LINK, [{'addr': ""}])[0]['addr']
eth_status = call_process(["ip", "link", "show", "{}".format(self.wired_interface)])
command = ["ip", "link", "show", "{}".format(self.wired_interface)]
try:
eth_status = call_process(command)
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {command} - {e}")
return
m = self.wired_up_regex.search(eth_status)
if m:
self.wired_connected = "UP" in m.group(1)
@ -1132,9 +735,9 @@ class NetworkInfoWidget(DualPaneTextboxBase):
wired_text = "<b>Wired: Not connected</b>"
if wifi_text:
notify(title, "{}\n\n{}".format(wifi_text, wired_text))
notify(None, title, "{}\n\n{}".format(wifi_text, wired_text))
else:
notify(title, "\n{}".format(wired_text))
notify(None, title, "\n{}".format(wired_text))
class BatteryInfoWidget(DualPaneTextboxBase):
@ -1210,7 +813,7 @@ class BatteryInfoWidget(DualPaneTextboxBase):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8')
notify("Battery Status", output)
notify(None, "Battery Status", output)
class VolumeInfoWidget(DualPaneTextboxBase):
@ -1245,7 +848,8 @@ class VolumeInfoWidget(DualPaneTextboxBase):
else:
cmd = self.status_cmd
mixer_out = self.call_process(cmd.split(" "))
except subprocess.CalledProcessError:
except subprocess.CalledProcessError as e:
logger.error(f"Error while calling {cmd} - {e}")
return -1
try:
return int(mixer_out)
@ -1289,7 +893,7 @@ class VolumeInfoWidget(DualPaneTextboxBase):
output = subprocess.check_output(cmd.split(" ")).decode('utf-8')
sink = "Sink {}\n".format(self.pulse_sink) if self.pulse_sink else ""
notify("Volume Status", sink+output)
notify(None, "Volume Status", sink+output)
elif button == BUTTON_RIGHT:
if "{sink}" in self.volume_app:
@ -1321,18 +925,22 @@ class VolumeInfoWidget(DualPaneTextboxBase):
self.update()
def cmd_increase_vol(self):
@expose_command()
def increase_vol(self):
# Emulate button press.
self.button_press(0, 0, BUTTON_UP)
def cmd_decrease_vol(self):
@expose_command()
def decrease_vol(self):
# Emulate button press.
self.button_press(0, 0, BUTTON_DOWN)
def cmd_mute(self):
@expose_command()
def mute(self):
# Emulate button press.
self.button_press(0, 0, BUTTON_MUTE)
def cmd_run_app(self):
@expose_command()
def run_app(self):
# Emulate button press.
self.button_press(0, 0, BUTTON_RIGHT)

View file

@ -1,6 +1,11 @@
from cairocffi.test_xcb import xcffib
from libqtile import hook
from libqtile.backend.x11.window import Window, Static
from libqtile import hook, qtile
if qtile.core.name == "x11":
from libqtile.backend.x11.window import Window, Static
else:
from libqtile.backend.wayland.window import Window, Static
class KuroStatic(Static):