From 8c070c86a5c7cfb72375f4ccabb23fff931128b1 Mon Sep 17 00:00:00 2001 From: Kevin Alberts Date: Wed, 13 Mar 2019 21:12:18 +0100 Subject: [PATCH] Multiple changes --- kuro/config.py | 3 + .../layout_icons/layout-kurofloating.png | Bin 0 -> 266 bytes kuro/theme.py | 270 +++++-- kuro/utils/general.py | 623 +--------------- kuro/utils/layouts.py | 117 +-- kuro/utils/widgets.py | 679 ++++++++++++++++++ kuro/utils/windows.py | 69 ++ 7 files changed, 1052 insertions(+), 709 deletions(-) create mode 100644 kuro/resources/layout_icons/layout-kurofloating.png create mode 100644 kuro/utils/widgets.py create mode 100644 kuro/utils/windows.py diff --git a/kuro/config.py b/kuro/config.py index 8eb1f53..8410049 100644 --- a/kuro/config.py +++ b/kuro/config.py @@ -23,12 +23,14 @@ class Config(BaseConfig): cmd_brightness_up = "sudo /usr/bin/xbacklight -inc 10" cmd_brightness_down = "sudo /usr/bin/xbacklight -dec 10" lock_command = "/home/kevin/bin/lock.sh" + visualizer_app = "glava" # Images desktop_bg = "/home/kevin/Pictures/wallpapers/desktop.png" desktop_bg_folder = "/home/kevin/Pictures/wallpapers/desktop_rotation" applauncher_image = "/home/kevin/.config/qtile/kuro/resources/arch.png" custom_layout_icon_paths = ['/home/kevin/.config/qtile/kuro/resources/layout_icons/'] + glava_color_file_path = "/home/kevin/.config/glava/kurobars_color.glsl" # Fonts font_default = "Noto Sans" @@ -138,4 +140,5 @@ class Config(BaseConfig): # Show audio visualizer show_audio_visualizer = True + kill_unnecessary_glava_processes = True diff --git a/kuro/resources/layout_icons/layout-kurofloating.png b/kuro/resources/layout_icons/layout-kurofloating.png new file mode 100644 index 0000000000000000000000000000000000000000..1dc2fb765c25e5b5505390ac8d067eba4d8f026e GIT binary patch literal 266 zcmeAS@N?(olHy`uVBq!ia0vp^4j|0I1|(Ny7TyC=EX7WqAsj$Z!;#VfSxgyVRm&Kn9ExwV=iC-tH@^WWb~d}Lsq3q70E6xV zO`wiDd5o>QM6dZffDANEcc^a{glV>Rgech}dbP0l+XkK^l)I^ literal 0 HcmV?d00001 diff --git a/kuro/theme.py b/kuro/theme.py index 8997c06..20afe3a 100644 --- a/kuro/theme.py +++ b/kuro/theme.py @@ -7,6 +7,9 @@ from libqtile.command import lazy from libqtile import layout, bar, widget # Import theme util functions +from xcffib.xproto import WindowError + +import kuro.utils.widgets from kuro.utils import general as utils # Import variables @@ -14,6 +17,7 @@ from kuro.base import BaseTheme from kuro.utils.general import display_wm_class, test_popups from kuro.utils.kb_backlight import handle_focus_change as kb_handle_focus_change from kuro.utils import layouts as kuro_layouts +from kuro.utils.windows import KuroStatic try: from kuro.config import Config @@ -43,6 +47,12 @@ class Kuro(BaseTheme): # Top bars topbars = [] + # Visualizers + audio_visualizers = [] + + # Static windows + static_windows = [] + # Current wallpaper path current_wallpaper = None @@ -52,6 +62,24 @@ class Kuro(BaseTheme): # Window manager name wmname = "QTile" + # Floating layout override + floating_layout = kuro_layouts.KuroFloating(float_rules=[ + {'wmclass': 'confirm'}, + {'wmclass': 'dialog'}, + {'wmclass': 'download'}, + {'wmclass': 'error'}, + {'wmclass': 'file_progress'}, + {'wmclass': 'notification'}, + {'wmclass': 'splash'}, + {'wmclass': 'toolbar'}, + {'wmclass': 'confirmreset'}, # gitk + {'wmclass': 'makebranch'}, # gitk + {'wmclass': 'maketag'}, # gitk + {'wname': 'branchdialog'}, # gitk + {'wname': 'pinentry'}, # GPG key password entry + {'wmclass': 'ssh-askpass'}, # ssh-askpass + ]) + def set_debug_text(self, text): for field in self.debug_textfields: field.text = text @@ -157,6 +185,9 @@ class Kuro(BaseTheme): # Reorganize screens Key([self.mod, "control"], "s", lazy.function(self.update_screens)), + # Toggle static windows + Key([self.mod], "p", lazy.function(self.toggle_window_static)), + ## # Debug keyboard shortcuts @@ -166,6 +197,12 @@ class Kuro(BaseTheme): # Redraw the top bar Key([self.mod, "shift", "control"], "r", lazy.function(self.redraw_bar)), + # Update visualizer widgets + Key([self.mod, "shift", "control"], "v", lazy.function(self.reinitialize_visualizers)), + + # Show extensive window info + Key([self.mod, "shift", "control"], "i", lazy.function(self.show_window_info)), + # Spawn a popup, and despawn it after 3 seconds Key([self.mod, "control"], "p", lazy.function(test_popups)), ] @@ -179,7 +216,7 @@ class Kuro(BaseTheme): groups.append(Group("", spawn=Config.get('web_browser', "xterm links"))) groups.append(Group("", spawn=Config.get('app_terminal', "xterm"))) groups.append(Group("")) - groups.append(Group("", spawn="franz4-bin")) + groups.append(Group("", spawn="franz")) groups.append(Group("", spawn="quasselclient")) groups.append(Group("", spawn=Config.get('file_manager', "thunar"))) groups.append(Group("", spawn="thunderbird")) @@ -194,6 +231,7 @@ class Kuro(BaseTheme): return [ kuro_layouts.KuroWmii( + theme=self, border_focus=Config.get('colour_border_focus', "#ffffff"), border_focus_stack=Config.get('colour_border_normal', "#777777"), border_normal=Config.get('colour_border_normal', "#777777"), @@ -222,7 +260,7 @@ class Kuro(BaseTheme): self.log_debug("Initializing screens") self.num_screens = utils.get_screen_count() - if self.num_screens == 0: + if self.num_screens <= 0: self.num_screens = 1 screens = [] @@ -245,7 +283,7 @@ class Kuro(BaseTheme): ), widget.Prompt(**self.widget_defaults), - widget.TaskList( + kuro.utils.widgets.KuroTaskList( border=Config.get('tasklist_border', '#ffffff'), borderwidth=Config.get('tasklist_borderwidth', 1), font=Config.get('tasklist_font', 'Arial'), @@ -260,21 +298,23 @@ class Kuro(BaseTheme): ]) if Config.get('show_audio_visualizer', False): - widgets.append(utils.AudioVisualizerWidget( + widgets.append(kuro.utils.widgets.AudioVisualizerWidget( graph_color=Config.get('visualizer_graph_color', "#ffffff"), fill_color=Config.get('visualizer_fill_color', "#ffffff.3"), border_color=Config.get('visualizer_border_color', "#000000"), border_width=Config.get('visualizer_graph_width', 0), line_width=Config.get('visualizer_line_width', 1), - frequency=0.05 + margin_x=1, + margin_y=1, + frequency=1 )) widgets.extend([ - utils.MediaWidget(), + kuro.utils.widgets.MediaWidget(), - utils.SeparatorWidget(), + kuro.utils.widgets.SeparatorWidget(), - utils.ThermalSensorWidget( + kuro.utils.widgets.ThermalSensorWidget( font=Config.get('font_topbar', 'Arial'), fontsize=Config.get('fontsize_topbar', 16), foreground=Config.get('thermal_colour', '#ffffff'), @@ -325,7 +365,7 @@ class Kuro(BaseTheme): frequency=2, ), - utils.KuroBatteryIcon( + kuro.utils.widgets.KuroBatteryIcon( battery_name=Config.get('battery_name', 'BAT0'), energy_full_file=Config.get('battery_energy_full_file', 'charge_full'), energy_now_file=Config.get('battery_energy_now_file', 'charge_now'), @@ -334,14 +374,14 @@ class Kuro(BaseTheme): update_delay=Config.get('battery_update_delay', 30) ), - utils.WifiIconWidget( + kuro.utils.widgets.WifiIconWidget( interface=Config.get('wifi_interface', 'wlp4s0'), theme_path=Config.get('wifi_theme_path', '/home/docs/checkouts/readthedocs.org/user_builds/qtile' '/checkouts/latest/libqtile/resources/battery-icons'), update_interval=Config.get('wifi_update_interval', 30) ), - utils.PulseVolumeWidget( + kuro.utils.widgets.PulseVolumeWidget( cardid=Config.get('volume_cardid', None), channel=Config.get('volume_channel', 'Master'), device=Config.get('volume_device', None), @@ -358,7 +398,7 @@ class Kuro(BaseTheme): update_interval=Config.get('volume_update_interval', 0.2) ), - utils.PulseVolumeWidget( + kuro.utils.widgets.PulseVolumeWidget( cardid=Config.get('bluevol_cardid', None), channel=Config.get('bluevol_channel', 'Master'), device=Config.get('bluevol_device', None), @@ -376,14 +416,14 @@ class Kuro(BaseTheme): ) ]) - # Systray only on first screen + # Systray can only be on one screen, so put it on the first if x == 0: widgets.append(widget.Systray(**self.widget_defaults)) widgets.extend([ - utils.KuroCurrentLayoutIcon(custom_icon_paths=Config.get('custom_layout_icon_paths', [])), + kuro.utils.widgets.KuroCurrentLayoutIcon(custom_icon_paths=Config.get('custom_layout_icon_paths', [])), widget.Clock(format="%a %d %b, %H:%M", **self.widget_defaults), - utils.CheckUpdatesYay( + kuro.utils.widgets.CheckUpdatesYay( colour_no_updates=Config.get('updates_colour_none', '#ffffff'), colour_have_updates=Config.get('updates_colour_available', '#ff0000'), display_format=Config.get('updates_display_format', 'Updates: {updates}'), @@ -456,26 +496,18 @@ class Kuro(BaseTheme): # Keys for the Wmii layout self.keys.extend([ - Key( - [self.mod, "shift", "control"], "l", - lazy.layout.grow_right() - ), - Key( - [self.mod, "shift"], "l", - lazy.layout.shuffle_right() - ), - Key( - [self.mod, "shift", "control"], "h", - lazy.layout.grow_left() - ), - Key( - [self.mod, "shift"], "h", - lazy.layout.shuffle_left() - ), - Key( - [self.mod], "s", - lazy.layout.toggle_split() - ) + Key([self.mod, "shift"], "j", lazy.layout.shuffle_down()), + Key([self.mod, "shift"], "k", lazy.layout.shuffle_up()), + Key([self.mod, "shift"], "h", lazy.layout.shuffle_left()), + Key([self.mod, "shift"], "l", lazy.layout.shuffle_right()), + + Key([self.mod, "shift", "control"], "j", lazy.layout.grow_down()), + Key([self.mod, "shift", "control"], "k", lazy.layout.grow_up()), + Key([self.mod, "shift", "control"], "h", lazy.layout.grow_left()), + Key([self.mod, "shift", "control"], "l", lazy.layout.grow_right()), + + Key([self.mod], "s", lazy.layout.toggle_split()), + Key([self.mod], "n", lazy.layout.normalize()), ]) # Util functions @@ -505,20 +537,68 @@ class Kuro(BaseTheme): elif laptop_screen is not None and len(screens) > 1: utils.execute("arandr") + def reinitialize_visualizers(self, qtile=None): + if Config.get("show_audio_visualizer", False): + logger.warning("Reinitializing visualizers...") + for screen in self.qtile.screens: + for widget in screen.top.widgets: + if isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget): + if widget.client is not None: + widget.client.kill() + widget.client = None + widget.screen = None + self.update_visualizers(qtile=qtile) + + def update_visualizers(self, qtile=None): + if Config.get("show_audio_visualizer", False): + logger.warning("Updating visualizers..") + for screen in self.qtile.screens: + for widget in screen.top.widgets: + if isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget): + if widget.client is None: + logger.warning("Spawning for screen {}".format(screen)) + utils.execute(Config.get('visualizer_app', "glava")) + else: + widget.update_graph() + + def show_window_info(self, qtile): + window = qtile.currentWindow if qtile else None + + import pprint + if window: + info = window.cmd_inspect() or None + name = window.name + + utils.notify(title="Window properties {}".format(name), + content="{}".format(pprint.pformat(vars(window)))) + + if info: + info = pprint.pformat(info) + utils.notify(title="Window info of {}".format(name), + content="{}".format(info)) + + # @staticmethod + def toggle_window_static(self, qtile): + window = qtile.currentWindow + if window in self.static_windows: + utils.notify("Unpinned {}".format(window.name), "{} has been unpinned".format(window.name)) + self.static_windows.remove(window) + del window.is_static_window + else: + utils.notify("Pinned {}".format(window.name), "{} has been pinned".format(window.name)) + self.static_windows.append(window) + window.is_static_window = True + + window.floating = True + # QTile base callbacks def callback_startup_once(self, *args, **kwargs): - pass - #Kuro.update_screens(self.qtile) - - def callback_startup(self): - utils.execute("sleep 3") - - # self.log_info("Restoring previous wallpaper...") - # utils.execute_once("nitrogen --restore") self.update_wallpaper(self.qtile) + def callback_startup(self): if self.current_wallpaper: - utils.execute_once(["wal", "-n", "-i", "{}".format(self.current_wallpaper)]) + p = utils.execute_once(["wal", "-n", "-i", "{}".format(self.current_wallpaper)]) + p.wait() else: wallpaper = None @@ -531,30 +611,29 @@ class Kuro(BaseTheme): if wallpaper: Kuro.set_wallpaper(self.qtile, wallpaper) else: - utils.execute_once("nitrogen --restore") + p = utils.execute_once("nitrogen --restore") + p.wait() - self.log_info("Starting compositor...") - utils.execute_once("compton -b") + # self.log_info("Starting compositor...") + # utils.execute_once("compton -b") # Update color scheme self.initialize_colorscheme() - # Update color scheme - Kuro.update_colorscheme(self.qtile) - - # display = os.environ['DISPLAY'] - # - # if not display: - # display = ":0" - # - # # Start compton for each screen - # for x in range(self.num_screens): - # self.log_info("Launching compton for screen {}.{}".format(display, x)) - # utils.execute_once("compton --config ~/.config/compton.conf -b -d {}.{}".format(display, x)) - # def callback_screen_change(self, *args, **kwargs): - # self.num_screens = utils.get_screen_count() - # return True + # for window in self.static_windows: + # window.togroup() + + def callback_setgroup(self, *args, **kwargs): + for window in self.static_windows: + # Only move if the window is not currently on any screen. + if window.group.screen is None: + try: + window.togroup() + except WindowError as e: + logger.warning("Could not move static window {}, removing from list: {}".format(window.name, e)) + del window.is_static_window + self.static_windows.remove(window) def callback_focus_change(self, *args, **kwargs): if self.do_keyboard_updates: @@ -599,6 +678,56 @@ class Kuro(BaseTheme): window.name)) self.log_info(str(self.qtile.dgroups.rules_map)) + # Check if it is a visualizer + if Config.get("show_audio_visualizer", False): + client = args[0] if len(args) > 0 else None + if client is not None and client.window.get_name() == "GLava": + placed = False + for screen in self.qtile.screens: + for widget in screen.top.widgets: + if not placed and isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget): + if widget.client is None: + viz_info = widget.info() + pos_x = viz_info['offset'] + widget.margin_x - 1 + pos_y = 0 + widget.margin_y - 1 + width = viz_info['width'] - (2 * (widget.margin_x - 1)) + height = viz_info['height'] - (2 * (widget.margin_y - 1)) + screen_index = self.qtile.screens.index(screen) + logger.warning("Attaching {} {} to {} on screen {}".format(client, client.window.wid, type(widget).__name__, screen_index)) + c = KuroStatic.create(client, screen, x=pos_x, y=pos_y, width=width, height=height) + c.setOpacity(Config.get("bar_opacity", 1.0)) + widget.set_client(c, screen) + placed = True + if not placed: + if Config.get("kill_unnecessary_glava_processes", False): + logger.warning("Killing GLava {} because there is no widget where it can fit".format(client)) + utils.notify("Glava", "Killing new GLava process because there is no screen without a visualizer") + client.kill() + else: + logger.warning("Not repositioning GLava {} because there is no widget where it can fit".format(client)) + utils.notify("Glava", "Not repisitioning new GLava process because there is no screen without a visualizer") + + def callback_client_killed(self, *args, **kwargs): + client = args[0] + logger.warning("Client {} Killed".format(client)) + + # Detach visualizer from widget if it was a visualizer window + if isinstance(client, KuroStatic): + for screen in self.qtile.screens: + for widget in screen.top.widgets: + if isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget): + if widget.client == client: + screen_index = self.qtile.screens.index(screen) + logger.warning("Detaching {} {} from widget {} on screen {}".format(client, client.window.wid, type(widget).__name__, screen_index)) + widget.client = None + widget.screen = None + + # If this window was static, remove it from the static window list + if hasattr(client, "is_static_window") and client.is_static_window: + logger.warning("Removing static window {}".format(client.name)) + del client.is_static_window + self.static_windows.remove(client) + @staticmethod def update_wallpaper(qtile): wallpapers = [] @@ -617,7 +746,8 @@ class Kuro(BaseTheme): @staticmethod def set_wallpaper(qtile, filename): - utils.execute_once("wal-nitrogen {}".format(filename)) + p = utils.execute_once("wal-nitrogen-noupdate {}".format(filename)) + p.wait() qtile.theme_instance.current_wallpaper = filename Kuro.update_colorscheme(qtile) @@ -644,6 +774,7 @@ class Kuro(BaseTheme): Config.highlight = colors['color3'] Config.inactive_light = colors['color4'] Config.inactive_dark = colors['color5'] + Config.bar_background = colors['color1'] @staticmethod def update_colorscheme(qtile): @@ -651,7 +782,8 @@ class Kuro(BaseTheme): :type qtile: libqtile.manager.Qtile """ if qtile.theme_instance.current_wallpaper: - utils.execute(["wal", "-n", "-i", "{}".format(qtile.theme_instance.current_wallpaper)]) + p = utils.execute(["wal", "-n", "-i", "{}".format(qtile.theme_instance.current_wallpaper)]) + p.wait() colors = None if os.path.isfile("/home/kevin/.cache/wal/colors.json"): @@ -668,6 +800,7 @@ class Kuro(BaseTheme): Config.highlight = colors['color3'] Config.inactive_light = colors['color4'] Config.inactive_dark = colors['color5'] + Config.bar_background = colors['color1'] # Update border colors in layouts for group in qtile.groups: @@ -687,7 +820,7 @@ class Kuro(BaseTheme): try: w._update_drawer() except Exception as e: - logger.error("Error while updating drawer: {}".format(e)) + logger.error("Error while updating drawer for widget {}: {}".format(w, e)) if hasattr(w, 'foreground'): w.foreground = colors['color15'] @@ -722,12 +855,17 @@ class Kuro(BaseTheme): if hasattr(w, 'other_screen_border'): w.other_screen_border = colors['color8'] - if isinstance(w, utils.AudioVisualizerWidget): + if isinstance(w, kuro.utils.widgets.AudioVisualizerWidget): w.graph_color = colors['color15'] w.fill_color = colors['color8'] bar.draw() + # Update colors in visualizers and restart visualizers + with open(Config.get("glava_color_file_path", "~/.config/glava/kurobars_color.glsl"), 'w') as f: + f.write("#define COLOR {}\n#request setbg {}".format(colors['color15'], colors['color1'][1:])) + qtile.theme_instance.reinitialize_visualizers() + utils.notify( "Updated colorscheme!", "active: {}, inactive: {}".format(colors['color15'], colors['color1']) diff --git a/kuro/utils/general.py b/kuro/utils/general.py index 8ab30bb..1644348 100644 --- a/kuro/utils/general.py +++ b/kuro/utils/general.py @@ -1,30 +1,12 @@ -import os import re import subprocess -from asyncio import Queue -from threading import Thread from time import sleep -import cairocffi import notify2 -import numpy -import pyaudio import six -from libqtile import widget, bar -from libqtile.widget.currentlayout import CurrentLayoutIcon -from libqtile.widget.graph import _Graph +from libqtile import widget from libqtile.window import Internal from libqtile.bar import Bar -from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound -from libqtile.widget import base -from libqtile.widget.battery import default_icon_path -from libqtile.widget.check_updates import CheckUpdates -from libqtile.widget.image import Image -from libqtile.widget.sensors import ThermalSensor -from libqtile.widget.volume import Volume -from libqtile.widget.battery import BatteryIcon -from libqtile.widget.wlan import get_status -from libqtile.log_utils import logger from notify2 import Notification, URGENCY_NORMAL notify2.init("QTileWM") @@ -38,6 +20,8 @@ BUTTON_SCROLL_DOWN = 5 def is_running(process): s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE) + if isinstance(process, list): + process = "".join(process) for x in s.stdout: if re.search(process, x.decode('utf-8')): return True @@ -200,6 +184,26 @@ class KuroTopBar(Bar): self.window.handle_EnterNotify = self.handle_enter_notify self.window.handle_LeaveNotify = self.handle_leave_notify + def draw(self): + if self.queued_draws == 0: + self.qtile.call_soon(self._actual_draw) + self.queued_draws += 1 + + def _actual_draw(self): + self.queued_draws = 0 + self._resize(self.length, self.widgets) + for i in self.widgets: + i.draw() + if self.widgets: + end = i.offset + i.length + if end < self.length: + if self.horizontal: + self.drawer.draw(offsetx=end, width=self.length - end) + else: + self.drawer.draw(offsety=end, height=self.length - end) + + self.theme.update_visualizers() + def handle_enter_notify(self, e): # self.theme.log_debug("Bar HandleEnterNotify") # @@ -237,584 +241,3 @@ class KuroTopBar(Bar): self.draw() -class AppLauncherIcon(Image): - def button_press(self, x, y, button): - if button == BUTTON_LEFT: - execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'") - - def handle_hover(self, event): - spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!") - - -class CheckUpdatesYay(CheckUpdates): - def __init__(self, **config): - super(CheckUpdatesYay, self).__init__(**config) - # Override command and output with yay command - self.cmd = "yay -Qua".split() - self.status_cmd = "yay -Qua --color never".split() - self.update_cmd = "sudo yay".split() - self.subtr = 0 - - def _check_updates(self): - #subprocess.check_output(self.update_cmd) - res = super(CheckUpdatesYay, self)._check_updates() - return res - - def button_press(self, x, y, button): - if button == BUTTON_LEFT: - output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n') - - num_updates = len(output)-1 - msg = "{} updates available.".format(num_updates) - - if num_updates > 0: - msg += "\n\n" - for x in range(min(num_updates, 9)): - msg += output[x] + "\n" - if num_updates > 9: - msg += "and {} more...".format(num_updates-9) - - notify( - "System updates", - msg - ) - - elif button == BUTTON_MIDDLE and self.execute is not None: - subprocess.Popen(self.execute, shell=True) - - -class KuroBatteryIcon(BatteryIcon): - status_cmd = "acpi" - - def button_press(self, x, y, button): - if button == BUTTON_LEFT: - output = subprocess.check_output(self.status_cmd).decode('utf-8') - - notify( - "Battery Status", - output - ) - - -class PulseVolumeWidget(Volume): - - defaults = [ - ("cardid", None, "Card Id"), - ("device", "default", "Device Name"), - ("channel", "Master", "Channel"), - ("padding", 3, "Padding left and right. Calculated if None."), - ("theme_path", None, "Path of the icons"), - ("update_interval", 0.2, "Update time in seconds."), - ("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set." - "The specified font needs to contain the correct unicode characters."), - ("mute_command", None, "Mute command"), - ("volume_up_command", None, "Volume up command"), - ("volume_down_command", None, "Volume down command"), - ("get_volume_command", None, "Command to get the current volume"), - ("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"), - ] - - _old_length = 0 - - def __init__(self, **config): - super(PulseVolumeWidget, self).__init__(**config) - self._old_length = self._length - - # Augment commands with bluetooth sink ID if this is a bluetooth icon - if self.is_bluetooth_icon and bluetooth_audio_connected(): - bsink = bluetooth_audio_sink() - self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split() - self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split() - self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split() - self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split() - logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink)) - self._length = self._old_length - self.commands_need_reset = False - elif self.is_bluetooth_icon: - self.commands_need_reset = True - else: - self.commands_need_reset = False - - self._old_length = self._length - - def reset_bluetooth_commands(self): - if self.is_bluetooth_icon and bluetooth_audio_connected(): - bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink() - self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split() - self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split() - self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split() - self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split() - logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink)) - self._length = self._old_length - self.commands_need_reset = False - - def get_volume(self): - try: - get_volume_cmd = "echo 0".split() - - if self.get_volume_command: - if self.is_bluetooth_icon and bluetooth_audio_sink() == -1: - pass - else: - get_volume_cmd = self.get_volume_command - - mixer_out = self.call_process(get_volume_cmd) - except subprocess.CalledProcessError: - return -1 - - try: - return int(mixer_out.strip()) - except ValueError: - return -1 - - def _update_drawer(self): - super(PulseVolumeWidget, self)._update_drawer() - self.text = "" - if self.is_bluetooth_icon and not bluetooth_audio_connected(): - self._length = 0 - - def draw(self): - if self.is_bluetooth_icon and not bluetooth_audio_connected(): - if not self.commands_need_reset: - logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer") - self.commands_need_reset = True - base._TextBox.draw(self) - else: - if self.commands_need_reset: - self.reset_bluetooth_commands() - if self.theme_path: - self.drawer.draw(offsetx=self.offset, width=self.length) - else: - base._TextBox.draw(self) - - def button_press(self, x, y, button): - if button == BUTTON_LEFT: - volume = self.get_volume() - - width = 15 - - if volume >= 0: - volume_amount = round((volume/100)*width) - else: - volume_amount = 0 - - msg = "[{}{}]".format( - "".join(["#" for x in range(volume_amount)]), - "".join(["-" for x in range(width-volume_amount)]) - ) - - notify( - "{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume), - msg - ) - else: - super(PulseVolumeWidget, self).button_press(x, y, button) - - -class WifiIconWidget(base._TextBox): - """WiFi connection strength indicator widget.""" - - orientations = base.ORIENTATION_HORIZONTAL - defaults = [ - ('interface', 'wlan0', 'The interface to monitor'), - ('update_interval', 1, 'The update interval.'), - ('theme_path', default_icon_path(), 'Path of the icons'), - ('custom_icons', {}, 'dict containing key->filename icon map'), - ] - - def __init__(self, **config): - super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config) - self.add_defaults(WifiIconWidget.defaults) - - if self.theme_path: - self.length_type = bar.STATIC - self.length = 0 - self.surfaces = {} - self.current_icon = 'wireless-disconnected' - self.icons = dict([(x, '{0}.png'.format(x)) for x in ( - 'wireless-disconnected', - 'wireless-none', - 'wireless-low', - 'wireless-medium', - 'wireless-high', - 'wireless-full', - )]) - self.icons.update(self.custom_icons) - - def _get_info(self): - try: - essid, quality = get_status(self.interface) - disconnected = essid is None - if disconnected: - return self.disconnected_message - - return { - 'error': False, - 'essid': essid, - 'quality': quality, - 'percent': (quality / 70) - } - except EnvironmentError: - logger.error( - '%s: Probably your wlan device is switched off or ' - ' otherwise not present in your system.', - self.__class__.__name__) - - return {'error': True} - - def timer_setup(self): - self.update() - self.timeout_add(self.update_interval, self.timer_setup) - - def _configure(self, qtile, bar): - super(WifiIconWidget, self)._configure(qtile, bar) - self.setup_images() - - def _get_icon_key(self): - key = 'wireless' - info = self._get_info() - if info is False or info.get('error'): - key += '-none' - elif info.get('essid') is None: - key += '-disconnected' - else: - percent = info['percent'] - if percent < 0.2: - key += '-low' - elif percent < 0.4: - key += '-medium' - elif percent < 0.8: - key += '-high' - else: - key += '-full' - - return key - - def update(self): - icon = self._get_icon_key() - if icon != self.current_icon: - self.current_icon = icon - self.draw() - - def draw(self): - if self.theme_path: - self.drawer.clear(self.background or self.bar.background) - self.drawer.ctx.set_source(self.surfaces[self.current_icon]) - self.drawer.ctx.paint() - self.drawer.draw(offsetx=self.offset, width=self.length) - else: - self.text = self.current_icon[8:] - base._TextBox.draw(self) - - def setup_images(self): - for key, name in self.icons.items(): - try: - path = os.path.join(self.theme_path, name) - img = cairocffi.ImageSurface.create_from_png(path) - except cairocffi.Error: - self.theme_path = None - logger.warning('Wireless Icon switching to text mode') - return - input_width = img.get_width() - input_height = img.get_height() - - sp = input_height / (self.bar.height - 1) - - width = input_width / sp - if width > self.length: - # cast to `int` only after handling all potentially-float values - self.length = int(width + self.actual_padding * 2) - - imgpat = cairocffi.SurfacePattern(img) - - scaler = cairocffi.Matrix() - - scaler.scale(sp, sp) - scaler.translate(self.actual_padding * -1, 0) - imgpat.set_matrix(scaler) - - imgpat.set_filter(cairocffi.FILTER_BEST) - self.surfaces[key] = imgpat - - -class ThermalSensorWidget(ThermalSensor): - defaults = [ - ('metric', True, 'True to use metric/C, False to use imperial/F'), - ('show_tag', False, 'Show tag sensor'), - ('update_interval', 2, 'Update interval in seconds'), - ('tag_sensor', None, - 'Tag of the temperature sensor. For example: "temp1" or "Core 0"'), - ('chip', None, 'Chip argument for sensors command'), - ( - 'threshold', - 70, - 'If the current temperature value is above, ' - 'then change to foreground_alert colour' - ), - ('foreground_alert', 'ff0000', 'Foreground colour alert'), - ] - - @catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError) - def get_temp_sensors(self): - """calls the unix `sensors` command with `-f` flag if user has specified that - the output should be read in Fahrenheit. - """ - command = ["sensors", ] - if self.chip: - command.append(self.chip) - if not self.metric: - command.append("-f") - sensors_out = self.call_process(command) - return self._format_sensors_output(sensors_out) - - -class SeparatorWidget(base._TextBox): - def __init__(self): - super(SeparatorWidget, self).__init__(text="|", width=bar.CALCULATED, fontsize=14) - - -class MediaWidget(base.InLoopPollText): - """Media Status Widget""" - - class Status: - OFFLINE = 0 - PLAYING = 1 - PAUSED = 2 - STOPPED = 3 - - orientations = base.ORIENTATION_HORIZONTAL - defaults = [ - ('off_text', '', 'The pattern for the text if no players are found.'), - ('on_text_play', ' {}', 'The pattern for the text if music is playing.'), - ('on_text_pause', ' {}', 'The pattern for the text if music is paused.'), - ('on_text_stop', ' {}', 'The pattern for the text if music is stopped.'), - ('update_interval', 1, 'The update interval.'), - ] - - player_icons = { - 'spotify': '', - 'vlc': '', - 'firefox': '', - } - - custom_player_data = { - 'firefox': { - 'showing': False, - 'title': '', - 'state': Status.STOPPED, - } - } - - def __init__(self, **config): - super(MediaWidget, self).__init__(**config) - self.add_defaults(MediaWidget.defaults) - self.surfaces = {} - - def cmd_update_custom_player(self, player_name, data): - # Update firefox player - if player_name == "firefox": - if data['playing'] and data['muted']: - self.custom_player_data['firefox']['showing'] = True - self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED - self.custom_player_data['firefox']['title'] = data['title'] - elif data['playing'] and not data['muted']: - self.custom_player_data['firefox']['showing'] = True - self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING - self.custom_player_data['firefox']['title'] = data['title'] - elif not data['playing'] and data['muted']: - self.custom_player_data['firefox']['showing'] = True - self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED - self.custom_player_data['firefox']['title'] = data['title'] - elif not data['playing'] and not data['muted']: - self.custom_player_data['firefox']['showing'] = False - self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE - self.custom_player_data['firefox']['title'] = data['title'] - - def _get_players(self): - players = [] - - # Playerctl players - command = ["playerctl", "-l"] - result = self.call_process(command) - if result: - players.extend([x for x in result.split("\n") if x]) - - # Custom players - Firefox - if self.custom_player_data['firefox']['showing']: - players.append('firefox') - - if players: - return players - else: - return None - - def _get_info(self): - players = self._get_players() - - if not players: - return {} - else: - result = {} - - for player in players: - if player in self.custom_player_data.keys(): - # Custom player -- Firefox - if player == "firefox": - result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']] - - # Other custom players -- generic attempt with error catching - else: - try: - result[player] = [self.custom_player_data[player]['state'], - self.custom_player_data[player]['title']] - except KeyError: - pass - - else: - # PlayerCtl player - command = ["playerctl", "-p", player, "status"] - cmd_result = self.call_process(command).strip() - - text = "Unknown" - if cmd_result in ["Playing", "Paused"]: - artist = self.call_process(['playerctl', '-p', player, 'metadata', 'artist']).strip() - title = self.call_process(['playerctl', '-p', player, 'metadata', 'title']).strip() - - if artist and title: - text = "{} - {}".format(artist, title) - elif artist: - text = artist - elif title: - text = title - - if cmd_result == "Playing": - result[player] = [MediaWidget.Status.PLAYING, text] - elif cmd_result == "Paused": - result[player] = [MediaWidget.Status.PAUSED, text] - elif cmd_result == "Stopped": - result[player] = [MediaWidget.Status.STOPPED, ""] - - return result - - def _get_formatted_text(self, status): - if status[0] == MediaWidget.Status.PLAYING: - return self.on_text_play.format(status[1]) - elif status[0] == MediaWidget.Status.PAUSED: - return self.on_text_pause.format(status[1]) - elif status[0] == MediaWidget.Status.STOPPED: - return self.on_text_stop.format(status[1]) - else: - return "Unknown" - - def poll(self): - text = [] - status = self._get_info() - if not status: - return self.off_text - else: - for player in status.keys(): - icon = self.player_icons.get(player, player) - logger.warning([player, status[player]]) - text.append("{} {}".format(icon, self._get_formatted_text(status[player]))) - - return " | ".join(text) if text else self.off_text - - -class AudioVisualizerWidget(_Graph): - """Display Audio Visualization graph""" - orientations = base.ORIENTATION_HORIZONTAL - defaults = [ - ("audio_channel", "default", "Which audio channel to show"), - ] - - stream = None - - fixed_upper_bound = True - - def __init__(self, **config): - _Graph.__init__(self, **config) - self.add_defaults(AudioVisualizerWidget.defaults) - self.maxvalue = 100 - self.samples = 1024 - self.max_observed = 1 - - # initialize communication queue - self.q = Queue() - self.t = None - self.stream = None - self.tries = 0 - - def initialize_stream(self): - # initialize portaudio - p = pyaudio.PyAudio() - try: - self.stream = p.open(format=pyaudio.paInt16, channels=1, rate=44100, input=True, frames_per_buffer=self.samples) - - # initialize thread - self.t = Thread(target=self.process, args=[self, self.q]) - self.t.start() - except OSError as e: - logger.warning("Could not open audio stream: ".format(e)) - - self.tries += 1 - - @staticmethod - def process(widget: 'AudioVisualizerWidget', queue: Queue): - - item = queue.get() - - if widget.max_observed > 100: - widget.max_observed -= 100 - # Discard all available frames - avail = widget.stream.get_read_available() - while avail > 1000: - _ = widget.stream.read(avail) - logger.debug("Discarded {} frames".format(avail)) - avail = widget.stream.get_read_available() - - if avail > 100: - data = widget.stream.read(widget.samples) - numpydata = numpy.abs(numpy.fromstring(data, dtype=numpy.int16)) - - if numpy.max(numpydata) > widget.max_observed: - widget.max_observed = numpy.max(numpydata) - - numpydata = numpydata * (100 / widget.max_observed) - numpydata = AudioVisualizerWidget.window_rms(numpydata, 25) - - widget.values = list(numpydata) - print(widget.values) - else: - widget.values = [0]*1024 - - @staticmethod - def window_rms(a, window_size): - a2 = numpy.power(a, 2) - window = numpy.ones(window_size) / float(window_size) - return numpy.sqrt(numpy.convolve(a2, window, 'valid')) - - def update_graph(self): - if not self.stream and self.tries < 10: - self.initialize_stream() - - else: - if self.q.empty(): - self.q.put(True) - self.draw() - - -class KuroCurrentLayoutIcon(CurrentLayoutIcon): - def _get_layout_names(self): - names = super(KuroCurrentLayoutIcon, self)._get_layout_names() - - from kuro.utils import layouts as kuro_layouts - from libqtile.layout.base import Layout - klayouts = [ - layout_class_name.lower() - for layout_class, layout_class_name - in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts)) - if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout) - ] - names.extend(klayouts) - - return list(set(names)) diff --git a/kuro/utils/layouts.py b/kuro/utils/layouts.py index 7b34071..c57a3ad 100644 --- a/kuro/utils/layouts.py +++ b/kuro/utils/layouts.py @@ -1,51 +1,82 @@ -from libqtile.layout.wmii import Wmii +from libqtile.layout import Floating +from libqtile.layout.columns import Columns -class KuroWmii(Wmii): - def cmd_previous(self): - super(KuroWmii, self).cmd_previous() +class KuroWmii(Columns): + pass - def cmd_next(self): - super(KuroWmii, self).cmd_next() - def add(self, client): - """ - Add a new client window to the layout and focus it. It will be added to either the current column if there - are less rows in the current column than columns on the screen, or to a new row to the right of the current - column if there are less columns than rows in the current column. - :param client: The client window to add. - """ - self.clients.append(client) - c = self.current_column() - if c is None: - if len(self.columns) == 0: - self.columns = [{'active': 0, 'width': 100, 'mode': 'split', 'rows': []}] - c = self.columns[0] - c['rows'].append(client) +class KuroFloating(Floating): + defaults = [ + ("border_static", "#dddddd", "Border colour for static windows."), + ] + + def __init__(self, *args, **kwargs): + super(KuroFloating, self).__init__(*args, **kwargs) + self.add_defaults(KuroFloating.defaults) + + def configure(self, client, screen): + # 'sun-awt-X11-XWindowPeer' is a dropdown used in Java application, + # don't reposition it anywhere, let Java app to control it + cls = client.window.get_wm_class() or '' + is_java_dropdown = 'sun-awt-X11-XWindowPeer' in cls + if is_java_dropdown: + return + + if hasattr(client, "is_static_window") and client.is_static_window: + bc = client.group.qtile.colorPixel(self.border_static) + elif client.has_focus: + bc = client.group.qtile.colorPixel(self.border_focus) else: - num_cols = len(self.columns) - num_rows_curr_col = len(c['rows']) - if num_rows_curr_col < num_cols: - c['rows'].append(client) - else: - self.add_column_to_right(c, client) - self.focus(client) - - def add_column_to_right(self, column, win): - """ - Adds a new column to the right of the given column with the given window in it - :param column: The column that's going to be to the left of the new column - :param win: The window to add to the new column - """ - newwidth = int(100 / (len(self.columns) + 1)) - # we are only called if there already is a column, simplifies things - for c in self.columns: - c['width'] = newwidth - c = {'width': newwidth, 'mode': 'split', 'rows': [win]} + bc = client.group.qtile.colorPixel(self.border_normal) + if client.maximized: + bw = self.max_border_width + elif client.fullscreen: + bw = self.fullscreen_border_width + else: + bw = self.border_width + above = False + # We definitely have a screen here, so let's be sure we'll float on screen try: - index = self.columns.index(column) + 1 - except ValueError: - index = 0 + client.float_x + client.float_y + except AttributeError: + # this window hasn't been placed before, let's put it in a sensible spot + transient_for = client.window.get_wm_transient_for() + win = client.group.qtile.windowMap.get(transient_for) + if win is not None: + # if transient for a window, place in the center of the window + center_x = win.x + win.width / 2 + center_y = win.y + win.height / 2 + else: + center_x = screen.x + screen.width / 2 + center_y = screen.y + screen.height / 2 + above = True - self.columns.insert(index, c) + x = center_x - client.width / 2 + y = center_y - client.height / 2 + + # don't go off the right... + x = min(x, screen.x + screen.width) + # or left... + x = max(x, screen.x) + # or bottom... + y = min(y, screen.y + screen.height) + # or top + y = max(y, screen.y) + + if not (self.no_reposition_match and self.no_reposition_match.compare(client)): + client.x = int(round(x)) + client.y = int(round(y)) + + client.place( + client.x, + client.y, + client.width, + client.height, + bw, + bc, + above, + ) + client.unhide() diff --git a/kuro/utils/widgets.py b/kuro/utils/widgets.py new file mode 100644 index 0000000..2d918f2 --- /dev/null +++ b/kuro/utils/widgets.py @@ -0,0 +1,679 @@ +import os +import subprocess + +import cairocffi +import six +from libqtile import bar, pangocffi +from libqtile.log_utils import logger +from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound +from libqtile.widget import base +from libqtile.widget.battery import BatteryIcon, default_icon_path +from libqtile.widget.check_updates import CheckUpdates +from libqtile.widget.currentlayout import CurrentLayoutIcon +from libqtile.widget.graph import _Graph +from libqtile.widget.image import Image +from libqtile.widget.sensors import ThermalSensor +from libqtile.widget.tasklist import TaskList +from libqtile.widget.volume import Volume +from libqtile.widget.wlan import get_status +from libqtile.window import Window + +from kuro.utils.general import BUTTON_LEFT, execute, spawn_popup, notify, BUTTON_MIDDLE, bluetooth_audio_connected, \ + bluetooth_audio_sink, BUTTON_RIGHT + + +class AppLauncherIcon(Image): + def button_press(self, x, y, button): + if button == BUTTON_LEFT: + execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'") + + def handle_hover(self, event): + spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!") + + +class CheckUpdatesYay(CheckUpdates): + def __init__(self, **config): + super(CheckUpdatesYay, self).__init__(**config) + # Override command and output with yay command + self.cmd = "yay -Qua".split() + self.status_cmd = "yay -Qua --color never".split() + self.update_cmd = "sudo yay".split() + self.subtr = 0 + + def _check_updates(self): + #subprocess.check_output(self.update_cmd) + res = super(CheckUpdatesYay, self)._check_updates() + return res + + def button_press(self, x, y, button): + if button == BUTTON_LEFT: + output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n') + + num_updates = len(output)-1 + msg = "{} updates available.".format(num_updates) + + if num_updates > 0: + msg += "\n\n" + for x in range(min(num_updates, 9)): + msg += output[x] + "\n" + if num_updates > 9: + msg += "and {} more...".format(num_updates-9) + + notify( + "System updates", + msg + ) + + elif button == BUTTON_MIDDLE and self.execute is not None: + subprocess.Popen(self.execute, shell=True) + + +class KuroBatteryIcon(BatteryIcon): + status_cmd = "acpi" + + def button_press(self, x, y, button): + if button == BUTTON_LEFT: + output = subprocess.check_output(self.status_cmd).decode('utf-8') + + notify( + "Battery Status", + output + ) + + +class PulseVolumeWidget(Volume): + + defaults = [ + ("cardid", None, "Card Id"), + ("device", "default", "Device Name"), + ("channel", "Master", "Channel"), + ("padding", 3, "Padding left and right. Calculated if None."), + ("theme_path", None, "Path of the icons"), + ("update_interval", 0.2, "Update time in seconds."), + ("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set." + "The specified font needs to contain the correct unicode characters."), + ("mute_command", None, "Mute command"), + ("volume_up_command", None, "Volume up command"), + ("volume_down_command", None, "Volume down command"), + ("get_volume_command", None, "Command to get the current volume"), + ("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"), + ] + + _old_length = 0 + + def __init__(self, **config): + super(PulseVolumeWidget, self).__init__(**config) + self._old_length = self._length + + # Augment commands with bluetooth sink ID if this is a bluetooth icon + if self.is_bluetooth_icon and bluetooth_audio_connected(): + bsink = bluetooth_audio_sink() + self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split() + self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split() + self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split() + self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split() + logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink)) + self._length = self._old_length + self.commands_need_reset = False + elif self.is_bluetooth_icon: + self.commands_need_reset = True + else: + self.commands_need_reset = False + + self._old_length = self._length + + def reset_bluetooth_commands(self): + if self.is_bluetooth_icon and bluetooth_audio_connected(): + bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink() + self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split() + self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split() + self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split() + self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split() + logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink)) + self._length = self._old_length + self.commands_need_reset = False + + def get_volume(self): + try: + get_volume_cmd = "echo 0".split() + + if self.get_volume_command: + if self.is_bluetooth_icon and bluetooth_audio_sink() == -1: + pass + else: + get_volume_cmd = self.get_volume_command + + mixer_out = self.call_process(get_volume_cmd) + except subprocess.CalledProcessError: + return -1 + + try: + return int(mixer_out.strip()) + except ValueError: + return -1 + + def _update_drawer(self): + if self.volume is not None: + super(PulseVolumeWidget, self)._update_drawer() + self.text = "" + if self.is_bluetooth_icon and not bluetooth_audio_connected(): + self._length = 0 + + def draw(self): + if self.is_bluetooth_icon and not bluetooth_audio_connected(): + if not self.commands_need_reset: + logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer") + self.commands_need_reset = True + base._TextBox.draw(self) + else: + if self.commands_need_reset: + self.reset_bluetooth_commands() + if self.theme_path: + self.drawer.draw(offsetx=self.offset, width=self.length) + else: + base._TextBox.draw(self) + + def button_press(self, x, y, button): + if button == BUTTON_LEFT: + volume = self.get_volume() + + width = 15 + + if volume >= 0: + volume_amount = round((volume/100)*width) + else: + volume_amount = 0 + + msg = "[{}{}]".format( + "".join(["#" for x in range(volume_amount)]), + "".join(["-" for x in range(width-volume_amount)]) + ) + + notify( + "{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume), + msg + ) + else: + super(PulseVolumeWidget, self).button_press(x, y, button) + + +class WifiIconWidget(base._TextBox): + """WiFi connection strength indicator widget.""" + + orientations = base.ORIENTATION_HORIZONTAL + defaults = [ + ('interface', 'wlan0', 'The interface to monitor'), + ('update_interval', 1, 'The update interval.'), + ('theme_path', default_icon_path(), 'Path of the icons'), + ('custom_icons', {}, 'dict containing key->filename icon map'), + ] + + def __init__(self, **config): + super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config) + self.add_defaults(WifiIconWidget.defaults) + + if self.theme_path: + self.length_type = bar.STATIC + self.length = 0 + self.surfaces = {} + self.current_icon = 'wireless-disconnected' + self.icons = dict([(x, '{0}.png'.format(x)) for x in ( + 'wireless-disconnected', + 'wireless-none', + 'wireless-low', + 'wireless-medium', + 'wireless-high', + 'wireless-full', + )]) + self.icons.update(self.custom_icons) + + def _get_info(self): + try: + essid, quality = get_status(self.interface) + disconnected = essid is None + if disconnected: + return self.disconnected_message + + return { + 'error': False, + 'essid': essid, + 'quality': quality, + 'percent': (quality / 70) + } + except EnvironmentError: + logger.error( + '%s: Probably your wlan device is switched off or ' + ' otherwise not present in your system.', + self.__class__.__name__) + + return {'error': True} + + def timer_setup(self): + self.update() + self.timeout_add(self.update_interval, self.timer_setup) + + def _configure(self, qtile, bar): + super(WifiIconWidget, self)._configure(qtile, bar) + self.setup_images() + + def _get_icon_key(self): + key = 'wireless' + info = self._get_info() + if info is False or info.get('error'): + key += '-none' + elif info.get('essid') is None: + key += '-disconnected' + else: + percent = info['percent'] + if percent < 0.2: + key += '-low' + elif percent < 0.4: + key += '-medium' + elif percent < 0.8: + key += '-high' + else: + key += '-full' + + return key + + def update(self): + icon = self._get_icon_key() + if icon != self.current_icon: + self.current_icon = icon + self.draw() + + def draw(self): + if self.theme_path: + self.drawer.clear(self.background or self.bar.background) + self.drawer.ctx.set_source(self.surfaces[self.current_icon]) + self.drawer.ctx.paint() + self.drawer.draw(offsetx=self.offset, width=self.length) + else: + self.text = self.current_icon[8:] + base._TextBox.draw(self) + + def setup_images(self): + for key, name in self.icons.items(): + try: + path = os.path.join(self.theme_path, name) + img = cairocffi.ImageSurface.create_from_png(path) + except cairocffi.Error: + self.theme_path = None + logger.warning('Wireless Icon switching to text mode') + return + input_width = img.get_width() + input_height = img.get_height() + + sp = input_height / (self.bar.height - 1) + + width = input_width / sp + if width > self.length: + # cast to `int` only after handling all potentially-float values + self.length = int(width + self.actual_padding * 2) + + imgpat = cairocffi.SurfacePattern(img) + + scaler = cairocffi.Matrix() + + scaler.scale(sp, sp) + scaler.translate(self.actual_padding * -1, 0) + imgpat.set_matrix(scaler) + + imgpat.set_filter(cairocffi.FILTER_BEST) + self.surfaces[key] = imgpat + + +class ThermalSensorWidget(ThermalSensor): + defaults = [ + ('metric', True, 'True to use metric/C, False to use imperial/F'), + ('show_tag', False, 'Show tag sensor'), + ('update_interval', 2, 'Update interval in seconds'), + ('tag_sensor', None, + 'Tag of the temperature sensor. For example: "temp1" or "Core 0"'), + ('chip', None, 'Chip argument for sensors command'), + ( + 'threshold', + 70, + 'If the current temperature value is above, ' + 'then change to foreground_alert colour' + ), + ('foreground_alert', 'ff0000', 'Foreground colour alert'), + ] + + @catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError) + def get_temp_sensors(self): + """calls the unix `sensors` command with `-f` flag if user has specified that + the output should be read in Fahrenheit. + """ + command = ["sensors", ] + if self.chip: + command.append(self.chip) + if not self.metric: + command.append("-f") + sensors_out = self.call_process(command) + return self._format_sensors_output(sensors_out) + + +class SeparatorWidget(base._TextBox): + def __init__(self): + super(SeparatorWidget, self).__init__(text="|", width=bar.CALCULATED, fontsize=14) + + +class MediaWidget(base.InLoopPollText): + """Media Status Widget""" + + class Status: + OFFLINE = 0 + PLAYING = 1 + PAUSED = 2 + STOPPED = 3 + + orientations = base.ORIENTATION_HORIZONTAL + defaults = [ + ('off_text', '', 'The pattern for the text if no players are found.'), + ('on_text_play', ' {}', 'The pattern for the text if music is playing.'), + ('on_text_pause', ' {}', 'The pattern for the text if music is paused.'), + ('on_text_stop', ' {}', 'The pattern for the text if music is stopped.'), + ('update_interval', 1, 'The update interval.'), + ] + + player_icons = { + 'spotify': '', + 'vlc': '', + 'firefox': '', + } + + custom_player_data = { + 'firefox': { + 'showing': False, + 'title': '', + 'state': Status.STOPPED, + } + } + + image_urls = {} + current_image_url = None + player_to_control = None + + def __init__(self, **config): + super(MediaWidget, self).__init__(**config) + self.add_defaults(MediaWidget.defaults) + self.surfaces = {} + self.player_to_control = None + + def _player_to_control(self): + info = self._get_info() + players = {} + for player in info.keys(): + if player not in self.custom_player_data.keys(): + if info[player][0] in [MediaWidget.Status.PLAYING, MediaWidget.Status.PAUSED]: + players[player] = info[player] + + if self.player_to_control is not None and self.player_to_control not in players.keys(): + self.player_to_control = None + + if self.player_to_control is not None: + players = {self.player_to_control: players[self.player_to_control]} + + if len(players.keys()) == 1: + player = list(players.keys())[0] + self.player_to_control = player + return player + + elif len(players) == 0: + notify("MediaWidget", "Nothing to control!") + else: + notify("MediaWidget", "Multiple players to control, I don't know what you want to do!") + + return None + + def button_press(self, x, y, button): + if button == BUTTON_LEFT: + player = self._player_to_control() + if player is not None: + command = ["playerctl", "-p", player, "play-pause"] + _ = self.call_process(command) + notify("MediaWidget", "Toggled {}".format(player)) + if button == BUTTON_RIGHT: + player = self._player_to_control() + if player is not None: + command = ["playerctl", "-p", player, "next"] + _ = self.call_process(command) + + def cmd_update_custom_player(self, player_name, data): + # Update firefox player + if player_name == "firefox": + if data['playing'] and data['muted']: + self.custom_player_data['firefox']['showing'] = True + self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED + self.custom_player_data['firefox']['title'] = data['title'] + elif data['playing'] and not data['muted']: + self.custom_player_data['firefox']['showing'] = True + self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING + self.custom_player_data['firefox']['title'] = data['title'] + elif not data['playing'] and data['muted']: + self.custom_player_data['firefox']['showing'] = True + self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED + self.custom_player_data['firefox']['title'] = data['title'] + elif not data['playing'] and not data['muted']: + self.custom_player_data['firefox']['showing'] = False + self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE + self.custom_player_data['firefox']['title'] = data['title'] + + def _get_players(self): + players = [] + + # Playerctl players + command = ["playerctl", "-l"] + result = self.call_process(command) + if result: + players.extend([x for x in result.split("\n") if x]) + + # Custom players - Firefox + if self.custom_player_data['firefox']['showing']: + players.append('firefox') + + if players: + return players + else: + return None + + def _get_info(self): + players = self._get_players() + + if not players: + return {} + else: + result = {} + + for player in players: + if player in self.custom_player_data.keys(): + # Custom player -- Firefox + if player == "firefox": + result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']] + + # Other custom players -- generic attempt with error catching + else: + try: + result[player] = [self.custom_player_data[player]['state'], + self.custom_player_data[player]['title']] + except KeyError: + pass + + else: + # PlayerCtl player + command = ["playerctl", "-p", player, "status"] + cmd_result = self.call_process(command).strip() + + text = "Unknown" + if cmd_result in ["Playing", "Paused"]: + artist = self.call_process(['playerctl', '-p', player, 'metadata', 'artist']).strip() + title = self.call_process(['playerctl', '-p', player, 'metadata', 'title']).strip() + + if artist and title: + text = "{} - {}".format(artist, title) + elif artist: + text = artist + elif title: + text = title + + if cmd_result == "Playing": + result[player] = [MediaWidget.Status.PLAYING, text] + elif cmd_result == "Paused": + result[player] = [MediaWidget.Status.PAUSED, text] + elif cmd_result == "Stopped": + result[player] = [MediaWidget.Status.STOPPED, ""] + + return result + + def _get_formatted_text(self, status): + if status[0] == MediaWidget.Status.PLAYING: + return self.on_text_play.format(status[1]) + elif status[0] == MediaWidget.Status.PAUSED: + return self.on_text_pause.format(status[1]) + elif status[0] == MediaWidget.Status.STOPPED: + return self.on_text_stop.format(status[1]) + else: + return "Unknown" + + def draw(self): + super(MediaWidget, self).draw() + + def poll(self): + text = [] + status = self._get_info() + if not status: + return self.off_text + else: + for player in status.keys(): + icon = self.player_icons.get(player, player) + text.append("{} {}".format(icon, self._get_formatted_text(status[player]))) + + return " | ".join(text) if text else self.off_text + + +class AudioVisualizerWidget(_Graph): + """Display Audio Visualization graph""" + orientations = base.ORIENTATION_HORIZONTAL + fixed_upper_bound = True + + def __init__(self, **config): + _Graph.__init__(self, **config) + self.add_defaults(AudioVisualizerWidget.defaults) + + self.client = None + self.screen = None + + self.old_position = None + + def set_client(self, c, s): + self.client = c + self.screen = s + + def update_graph(self): + if self.client is not None: + viz_info = self.info() + pos_x = viz_info['offset'] + self.margin_x - 1 + self.screen.x + pos_y = 0 + self.margin_y - 1 + self.screen.y + if self.old_position != (pos_x, pos_y): + self.old_position = (pos_x, pos_y) + + # Check if a window on this screen is full-screen + fullscreen = False + for window in self.screen.group.windows: + if isinstance(window, Window): + if window.fullscreen: + fullscreen = True + break + + logger.warning("Repositioning {} {} to {}x{}".format(self.client, self.client.window.wid, pos_x, pos_y)) + self.client.reposition(pos_x, pos_y, above=not fullscreen) + + self.draw() + + +class KuroCurrentLayoutIcon(CurrentLayoutIcon): + def _get_layout_names(self): + names = super(KuroCurrentLayoutIcon, self)._get_layout_names() + + from kuro.utils import layouts as kuro_layouts + from libqtile.layout.base import Layout + klayouts = [ + layout_class_name.lower() + for layout_class, layout_class_name + in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts)) + if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout) + ] + names.extend(klayouts) + + return list(set(names)) + + +class KuroTaskList(TaskList): + defaults = [ + ( + 'txt_pinned', + 'P ', + 'Text representation of the pinned window state. ' + 'e.g., "P " or "\U0001F5D7 "' + ), + ( + 'markup_pinned', + None, + 'Text markup of the pinned window state. Supports pangomarkup with markup=True.' + 'e.g., "{}" or "{}"' + ), + ] + + def __init__(self, *args, **kwargs): + super(KuroTaskList, self).__init__(*args, **kwargs) + self.add_defaults(KuroTaskList.defaults) + + def get_taskname(self, window): + """ + Get display name for given window. + Depending on its state minimized, maximized and floating + appropriate characters are prepended. + """ + state = '' + markup_str = self.markup_normal + + # Enforce markup and new string format behaviour when + # at least one markup_* option is used. + # Mixing non markup and markup may cause problems. + if self.markup_minimized or self.markup_maximized\ + or self.markup_floating or self.markup_focused or self.markup_pinned: + enforce_markup = True + else: + enforce_markup = False + + if window is None: + pass + elif hasattr(window, "is_static_window") and window.is_static_window: + state = self.txt_pinned + markup_str = self.markup_pinned + elif window.minimized: + state = self.txt_minimized + markup_str = self.markup_minimized + elif window.maximized: + state = self.txt_maximized + markup_str = self.markup_maximized + elif window.floating: + state = self.txt_floating + markup_str = self.markup_floating + elif window is window.group.currentWindow: + markup_str = self.markup_focused + + window_name = window.name if window and window.name else "?" + + # Emulate default widget behavior if markup_str is None + if enforce_markup and markup_str is None: + markup_str = "%s{}" % (state) + + if markup_str is not None: + self.markup = True + window_name = pangocffi.markup_escape_text(window_name) + return markup_str.format(window_name) + + return "%s%s" % (state, window_name) + diff --git a/kuro/utils/windows.py b/kuro/utils/windows.py new file mode 100644 index 0000000..12a3bc6 --- /dev/null +++ b/kuro/utils/windows.py @@ -0,0 +1,69 @@ +from cairocffi.test_xcb import xcffib +from libqtile import hook +from libqtile.window import Window, Static + + +class KuroStatic(Static): + + @staticmethod + def create(window: Window, screen, x=None, y=None, width=None, height=None): + """Makes this window a static window, attached to a Screen + + If any of the arguments are left unspecified, the values given by the + window itself are used instead. So, for a window that's aware of its + appropriate size and location (like dzen), you don't have to specify + anything. + """ + window.defunct = True + if isinstance(screen, int): + screen = window.qtile.screens[screen] + if window.group: + window.group.remove(window) + s = KuroStatic(window.window, window.qtile, screen, x, y, width, height) + window.qtile.windowMap[window.window.wid] = s + hook.fire("client_managed", s) + return s + + def __init__(self, win, qtile, screen, x=None, y=None, width=None, height=None): + Static.__init__(self, win, qtile, screen, x=x, y=y, width=width, height=height) + self.above = True + self.placed_x = x + self.placed_y = y + if None not in (x, y, width, height): + self.place(x, y, width, height, 0, 0, above=self.above) + + def set_above(self, above: bool): + self.above = above + self.reposition(self.placed_x, self.placed_y) + + def reposition(self, x, y, above=None): + self.placed_x = x + self.placed_y = y + if above is not None: + self.above = above + self.place(x, y, self.width, self.height, 0, 0, above=self.above) + + def handle_ConfigureRequest(self, e): + cw = xcffib.xproto.ConfigWindow + if self.conf_x is None and e.value_mask & cw.X: + self.x = e.x + if self.conf_y is None and e.value_mask & cw.Y: + self.y = e.y + if self.conf_width is None and e.value_mask & cw.Width: + self.width = e.width + if self.conf_height is None and e.value_mask & cw.Height: + self.height = e.height + + self.place( + self.screen.x + self.x, + self.screen.y + self.y, + self.width, + self.height, + self.borderwidth, + self.bordercolor, + above=self.above + ) + return False + + def __repr__(self): + return "KuroStatic(%r)" % self.name