import os
import re
import subprocess

import cairocffi
import six
from libqtile import bar, pangocffi
from libqtile.log_utils import logger
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import BatteryIcon, default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.currentlayout import CurrentLayoutIcon
from libqtile.widget.graph import _Graph
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.tasklist import TaskList
from libqtile.widget.volume import Volume
from libqtile.widget.wlan import get_status
from libqtile.window import Window

from kuro.utils.general import (
    BUTTON_LEFT,
    execute,
    spawn_popup,
    notify,
    BUTTON_MIDDLE,
    bluetooth_audio_connected,
    bluetooth_audio_sink,
    BUTTON_RIGHT,
)


def _load_scaled_icons(widget, fallback_warning):
    """Load ``widget.icons`` as scaled cairo patterns into ``widget.surfaces``.

    Shared by the icon-based widgets in this module. For every entry of
    ``widget.icons`` the PNG is read from ``widget.theme_path``, scaled so
    its height matches ``widget.bar.height - 1`` and stored in
    ``widget.surfaces`` under the icon key. ``widget.length`` grows to fit
    the widest icon plus padding on both sides.

    If any icon cannot be loaded the widget is switched to text mode:
    ``theme_path`` is cleared, the length becomes calculated from the text,
    ``fallback_warning`` is logged and loading stops.

    :param widget: the widget owning ``icons``, ``surfaces`` and ``theme_path``
    :param fallback_warning: message logged when falling back to text mode
    """
    if not widget.theme_path:
        # Already in text mode; os.path.join(None, ...) would raise TypeError.
        return

    for key, name in widget.icons.items():
        try:
            path = os.path.join(widget.theme_path, name)
            img = cairocffi.ImageSurface.create_from_png(path)
        except (cairocffi.Error, EnvironmentError):
            # EnvironmentError covers a missing or unreadable file, which
            # cairocffi does not report as cairocffi.Error.
            widget.theme_path = None
            # __init__ switched to a STATIC length for icon mode; text mode
            # needs a calculated length or the text would not be visible.
            widget.length_type = bar.CALCULATED
            logger.warning(fallback_warning)
            return

        input_width = img.get_width()
        input_height = img.get_height()

        # Scale factor that maps the image height onto the bar height.
        sp = input_height / (widget.bar.height - 1)
        width = input_width / sp
        if width > widget.length:
            # cast to `int` only after handling all potentially-float values
            widget.length = int(width + widget.actual_padding * 2)

        imgpat = cairocffi.SurfacePattern(img)

        scaler = cairocffi.Matrix()
        scaler.scale(sp, sp)
        scaler.translate(widget.actual_padding * -1, 0)
        imgpat.set_matrix(scaler)

        imgpat.set_filter(cairocffi.FILTER_BEST)
        widget.surfaces[key] = imgpat


class AppLauncherIcon(Image):
    """Image widget that opens a ``dmenu_run`` launcher on left click."""

    def button_press(self, x, y, button):
        """Start the application launcher when the left button is pressed."""
        if button == BUTTON_LEFT:
            execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'")

    def handle_hover(self, event):
        """Spawn a popup at this widget's offset when it is hovered."""
        spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!")


class CheckUpdatesYay(CheckUpdates):
    """``CheckUpdates`` variant that queries pending updates through ``yay``."""

    # Maximum number of update lines listed in the notification body.
    MAX_LISTED_UPDATES = 9

    def __init__(self, **config):
        super(CheckUpdatesYay, self).__init__(**config)
        # Override command and output with yay command
        self.cmd = "yay -Qua".split()
        self.status_cmd = "yay -Qua --color never".split()
        self.update_cmd = "sudo yay".split()
        self.subtr = 0

    def _check_updates(self):
        """Delegate to the parent implementation.

        NOTE: ``self.update_cmd`` is defined but intentionally not run here.
        """
        return super(CheckUpdatesYay, self)._check_updates()

    def _pending_updates(self):
        """Return the non-empty output lines of ``self.status_cmd``."""
        try:
            raw = subprocess.check_output(self.status_cmd)
        except subprocess.CalledProcessError as err:
            # Query commands may signal "nothing to report" through a
            # non-zero exit status; use whatever was printed instead of
            # crashing the click handler.
            raw = err.output or b""
        return [line for line in raw.decode('utf-8').splitlines() if line.strip()]

    def button_press(self, x, y, button):
        """Left click: notify with the list of updates. Middle click: run ``execute``."""
        if button == BUTTON_LEFT:
            updates = self._pending_updates()
            num_updates = len(updates)
            msg = "{} updates available.".format(num_updates)
            if num_updates > 0:
                shown = updates[:self.MAX_LISTED_UPDATES]
                msg += "\n\n" + "".join(line + "\n" for line in shown)
                if num_updates > self.MAX_LISTED_UPDATES:
                    msg += "and {} more...".format(num_updates - self.MAX_LISTED_UPDATES)
            notify(
                "System updates",
                msg
            )
        elif button == BUTTON_MIDDLE and self.execute is not None:
            subprocess.Popen(self.execute, shell=True)


class KuroBatteryIcon(BatteryIcon):
    """Battery icon that shows the output of ``acpi`` on left click."""

    status_cmd = "acpi"

    def button_press(self, x, y, button):
        """Send a notification containing the output of ``status_cmd``."""
        if button == BUTTON_LEFT:
            output = subprocess.check_output(self.status_cmd).decode('utf-8')
            notify(
                "Battery Status",
                output
            )


class PulseVolumeWidget(Volume):
    """Volume widget with optional support for a bluetooth audio sink.

    When ``is_bluetooth_icon`` is set, the user-configured commands are
    treated as templates containing ``{bsink}``, which is replaced by the
    value of ``bluetooth_audio_sink()`` whenever a device is connected. The
    widget collapses to zero length while no bluetooth device is connected.
    """

    defaults = [
        ("cardid", None, "Card Id"),
        ("device", "default", "Device Name"),
        ("channel", "Master", "Channel"),
        ("padding", 3, "Padding left and right. Calculated if None."),
        ("theme_path", None, "Path of the icons"),
        ("update_interval", 0.2, "Update time in seconds."),
        ("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
                         "The specified font needs to contain the correct unicode characters."),
        ("mute_command", None, "Mute command"),
        ("volume_up_command", None, "Volume up command"),
        ("volume_down_command", None, "Volume down command"),
        ("get_volume_command", None, "Command to get the current volume"),
        ("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
    ]

    # Options whose user-supplied value is a ``{bsink}`` template.
    _BLUETOOTH_COMMANDS = (
        'mute_command',
        'volume_up_command',
        'volume_down_command',
        'get_volume_command',
    )

    _old_length = 0

    def __init__(self, **config):
        super(PulseVolumeWidget, self).__init__(**config)
        # NOTE(review): the parent presumably registers only its own
        # defaults, so register the extended list (is_bluetooth_icon) here,
        # like the other widgets in this module do - confirm against Volume.
        self.add_defaults(PulseVolumeWidget.defaults)
        self._old_length = self._length

        # Augment commands with bluetooth sink ID if this is a bluetooth icon
        if self.is_bluetooth_icon and bluetooth_audio_connected():
            self._apply_bluetooth_sink(bluetooth_audio_sink())
        elif self.is_bluetooth_icon:
            # Not connected yet: fill in the templates on first connect.
            self.commands_need_reset = True
        else:
            self.commands_need_reset = False
        self._old_length = self._length

    def _apply_bluetooth_sink(self, bsink):
        """Fill ``bsink`` into every configured command template.

        Commands the user did not configure are skipped. Afterwards the
        widget length is restored and the pending-reset flag is cleared.
        """
        for option in self._BLUETOOTH_COMMANDS:
            template = self._user_config.get(option)
            if not template:
                continue
            # Templates are sequences of arguments; format them as one
            # string so ``{bsink}`` may appear in any argument.
            setattr(self, option, " ".join(template).format(bsink=bsink).split())
        logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
        self._length = self._old_length
        self.commands_need_reset = False

    def reset_bluetooth_commands(self):
        """Re-create the commands for the currently connected bluetooth sink.

        Does nothing unless this is a bluetooth icon and a device is
        connected. A sink value of ``-1`` is replaced by ``0``.
        """
        if self.is_bluetooth_icon and bluetooth_audio_connected():
            sink = bluetooth_audio_sink()
            self._apply_bluetooth_sink(0 if sink == -1 else sink)

    def get_volume(self):
        """Return the current volume as an int.

        Returns ``0`` when no ``get_volume_command`` is configured or when
        this is a bluetooth icon and ``bluetooth_audio_sink()`` is ``-1``.
        Returns ``-1`` when the command fails or prints a non-integer.
        """
        command = self.get_volume_command
        if not command or (self.is_bluetooth_icon and bluetooth_audio_sink() == -1):
            # Nothing to query; avoids spawning a process on every update.
            return 0

        try:
            mixer_out = self.call_process(command)
        except subprocess.CalledProcessError:
            return -1

        try:
            return int(mixer_out.strip())
        except ValueError:
            return -1

    def _update_drawer(self):
        """Update the drawer, blank the text and hide a disconnected bluetooth icon."""
        if self.volume is not None:
            super(PulseVolumeWidget, self)._update_drawer()
        self.text = ""

        if self.is_bluetooth_icon and not bluetooth_audio_connected():
            self._length = 0

    def draw(self):
        """Draw the widget, tracking bluetooth connects and disconnects."""
        if self.is_bluetooth_icon and not bluetooth_audio_connected():
            if not self.commands_need_reset:
                logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
                # The sink may differ on reconnect, so rebuild the commands then.
                self.commands_need_reset = True
            base._TextBox.draw(self)
        else:
            if self.commands_need_reset:
                self.reset_bluetooth_commands()
            if self.theme_path:
                self.drawer.draw(offsetx=self.offset, width=self.length)
            else:
                base._TextBox.draw(self)

    def button_press(self, x, y, button):
        """Left click: notify with a text volume bar. Other buttons: parent behaviour."""
        if button == BUTTON_LEFT:
            volume = self.get_volume()
            width = 15
            # Clamp so a negative (error) or >100% volume cannot produce a
            # bar that is shorter or longer than ``width`` characters.
            filled = min(width, max(0, int(round(volume / 100.0 * width))))
            msg = "[{}{}]".format("#" * filled, "-" * (width - filled))
            notify(
                "{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
                msg
            )
        else:
            super(PulseVolumeWidget, self).button_press(x, y, button)


class WifiIconWidget(base._TextBox):
    """WiFi connection strength indicator widget."""
    orientations = base.ORIENTATION_HORIZONTAL
    defaults = [
        ('interface', 'wlan0', 'The interface to monitor'),
        ('update_interval', 1, 'The update interval.'),
        ('theme_path', default_icon_path(), 'Path of the icons'),
        ('custom_icons', {}, 'dict containing key->filename icon map'),
        ('disconnected_message', {'error': False, 'essid': None, 'quality': 0, 'percent': 0},
         'Message to show when WiFi is disconnected'),
    ]

    # Common prefix of all icon keys; stripped for the text-mode label.
    _ICON_PREFIX = 'wireless-'

    def __init__(self, **config):
        super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
        self.add_defaults(WifiIconWidget.defaults)

        if self.theme_path:
            self.length_type = bar.STATIC
            self.length = 0
        self.surfaces = {}
        self.current_icon = 'wireless-disconnected'
        self.icons = {
            key: '{0}.png'.format(key)
            for key in (
                'wireless-disconnected',
                'wireless-none',
                'wireless-low',
                'wireless-medium',
                'wireless-high',
                'wireless-full',
            )
        }
        self.icons.update(self.custom_icons)

    def _get_info(self):
        """Return a dict describing the connection of ``self.interface``.

        Keys are ``error``, ``essid``, ``quality`` and ``percent`` (quality
        divided by 70). ``disconnected_message`` is returned when there is
        no ESSID, and ``{'error': True}`` when the status cannot be read.
        """
        try:
            essid, quality = get_status(self.interface)
            if essid is None:
                return self.disconnected_message
            return {
                'error': False,
                'essid': essid,
                'quality': quality,
                'percent': (quality / 70)
            }
        except EnvironmentError:
            logger.error(
                '%s: Probably your wlan device is switched off or '
                ' otherwise not present in your system.',
                self.__class__.__name__)
            return {'error': True}

    def timer_setup(self):
        """Update now and schedule the next update."""
        self.update()
        self.timeout_add(self.update_interval, self.timer_setup)

    def _configure(self, qtile, bar):
        super(WifiIconWidget, self)._configure(qtile, bar)
        self.setup_images()

    def _get_icon_key(self):
        """Map the current connection info onto one of the icon keys."""
        info = self._get_info()
        if info is False or info.get('error'):
            return 'wireless-none'
        if info.get('essid') is None:
            return 'wireless-disconnected'

        percent = info['percent']
        if percent < 0.2:
            return 'wireless-low'
        if percent < 0.4:
            return 'wireless-medium'
        if percent < 0.8:
            return 'wireless-high'
        return 'wireless-full'

    def update(self):
        """Redraw only when the icon changed since the last update."""
        icon = self._get_icon_key()
        if icon != self.current_icon:
            self.current_icon = icon
            self.draw()

    def draw(self):
        """Paint the current icon, or its name as text when no theme is available."""
        if self.theme_path:
            self.drawer.clear(self.background or self.bar.background)
            self.drawer.ctx.set_source(self.surfaces[self.current_icon])
            self.drawer.ctx.paint()
            self.drawer.draw(offsetx=self.offset, width=self.length)
        else:
            # Show e.g. "medium" rather than "wireless-medium".
            self.text = self.current_icon[len(self._ICON_PREFIX):]
            base._TextBox.draw(self)

    def setup_images(self):
        """Load the icon set, falling back to text mode when that fails."""
        _load_scaled_icons(self, 'Wireless Icon switching to text mode')


class ThermalSensorWidget(ThermalSensor):
    """Thermal sensor widget that can restrict ``sensors`` to a single chip."""

    defaults = [
        ('metric', True, 'True to use metric/C, False to use imperial/F'),
        ('show_tag', False, 'Show tag sensor'),
        ('update_interval', 2, 'Update interval in seconds'),
        ('tag_sensor', None,
            'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
        ('chip', None, 'Chip argument for sensors command'),
        (
            'threshold',
            70,
            'If the current temperature value is above, '
            'then change to foreground_alert colour'
        ),
        ('foreground_alert', 'ff0000', 'Foreground colour alert'),
    ]

    @catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
    def get_temp_sensors(self):
        """calls the unix `sensors` command with `-f` flag if user has specified that
        the output should be read in Fahrenheit.
        """
        command = ["sensors", ]
        # NOTE(review): the parent presumably registers only its own
        # defaults, in which case ``chip`` is unset unless configured
        # explicitly - confirm. getattr keeps both cases working.
        chip = getattr(self, "chip", None)
        if chip:
            command.append(chip)
        if not self.metric:
            command.append("-f")
        sensors_out = self.call_process(command)
        return self._format_sensors_output(sensors_out)


class SeparatorWidget(base._TextBox):
    """Text box showing a ``|`` separator.

    Any text box option may be passed to override the defaults (text
    ``"|"``, calculated width, font size 14).
    """

    def __init__(self, **config):
        config.setdefault("text", "|")
        config.setdefault("width", bar.CALCULATED)
        config.setdefault("fontsize", 14)
        super(SeparatorWidget, self).__init__(**config)


class MediaWidget(base.InLoopPollText):
    """Media Status Widget"""

    class Status:
        OFFLINE = 0
        PLAYING = 1
        PAUSED = 2
        STOPPED = 3

    orientations = base.ORIENTATION_HORIZONTAL
    defaults = [
        ('off_text', '', 'The pattern for the text if no players are found.'),
        ('on_text_play', ' {}', 'The pattern for the text if music is playing.'),
        ('on_text_pause', ' {}', 'The pattern for the text if music is paused.'),
        ('on_text_stop', ' {}', 'The pattern for the text if music is stopped.'),
        ('update_interval', 1, 'The update interval.'),
    ]

    # NOTE(review): these icon strings are empty as received; they may have
    # been icon-font glyphs lost in transit - confirm against the repository.
    player_icons = {
        'spotify': '',
        'vlc': '',
        'firefox': '',
        'mpv': '',
    }

    # State of players that are reported through cmd_update_custom_player
    # instead of playerctl. Defined on the class, so shared by all instances.
    custom_player_data = {
        'firefox': {
            'showing': False,
            'title': '',
            'state': Status.STOPPED,
        }
    }

    image_urls = {}
    current_image_url = None
    player_to_control = None

    def __init__(self, **config):
        super(MediaWidget, self).__init__(**config)
        self.add_defaults(MediaWidget.defaults)
        self.surfaces = {}
        self.player_to_control = None

    def _playerctl(self, *args):
        """Return the output of ``playerctl <args>``, or ``""`` if it fails.

        Failures are tolerated because this runs on every poll and a player
        may disappear between listing the players and querying one of them.
        """
        try:
            return self.call_process(["playerctl"] + list(args))
        except (subprocess.CalledProcessError, EnvironmentError):
            return ""

    def _player_to_control(self):
        """Return the name of the player that click actions should target.

        Only playerctl players that are playing or paused qualify. The last
        controlled player is preferred while it still qualifies. Sends a
        notification and returns ``None`` when there is no candidate or more
        than one.
        """
        controllable = (MediaWidget.Status.PLAYING, MediaWidget.Status.PAUSED)
        players = {
            name: status
            for name, status in self._get_info().items()
            if name not in self.custom_player_data and status[0] in controllable
        }

        if self.player_to_control is not None and self.player_to_control not in players:
            self.player_to_control = None

        if self.player_to_control is not None:
            players = {self.player_to_control: players[self.player_to_control]}

        if len(players) == 1:
            player = next(iter(players))
            self.player_to_control = player
            return player

        if not players:
            notify("MediaWidget", "Nothing to control!")
        else:
            notify("MediaWidget", "Multiple players to control, I don't know what you want to do!")
        return None

    def button_press(self, x, y, button):
        """Left click toggles play/pause, right click skips to the next track."""
        if button == BUTTON_LEFT:
            player = self._player_to_control()
            if player is not None:
                command = ["playerctl", "-p", player, "play-pause"]
                _ = self.call_process(command)
                notify("MediaWidget", "Toggled {}".format(player))
        if button == BUTTON_RIGHT:
            player = self._player_to_control()
            if player is not None:
                command = ["playerctl", "-p", player, "next"]
                _ = self.call_process(command)

    def cmd_update_custom_player(self, player_name, data):
        """Update the stored state of a custom player.

        Only ``"firefox"`` is handled. ``data`` must contain ``playing``,
        ``muted`` and ``title``. The stored state is:

        * playing and muted -> PAUSED
        * playing, not muted -> PLAYING
        * not playing, muted -> STOPPED
        * neither -> OFFLINE, and the player is no longer shown
        """
        # Update firefox player
        if player_name != "firefox":
            return

        playing = bool(data['playing'])
        muted = bool(data['muted'])
        if playing:
            state = MediaWidget.Status.PAUSED if muted else MediaWidget.Status.PLAYING
        elif muted:
            state = MediaWidget.Status.STOPPED
        else:
            state = MediaWidget.Status.OFFLINE

        entry = self.custom_player_data['firefox']
        entry['showing'] = playing or muted
        entry['state'] = state
        entry['title'] = data['title']

    def _get_players(self):
        """Return the names of all known players, or ``None`` if there are none."""
        players = []

        # Playerctl players
        result = self._playerctl("-l")
        if result:
            players.extend([x for x in result.split("\n") if x])

        # Custom players - Firefox
        if self.custom_player_data['firefox']['showing']:
            players.append('firefox')

        return players if players else None

    def _get_info(self):
        """Return ``{player: [status, text]}`` for every player with a known status."""
        players = self._get_players()
        if not players:
            return {}

        result = {}
        for player in players:
            if player in self.custom_player_data:
                # Custom player; entries lacking state or title are skipped.
                try:
                    custom = self.custom_player_data[player]
                    result[player] = [custom['state'], custom['title']]
                except KeyError:
                    pass
                continue

            # PlayerCtl player
            cmd_result = self._playerctl("-p", player, "status").strip()

            text = "Unknown"
            if cmd_result in ["Playing", "Paused"]:
                artist = self._playerctl('-p', player, 'metadata', 'artist').strip()
                title = self._playerctl('-p', player, 'metadata', 'title').strip()

                if artist and title:
                    text = "{} - {}".format(artist, title)
                elif artist:
                    text = artist
                elif title:
                    text = title

            if cmd_result == "Playing":
                result[player] = [MediaWidget.Status.PLAYING, text]
            elif cmd_result == "Paused":
                result[player] = [MediaWidget.Status.PAUSED, text]
            elif cmd_result == "Stopped":
                result[player] = [MediaWidget.Status.STOPPED, ""]

        return result

    def _get_formatted_text(self, status):
        """Format a ``[status, text]`` pair with the pattern for that status."""
        if status[0] == MediaWidget.Status.PLAYING:
            return self.on_text_play.format(status[1])
        elif status[0] == MediaWidget.Status.PAUSED:
            return self.on_text_pause.format(status[1])
        elif status[0] == MediaWidget.Status.STOPPED:
            return self.on_text_stop.format(status[1])
        else:
            return "Unknown"

    def draw(self):
        """Draw the widget using the parent implementation."""
        super(MediaWidget, self).draw()

    def poll(self):
        """Return the widget text: one ``icon text`` entry per player, joined by `` | ``."""
        status = self._get_info()
        if not status:
            return self.off_text

        text = [
            # Players without a configured icon are shown by name.
            "{} {}".format(self.player_icons.get(player, player), self._get_formatted_text(player_status))
            for player, player_status in status.items()
        ]
        return " | ".join(text) if text else self.off_text


class AudioVisualizerWidget(_Graph):
    """Display Audio Visualization graph"""
    orientations = base.ORIENTATION_HORIZONTAL
    fixed_upper_bound = True

    def __init__(self, **config):
        _Graph.__init__(self, **config)
        self.add_defaults(AudioVisualizerWidget.defaults)

        self.client = None
        self.screen = None

        self.old_position = None

    def set_client(self, c, s):
        """Remember the visualizer client ``c`` and the screen ``s`` it belongs to."""
        self.client = c
        self.screen = s

    def update_graph(self):
        """Keep the client positioned over this widget, then redraw.

        The client is only repositioned when the computed position changed.
        It is placed above other windows unless a window in the screen's
        current group is fullscreen.
        """
        if self.client is not None:
            viz_info = self.info()
            pos_x = viz_info['offset'] + self.margin_x + self.screen.x
            pos_y = 0 + self.margin_y + self.screen.y
            if self.old_position != (pos_x, pos_y):
                self.old_position = (pos_x, pos_y)
                # Check if a window on this screen is full-screen
                fullscreen = any(
                    isinstance(window, Window) and window.fullscreen
                    for window in self.screen.group.windows
                )
                logger.warning("Repositioning {} {} to {}x{}".format(self.client, self.client.window.wid, pos_x, pos_y))
                self.client.reposition(pos_x, pos_y, above=not fullscreen)
        self.draw()


class KuroCurrentLayoutIcon(CurrentLayoutIcon):
    """Current-layout icon that also knows the layouts from ``kuro.utils.layouts``."""

    def _get_layout_names(self):
        """Return the parent's layout names plus the lowercased kuro layout class names."""
        names = super(KuroCurrentLayoutIcon, self)._get_layout_names()

        from kuro.utils import layouts as kuro_layouts
        from libqtile.layout.base import Layout

        kuro_names = set()
        for attr_name in dir(kuro_layouts):
            candidate = getattr(kuro_layouts, attr_name)
            if isinstance(candidate, six.class_types) and issubclass(candidate, Layout):
                kuro_names.add(attr_name.lower())

        # Deduplicate without mutating the list returned by the parent.
        return list(set(names) | kuro_names)


class KuroTaskList(TaskList):
    """Task list with an extra "pinned" state for static windows."""

    defaults = [
        (
            'txt_pinned',
            'P ',
            'Text representation of the pinned window state. '
            'e.g., "P " or "\U0001F5D7 "'
        ),
        (
            'markup_pinned',
            None,
            'Text markup of the pinned window state. Supports pangomarkup with markup=True.'
            'e.g., "{}" or "{}"'
        ),
    ]

    def __init__(self, *args, **kwargs):
        super(KuroTaskList, self).__init__(*args, **kwargs)
        self.add_defaults(KuroTaskList.defaults)

    def get_taskname(self, window):
        """
        Get display name for given window.
        Depending on its state pinned, minimized, maximized and floating
        appropriate characters are prepended.
        """
        state = ''
        markup_str = self.markup_normal

        # Enforce markup and new string format behaviour when
        # at least one markup_* option is used.
        # Mixing non markup and markup may cause problems.
        enforce_markup = bool(
            self.markup_minimized or self.markup_maximized
            or self.markup_floating or self.markup_focused or self.markup_pinned
        )

        if window is None:
            pass
        elif getattr(window, "is_static_window", False):
            state = self.txt_pinned
            markup_str = self.markup_pinned
        elif window.minimized:
            state = self.txt_minimized
            markup_str = self.markup_minimized
        elif window.maximized:
            state = self.txt_maximized
            markup_str = self.markup_maximized
        elif window.floating:
            state = self.txt_floating
            markup_str = self.markup_floating
        elif window is window.group.current_window:
            markup_str = self.markup_focused

        window_name = window.name if window and window.name else "?"

        # Emulate default widget behavior if markup_str is None
        if enforce_markup and markup_str is None:
            markup_str = "%s{}" % (state)

        if markup_str is not None:
            self.markup = True
            window_name = pangocffi.markup_escape_text(window_name)
            return markup_str.format(window_name)

        return "%s%s" % (state, window_name)


class GPUStatusWidget(base._TextBox):
    """Displays the currently used GPU."""

    orientations = base.ORIENTATION_HORIZONTAL
    defaults = [
        ('check_command', 'optimus-manager --print-mode', 'The command that shows the current mode.'),
        ('update_interval', 60, 'The update interval in seconds.'),
        ('theme_path', default_icon_path(), 'Path of the icons'),
        ('custom_icons', {}, 'dict containing key->filename icon map'),
    ]

    # Common prefix of all icon keys; stripped for the text-mode label.
    _ICON_PREFIX = 'gpu-'

    def __init__(self, **config):
        super(GPUStatusWidget, self).__init__("GPU", bar.CALCULATED, **config)
        self.add_defaults(GPUStatusWidget.defaults)

        if self.theme_path:
            self.length_type = bar.STATIC
            self.length = 0
        self.surfaces = {}
        self.current_icon = 'gpu-unknown'
        self.icons = {
            key: '{0}.png'.format(key)
            for key in (
                'gpu-intel',
                'gpu-nvidia',
                'gpu-unknown',
            )
        }
        self.current_status = "Unknown"
        self.icons.update(self.custom_icons)

    def _get_info(self):
        """Run ``check_command`` and return ``{'error': bool, 'mode': str}``.

        ``mode`` is ``"nvidia"`` or ``"intel"`` when that word occurs in the
        command output (nvidia takes precedence) and ``"unknown"`` otherwise,
        including when the command cannot be run.
        """
        try:
            output = self.call_process(self.check_command, shell=True)
        except (subprocess.CalledProcessError, EnvironmentError):
            # Runs from the update timer; a missing or failing command
            # should show the "unknown" icon rather than raise.
            return {'error': True, 'mode': "unknown"}

        if "nvidia" in output:
            mode = "nvidia"
        elif "intel" in output:
            mode = "intel"
        else:
            mode = "unknown"
        return {'error': False, 'mode': mode}

    def timer_setup(self):
        """Update now and schedule the next update."""
        self.update()
        self.timeout_add(self.update_interval, self.timer_setup)

    def _configure(self, qtile, bar):
        super(GPUStatusWidget, self)._configure(qtile, bar)
        self.setup_images()

    def _get_icon_key(self):
        """Return the icon key for the current mode and update ``current_status``."""
        mode = self._get_info().get('mode')
        if mode == "intel":
            self.current_status = "Intel"
            return 'gpu-intel'
        if mode == "nvidia":
            self.current_status = "NVidia"
            return 'gpu-nvidia'
        self.current_status = "Unknown"
        return 'gpu-unknown'

    def update(self):
        """Redraw only when the icon changed since the last update."""
        icon = self._get_icon_key()
        if icon != self.current_icon:
            self.current_icon = icon
            self.draw()

    def draw(self):
        """Paint the current icon, or its name as text when no theme is available."""
        if self.theme_path:
            self.drawer.clear(self.background or self.bar.background)
            self.drawer.ctx.set_source(self.surfaces[self.current_icon])
            self.drawer.ctx.paint()
            self.drawer.draw(offsetx=self.offset, width=self.length)
        else:
            # Show e.g. "nvidia" rather than "gpu-nvidia".
            self.text = self.current_icon[len(self._ICON_PREFIX):]
            base._TextBox.draw(self)

    def setup_images(self):
        """Load the icon set, falling back to text mode when that fails."""
        _load_scaled_icons(self, 'GPU Status Icon switching to text mode')

    def _notify_with_icon(self, title, message, icon_file):
        """Send a notification, attaching ``icon_file`` only if a theme path is set."""
        if self.theme_path:
            notify(title, message, image=os.path.join(self.theme_path, icon_file))
        else:
            # Text mode: there is no directory to resolve the icon against.
            notify(title, message)

    def button_press(self, x, y, button):
        """Left click reports the active GPU, middle click switches GPUs."""
        if button == BUTTON_LEFT:
            if self.current_status == "Unknown":
                self._notify_with_icon(
                    "GPU Status",
                    "The currently used GPU is unknown.",
                    "gpu-unknown.png"
                )
            else:
                self._notify_with_icon(
                    "GPU Status",
                    "The system is currently running on the {} GPU.\n"
                    "Press the middle mouse button on this icon to switch GPUs.".format(
                        self.current_status
                    ),
                    "gpu-{}.png".format(self.current_status.lower())
                )

        elif button == BUTTON_MIDDLE:
            command = ["optimus-manager", "--no-confirm", "--switch", "auto"]
            try:
                output = self.call_process(command)
            except (subprocess.CalledProcessError, EnvironmentError):
                # Reported to the user through the error notification below.
                output = ""

            if "nvidia" in output:
                self._notify_with_icon(
                    "GPU Switched",
                    "The GPU has been switched from Intel to NVidia.\n"
                    "Please log out and log back in to apply the changes to the session.",
                    "gpu-nvidia.png"
                )
            elif "intel" in output:
                self._notify_with_icon(
                    "GPU Switched",
                    "The GPU has been switched from NVidia to Intel.\n"
                    "Please log out and log back in to apply the changes to the session.",
                    "gpu-intel.png"
                )
            else:
                self._notify_with_icon(
                    "GPU Switch Error",
                    "I could not determine if the GPU was switched successfully.\n"
                    "Please log out and log back in to clear up the inconsistency.",
                    "gpu-unknown.png"
                )