import os
import re
import shlex
import subprocess
from time import sleep

import cairocffi
import notify2
from libqtile import widget, bar
from libqtile.window import Internal
from libqtile.bar import Bar
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.volume import Volume
from libqtile.widget.battery import BatteryIcon
from libqtile.widget.wlan import get_status
from libqtile.log_utils import logger
from notify2 import Notification, URGENCY_NORMAL

# Register this application with the notification daemon once, at import time.
notify2.init("QTileWM")

# Mouse button numbers as passed to the widgets' ``button_press`` handlers.
BUTTON_LEFT = 1
BUTTON_MIDDLE = 2
BUTTON_RIGHT = 3
BUTTON_SCROLL_UP = 4
BUTTON_SCROLL_DOWN = 5


def is_running(process):
    """
    Check whether any line of ``ps axuw`` output matches ``process``.

    :param process: Pattern handed to ``re.search`` (regex metacharacters
        are therefore interpreted, not matched literally)
    :type process: str
    :return: True if at least one line matches, False otherwise
    :rtype: bool
    """
    # communicate() drains the pipe and reaps the child, so neither the file
    # descriptor nor a zombie ``ps`` process is left behind.
    stdout, _ = subprocess.Popen(
        ["ps", "axuw"], stdout=subprocess.PIPE
    ).communicate()
    # Command lines of other processes are arbitrary bytes; never let an
    # undecodable one abort the check.
    listing = stdout.decode('utf-8', errors='replace')
    return any(re.search(process, line) for line in listing.splitlines())


def execute(process):
    """
    Start ``process`` without waiting for it to finish.

    :param process: Command line; split with shell-like quoting rules so
        quoted arguments containing spaces stay intact
    :type process: str
    :return: The handle of the started process
    :rtype: subprocess.Popen
    """
    return subprocess.Popen(shlex.split(process))


def execute_once(process):
    """
    Start ``process`` only if :func:`is_running` does not find it.

    :param process: Command line, also used as the search pattern
    :type process: str
    :return: The started process, or None when it was already running
    :rtype: subprocess.Popen or None
    """
    if not is_running(process):
        return subprocess.Popen(shlex.split(process))


def get_screen_count():
    """
    Count the outputs that ``xrandr -q`` reports as connected.

    :return: Number of connected outputs; 1 if xrandr fails, is missing,
        or reports none
    :rtype: int
    """
    try:
        output = subprocess.check_output(["xrandr", "-q"]).decode('utf-8')
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing xrandr binary.
        return 1

    # The leading space keeps "disconnected" lines from matching.
    connected = [line for line in output.split("\n") if " connected" in line]
    return len(connected) or 1


def bar_separator(config):
    """
    Build a separator widget styled from ``config``.

    :param config: Mapping providing the optional keys
        ``colour_spacer_background``, ``width_spacer`` and ``padding_spacer``
    :return: The configured separator widget
    :rtype: widget.Sep
    """
    return widget.Sep(
        foreground=config.get('colour_spacer_background', '#777777'),
        linewidth=config.get('width_spacer', 1),
        padding=config.get('padding_spacer', 4),
    )


def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
    """
    Show a desktop notification.

    :param title: Notification summary
    :param content: Notification body
    :param urgency: One of the notify2 urgency constants
    :param timeout: Value passed to ``Notification.set_timeout``
    :param image: Optional icon; omitted from the notification when None
    :return: Whatever ``Notification.show`` returns
    """
    if image is not None:
        notification = Notification(
            summary=title, message=content,
            icon=image
        )
    else:
        notification = Notification(
            summary=title, message=content
        )
    notification.set_timeout(timeout)
    notification.set_urgency(urgency)
    return notification.show()


def spawn_popup(qtile, x, y, text):
    """
    Create a bordered 100x100 internal window at the given position.

    ``text`` is accepted but not rendered yet.

    :param qtile: The main qtile instance
    :type qtile: Qtile
    :param x: x-coordinate
    :type x: int
    :param y: y-coordinate
    :type y: int
    :param text: String to display
    :type text: str
    :return: The popup instance
    :rtype: Internal
    """
    popup = Internal.create(
        qtile, x, y, 100, 100, opacity=1
    )

    # Create textwidget for in window
    popup.bordercolor = "#000000"
    popup.borderwidth = 1

    popup.focus(False)

    return popup


def despawn_popup(popup):
    """
    Destroy a popup created by :func:`spawn_popup`.

    :type popup: Internal
    :param popup: The popup to despawn
    """
    popup.kill()


def test_popups(qtile):
    """
    Manual smoke test: show a popup, block for three seconds, remove it.

    :param qtile: The main qtile instance
    """
    popup = spawn_popup(qtile, 10, 10, "Hello World!")
    sleep(3)
    despawn_popup(popup)


def display_wm_class(qtile):
    """
    Notify the user of the WM_CLASS of the currently focused window.

    Does nothing when ``qtile`` is falsy, no window is focused, or the
    window reports no WM_CLASS.

    :param qtile: The main qtile instance (may be None)
    """
    window = qtile.currentWindow if qtile else None
    if not window:
        return

    wm_class = window.window.get_wm_class() or None
    if wm_class:
        notify(title="WM_Class of {}".format(window.name),
               content="{}".format(wm_class),
               urgency=notify2.URGENCY_CRITICAL)


def bluetooth_audio_sink():
    """
    Find the sink whose ``pamixer --list-sinks`` line mentions "blue".

    :return: The integer in the first column of the first matching line,
        or -1 if pamixer fails or is missing, no line matches, or the
        column is not an integer
    :rtype: int
    """
    try:
        output = subprocess.check_output(
            ["pamixer", "--list-sinks"]
        ).decode("utf-8")
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing pamixer binary. This function is called
        # from widget draw paths, so it must never raise.
        return -1

    candidates = [line for line in output.split('\n') if "blue" in line.lower()]
    try:
        return int(candidates[0].split()[0])
    except (IndexError, ValueError):
        return -1


def bluetooth_audio_connected():
    """
    Tell whether :func:`bluetooth_audio_sink` currently finds a sink.

    :rtype: bool
    """
    return bluetooth_audio_sink() != -1


class KuroTopBar(Bar):
    """Bar that keeps a reference to the theme and redraws on pointer enter/leave."""

    def __init__(self, theme, widgets, size, **config):
        """
        :param theme: Object stored as ``self.theme``
        :param widgets: Widgets forwarded to ``Bar``
        :param size: Bar size forwarded to ``Bar``
        :param config: Further keyword configuration forwarded to ``Bar``
        """
        self.theme = theme
        super(KuroTopBar, self).__init__(widgets, size, **config)

    def _configure(self, qtile, screen):
        """Configure the bar, then hook the enter/leave handlers onto its window."""
        super(KuroTopBar, self)._configure(qtile, screen)
        self.window.handle_EnterNotify = self.handle_enter_notify
        self.window.handle_LeaveNotify = self.handle_leave_notify

    def handle_enter_notify(self, e):
        """Redraw the bar when the pointer enters it (hover dispatch is disabled)."""
        # self.theme.log_debug("Bar HandleEnterNotify")
        #
        # self.window.opacity = Config.get('bar_hover_opacity', 1.0)
        # print("Bar Hover Enter")
        #
        # try:
        #     hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
        # except IndexError:
        #     hovered_widget = None
        #
        # self.theme.log_debug("Hovered over {}".format(hovered_widget))
        #
        # if hasattr(hovered_widget, "handle_hover_enter"):
        #     hovered_widget.handle_hover_enter(e)

        self.draw()

    def handle_leave_notify(self, e):
        """Redraw the bar when the pointer leaves it (hover dispatch is disabled)."""
        # self.theme.log_debug("Bar HandleLeaveNotify")
        #
        # self.window.opacity = Config.get('bar_opacity', 1.0)
        # print("Bar Hover Leave")
        #
        # try:
        #     hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
        # except IndexError:
        #     hovered_widget = None
        #
        # self.theme.log_debug("Hovered over {}".format(hovered_widget))
        #
        # if hasattr(hovered_widget, "handle_hover_leave"):
        #     hovered_widget.handle_hover_leave(e)

        self.draw()


class AppLauncherIcon(Image):
    """Image widget that opens ``dmenu_run`` on left click."""

    def button_press(self, x, y, button):
        """Launch dmenu on a left click; other buttons are ignored."""
        if button == BUTTON_LEFT:
            # The prompt is U+00BB (right-pointing double angle quotation
            # mark); it is written as an escape so that it survives encoding
            # round-trips of this file.
            execute("dmenu_run -i -p '\u00bb' -nb '#000000' -fn 'Noto Sans-11' "
                    "-nf '#777777' -sb '#1793d0' -sf '#ffffff'")

    def handle_hover(self, event):
        """Spawn a popup at the widget's offset."""
        spawn_popup(self.qtile, self.offsetx, self.offsety,
                    "Hovered over AppLauncherIcon!")


class CheckUpdatesYaourt(CheckUpdates):
    """Update checker that queries ``yaourt`` and shows details on click."""

    def __init__(self, **config):
        super(CheckUpdatesYaourt, self).__init__(**config)
        # Override command and output with yaourt command
        self.cmd = "yaourt -Qua".split()
        self.status_cmd = "yaourt -Qua".split()
        self.update_cmd = "sudo yaourt -Sya".split()
        self.subtr = 0

    def _check_updates(self):
        """Delegate to the parent implementation (database refresh is disabled)."""
        # subprocess.check_output(self.update_cmd)
        res = super(CheckUpdatesYaourt, self)._check_updates()
        return res

    def button_press(self, x, y, button):
        """
        Left click: notify with the update count and up to nine entries.
        Middle click: run ``self.execute`` through the shell, if set.
        """
        if button == BUTTON_LEFT:
            try:
                raw = subprocess.check_output(self.status_cmd)
            except subprocess.CalledProcessError as err:
                # NOTE(review): pacman-style queries are known to exit
                # non-zero when nothing is pending - presumably yaourt does
                # too; either way use whatever was printed.
                raw = err.output or b""

            # Count non-blank lines so the result does not depend on whether
            # the output ends with a newline.
            updates = [line for line in raw.decode('utf-8').splitlines()
                       if line.strip()]
            num_updates = len(updates)

            msg = "{} updates available.".format(num_updates)
            if num_updates > 0:
                msg += "\n\n" + "".join(line + "\n" for line in updates[:9])
                if num_updates > 9:
                    msg += "and {} more...".format(num_updates - 9)
            notify(
                "System updates",
                msg
            )

        elif button == BUTTON_MIDDLE and self.execute is not None:
            subprocess.Popen(self.execute, shell=True)


class KuroBatteryIcon(BatteryIcon):
    """Battery icon that shows the output of ``acpi`` on left click."""

    # Command run (without a shell) to obtain the status text.
    status_cmd = "acpi"

    def button_press(self, x, y, button):
        """Notify with the output of ``status_cmd`` on a left click."""
        if button == BUTTON_LEFT:
            output = subprocess.check_output(self.status_cmd).decode('utf-8')
            notify(
                "Battery Status",
                output
            )


class PulseVolumeWidget(Volume):
    """
    Volume widget driven by user-supplied commands.

    When ``is_bluetooth_icon`` is set, the configured command lists are
    treated as templates whose ``{bsink}`` placeholder is filled with the
    sink number from :func:`bluetooth_audio_sink`, and the widget collapses
    to zero length while no such sink exists.
    """

    defaults = [
        ("cardid", None, "Card Id"),
        ("device", "default", "Device Name"),
        ("channel", "Master", "Channel"),
        ("padding", 3, "Padding left and right. Calculated if None."),
        ("theme_path", None, "Path of the icons"),
        ("update_interval", 0.2, "Update time in seconds."),
        ("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
                         "The specified font needs to contain the correct unicode characters."),
        ("mute_command", None, "Mute command"),
        ("volume_up_command", None, "Volume up command"),
        ("volume_down_command", None, "Volume down command"),
        ("get_volume_command", None, "Command to get the current volume"),
        ("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
    ]

    # Options whose user-configured value is a ``{bsink}`` template.
    _BLUETOOTH_COMMANDS = (
        'mute_command',
        'volume_up_command',
        'volume_down_command',
        'get_volume_command',
    )

    _old_length = 0

    def __init__(self, **config):
        super(PulseVolumeWidget, self).__init__(**config)
        # NOTE(review): the parent presumably registers only its own
        # defaults; register ours so ``is_bluetooth_icon`` resolves even when
        # the user did not configure it - confirm against libqtile.
        self.add_defaults(PulseVolumeWidget.defaults)
        self._old_length = self._length

        # Augment commands with bluetooth sink ID if this is a bluetooth icon
        self.commands_need_reset = False
        if self.is_bluetooth_icon:
            # Query once so the "connected" decision and the sink number
            # cannot disagree.
            bsink = bluetooth_audio_sink()
            if bsink != -1:
                self._apply_bluetooth_sink(bsink)
            else:
                self.commands_need_reset = True

    def _apply_bluetooth_sink(self, bsink):
        """Fill ``{bsink}`` into every configured command template and un-hide the widget."""
        for name in self._BLUETOOTH_COMMANDS:
            template = self._user_config.get(name)
            if template is None:
                # Command not configured by the user; nothing to format.
                continue
            if isinstance(template, str):
                template = template.split()
            setattr(self, name,
                    " ".join(template).format(bsink=bsink).split())

        logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
        self._length = self._old_length
        self.commands_need_reset = False

    def reset_bluetooth_commands(self):
        """Re-apply the command templates if a bluetooth sink is present now."""
        if not self.is_bluetooth_icon:
            return
        bsink = bluetooth_audio_sink()
        if bsink != -1:
            self._apply_bluetooth_sink(bsink)

    def get_volume(self):
        """
        Read the current volume via ``get_volume_command``.

        Falls back to ``echo 0`` when no command is configured or when this
        is a bluetooth icon without a sink.

        :return: The volume as an integer, or -1 if the command fails or
            its output is not an integer
        :rtype: int
        """
        get_volume_cmd = "echo 0".split()
        sink_missing = self.is_bluetooth_icon and bluetooth_audio_sink() == -1
        if self.get_volume_command and not sink_missing:
            get_volume_cmd = self.get_volume_command

        try:
            mixer_out = self.call_process(get_volume_cmd)
        except subprocess.CalledProcessError:
            return -1

        try:
            return int(mixer_out.strip())
        except ValueError:
            return -1

    def _update_drawer(self):
        """Update the parent's drawer, blank the text, and hide a sink-less bluetooth icon."""
        super(PulseVolumeWidget, self)._update_drawer()
        self.text = ""

        if self.is_bluetooth_icon and not bluetooth_audio_connected():
            self._length = 0

    def draw(self):
        """Draw the widget, tracking bluetooth connect/disconnect transitions."""
        if self.is_bluetooth_icon and not bluetooth_audio_connected():
            if not self.commands_need_reset:
                # Log only on the transition, not on every redraw.
                logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
                self.commands_need_reset = True
            base._TextBox.draw(self)
            return

        if self.commands_need_reset:
            self.reset_bluetooth_commands()

        if self.theme_path:
            self.drawer.draw(offsetx=self.offset, width=self.length)
        else:
            base._TextBox.draw(self)

    def button_press(self, x, y, button):
        """Left click: notify with a text volume bar; otherwise defer to the parent."""
        if button != BUTTON_LEFT:
            super(PulseVolumeWidget, self).button_press(x, y, button)
            return

        volume = self.get_volume()
        width = 15
        if volume >= 0:
            # Clamp so a volume above 100% cannot overflow the bar.
            volume_amount = min(width, round((volume / 100) * width))
        else:
            volume_amount = 0

        msg = "[{}{}]".format(
            "#" * volume_amount,
            "-" * (width - volume_amount)
        )

        notify(
            "{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
            msg
        )


class WifiIconWidget(base._TextBox):
    """WiFi connection strength indicator widget."""
    orientations = base.ORIENTATION_HORIZONTAL
    defaults = [
        ('interface', 'wlan0', 'The interface to monitor'),
        ('update_interval', 1, 'The update interval.'),
        ('theme_path', default_icon_path(), 'Path of the icons'),
        ('custom_icons', {}, 'dict containing key->filename icon map'),
    ]

    # Prefix shared by all icon keys; stripped for the text fallback.
    _ICON_PREFIX = 'wireless-'

    def __init__(self, **config):
        super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
        self.add_defaults(WifiIconWidget.defaults)

        if self.theme_path:
            self.length_type = bar.STATIC
            self.length = 0
        self.surfaces = {}
        self.current_icon = 'wireless-disconnected'
        self.icons = {
            key: '{0}.png'.format(key)
            for key in (
                'wireless-disconnected',
                'wireless-none',
                'wireless-low',
                'wireless-medium',
                'wireless-high',
                'wireless-full',
            )
        }
        self.icons.update(self.custom_icons)

    def _get_info(self):
        """
        Query the link state of ``self.interface``.

        :return: Dict with ``error``; when ``error`` is False also
            ``essid`` (None while disconnected), ``quality`` and
            ``percent`` (``quality / 70``)
        :rtype: dict
        """
        try:
            essid, quality = get_status(self.interface)
        except EnvironmentError:
            logger.error(
                '%s: Probably your wlan device is switched off or '
                ' otherwise not present in your system.',
                self.__class__.__name__)
            return {'error': True}

        if essid is None:
            # Report "no network" in the same shape as a live link so that
            # _get_icon_key selects the disconnected icon.
            return {'error': False, 'essid': None, 'quality': 0, 'percent': 0}

        return {
            'error': False,
            'essid': essid,
            'quality': quality,
            'percent': (quality / 70)
        }

    def timer_setup(self):
        """Refresh now and re-arm the timer for ``update_interval`` seconds."""
        self.update()
        self.timeout_add(self.update_interval, self.timer_setup)

    def _configure(self, qtile, bar):
        """Configure the text box, then load the icon surfaces."""
        super(WifiIconWidget, self)._configure(qtile, bar)
        self.setup_images()

    def _get_icon_key(self):
        """
        Map the current link state to an icon key.

        :return: ``wireless-none`` on error, ``wireless-disconnected``
            without an ESSID, otherwise low/medium/high/full for a
            ``percent`` below 0.2 / 0.4 / 0.8 / anything else
        :rtype: str
        """
        info = self._get_info()
        if info is False or info.get('error'):
            return 'wireless-none'
        if info.get('essid') is None:
            return 'wireless-disconnected'

        percent = info['percent']
        if percent < 0.2:
            return 'wireless-low'
        if percent < 0.4:
            return 'wireless-medium'
        if percent < 0.8:
            return 'wireless-high'
        return 'wireless-full'

    def update(self):
        """Redraw only when the icon key has changed."""
        icon = self._get_icon_key()
        if icon != self.current_icon:
            self.current_icon = icon
            self.draw()

    def draw(self):
        """Paint the current icon, or its name as text when icons are unavailable."""
        if self.theme_path:
            self.drawer.clear(self.background or self.bar.background)
            self.drawer.ctx.set_source(self.surfaces[self.current_icon])
            self.drawer.ctx.paint()
            self.drawer.draw(offsetx=self.offset, width=self.length)
        else:
            # Strip the whole "wireless-" prefix, dash included.
            self.text = self.current_icon[len(self._ICON_PREFIX):]
            base._TextBox.draw(self)

    def setup_images(self):
        """
        Load every icon PNG from ``theme_path`` as a scaled cairo pattern.

        On the first icon that cannot be loaded, ``theme_path`` is cleared
        (switching :meth:`draw` to text mode) and loading stops.
        """
        if not self.theme_path:
            # Text mode was requested; there is no directory to load from.
            return

        for key, name in self.icons.items():
            try:
                path = os.path.join(self.theme_path, name)
                img = cairocffi.ImageSurface.create_from_png(path)
            except (cairocffi.Error, OSError):
                # cairocffi reports a missing/unreadable file as an OSError
                # subclass rather than as cairocffi.Error.
                self.theme_path = None
                logger.warning('Wireless Icon switching to text mode')
                return

            input_width = img.get_width()
            input_height = img.get_height()

            # Scale factor that fits the icon into the bar height.
            sp = input_height / (self.bar.height - 1)

            width = input_width / sp
            if width > self.length:
                # cast to `int` only after handling all potentially-float values
                self.length = int(width + self.actual_padding * 2)

            imgpat = cairocffi.SurfacePattern(img)

            scaler = cairocffi.Matrix()

            scaler.scale(sp, sp)
            scaler.translate(self.actual_padding * -1, 0)
            imgpat.set_matrix(scaler)

            imgpat.set_filter(cairocffi.FILTER_BEST)
            self.surfaces[key] = imgpat


class ThermalSensorWidget(ThermalSensor):
    """Thermal sensor widget with an optional ``chip`` argument for ``sensors``."""

    defaults = [
        ('metric', True, 'True to use metric/C, False to use imperial/F'),
        ('show_tag', False, 'Show tag sensor'),
        ('update_interval', 2, 'Update interval in seconds'),
        ('tag_sensor', None,
            'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
        ('chip', None, 'Chip argument for sensors command'),
        (
            'threshold',
            70,
            'If the current temperature value is above, '
            'then change to foreground_alert colour'
        ),
        ('foreground_alert', 'ff0000', 'Foreground colour alert'),
    ]

    @catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
    def get_temp_sensors(self):
        """calls the unix `sensors` command with `-f` flag if user has specified that
        the output should be read in Fahrenheit.

        The ``chip`` option, when set, is passed as an extra argument.

        :return: Whatever ``_format_sensors_output`` makes of the output
        """
        command = ["sensors", ]
        # NOTE(review): ``chip`` may not be a registered default if the
        # parent registers only its own defaults - read it defensively.
        chip = getattr(self, 'chip', None)
        if chip:
            command.append(chip)
        if not self.metric:
            command.append("-f")
        sensors_out = self.call_process(command)
        return self._format_sensors_output(sensors_out)