Multiple changes

This commit is contained in:
Kevin Alberts 2019-03-13 21:12:18 +01:00
parent 5b7475e50f
commit 8c070c86a5
Signed by: Kurocon
GPG key ID: BCD496FEBA0C6BC1
7 changed files with 1052 additions and 709 deletions

View file

@ -23,12 +23,14 @@ class Config(BaseConfig):
cmd_brightness_up = "sudo /usr/bin/xbacklight -inc 10"
cmd_brightness_down = "sudo /usr/bin/xbacklight -dec 10"
lock_command = "/home/kevin/bin/lock.sh"
visualizer_app = "glava"
# Images
desktop_bg = "/home/kevin/Pictures/wallpapers/desktop.png"
desktop_bg_folder = "/home/kevin/Pictures/wallpapers/desktop_rotation"
applauncher_image = "/home/kevin/.config/qtile/kuro/resources/arch.png"
custom_layout_icon_paths = ['/home/kevin/.config/qtile/kuro/resources/layout_icons/']
glava_color_file_path = "/home/kevin/.config/glava/kurobars_color.glsl"
# Fonts
font_default = "Noto Sans"
@ -138,4 +140,5 @@ class Config(BaseConfig):
# Show audio visualizer
show_audio_visualizer = True
kill_unnecessary_glava_processes = True

Binary file not shown.

After

Width:  |  Height:  |  Size: 266 B

View file

@ -7,6 +7,9 @@ from libqtile.command import lazy
from libqtile import layout, bar, widget
# Import theme util functions
from xcffib.xproto import WindowError
import kuro.utils.widgets
from kuro.utils import general as utils
# Import variables
@ -14,6 +17,7 @@ from kuro.base import BaseTheme
from kuro.utils.general import display_wm_class, test_popups
from kuro.utils.kb_backlight import handle_focus_change as kb_handle_focus_change
from kuro.utils import layouts as kuro_layouts
from kuro.utils.windows import KuroStatic
try:
from kuro.config import Config
@ -43,6 +47,12 @@ class Kuro(BaseTheme):
# Top bars
topbars = []
# Visualizers
audio_visualizers = []
# Static windows
static_windows = []
# Current wallpaper path
current_wallpaper = None
@ -52,6 +62,24 @@ class Kuro(BaseTheme):
# Window manager name
wmname = "QTile"
# Floating layout override
floating_layout = kuro_layouts.KuroFloating(float_rules=[
{'wmclass': 'confirm'},
{'wmclass': 'dialog'},
{'wmclass': 'download'},
{'wmclass': 'error'},
{'wmclass': 'file_progress'},
{'wmclass': 'notification'},
{'wmclass': 'splash'},
{'wmclass': 'toolbar'},
{'wmclass': 'confirmreset'}, # gitk
{'wmclass': 'makebranch'}, # gitk
{'wmclass': 'maketag'}, # gitk
{'wname': 'branchdialog'}, # gitk
{'wname': 'pinentry'}, # GPG key password entry
{'wmclass': 'ssh-askpass'}, # ssh-askpass
])
def set_debug_text(self, text):
for field in self.debug_textfields:
field.text = text
@ -157,6 +185,9 @@ class Kuro(BaseTheme):
# Reorganize screens
Key([self.mod, "control"], "s", lazy.function(self.update_screens)),
# Toggle static windows
Key([self.mod], "p", lazy.function(self.toggle_window_static)),
##
# Debug keyboard shortcuts
@ -166,6 +197,12 @@ class Kuro(BaseTheme):
# Redraw the top bar
Key([self.mod, "shift", "control"], "r", lazy.function(self.redraw_bar)),
# Update visualizer widgets
Key([self.mod, "shift", "control"], "v", lazy.function(self.reinitialize_visualizers)),
# Show extensive window info
Key([self.mod, "shift", "control"], "i", lazy.function(self.show_window_info)),
# Spawn a popup, and despawn it after 3 seconds
Key([self.mod, "control"], "p", lazy.function(test_popups)),
]
@ -179,7 +216,7 @@ class Kuro(BaseTheme):
groups.append(Group("", spawn=Config.get('web_browser', "xterm links")))
groups.append(Group("", spawn=Config.get('app_terminal', "xterm")))
groups.append(Group(""))
groups.append(Group("", spawn="franz4-bin"))
groups.append(Group("", spawn="franz"))
groups.append(Group("", spawn="quasselclient"))
groups.append(Group("", spawn=Config.get('file_manager', "thunar")))
groups.append(Group("", spawn="thunderbird"))
@ -194,6 +231,7 @@ class Kuro(BaseTheme):
return [
kuro_layouts.KuroWmii(
theme=self,
border_focus=Config.get('colour_border_focus', "#ffffff"),
border_focus_stack=Config.get('colour_border_normal', "#777777"),
border_normal=Config.get('colour_border_normal', "#777777"),
@ -222,7 +260,7 @@ class Kuro(BaseTheme):
self.log_debug("Initializing screens")
self.num_screens = utils.get_screen_count()
if self.num_screens == 0:
if self.num_screens <= 0:
self.num_screens = 1
screens = []
@ -245,7 +283,7 @@ class Kuro(BaseTheme):
),
widget.Prompt(**self.widget_defaults),
widget.TaskList(
kuro.utils.widgets.KuroTaskList(
border=Config.get('tasklist_border', '#ffffff'),
borderwidth=Config.get('tasklist_borderwidth', 1),
font=Config.get('tasklist_font', 'Arial'),
@ -260,21 +298,23 @@ class Kuro(BaseTheme):
])
if Config.get('show_audio_visualizer', False):
widgets.append(utils.AudioVisualizerWidget(
widgets.append(kuro.utils.widgets.AudioVisualizerWidget(
graph_color=Config.get('visualizer_graph_color', "#ffffff"),
fill_color=Config.get('visualizer_fill_color', "#ffffff.3"),
border_color=Config.get('visualizer_border_color', "#000000"),
border_width=Config.get('visualizer_graph_width', 0),
line_width=Config.get('visualizer_line_width', 1),
frequency=0.05
margin_x=1,
margin_y=1,
frequency=1
))
widgets.extend([
utils.MediaWidget(),
kuro.utils.widgets.MediaWidget(),
utils.SeparatorWidget(),
kuro.utils.widgets.SeparatorWidget(),
utils.ThermalSensorWidget(
kuro.utils.widgets.ThermalSensorWidget(
font=Config.get('font_topbar', 'Arial'),
fontsize=Config.get('fontsize_topbar', 16),
foreground=Config.get('thermal_colour', '#ffffff'),
@ -325,7 +365,7 @@ class Kuro(BaseTheme):
frequency=2,
),
utils.KuroBatteryIcon(
kuro.utils.widgets.KuroBatteryIcon(
battery_name=Config.get('battery_name', 'BAT0'),
energy_full_file=Config.get('battery_energy_full_file', 'charge_full'),
energy_now_file=Config.get('battery_energy_now_file', 'charge_now'),
@ -334,14 +374,14 @@ class Kuro(BaseTheme):
update_delay=Config.get('battery_update_delay', 30)
),
utils.WifiIconWidget(
kuro.utils.widgets.WifiIconWidget(
interface=Config.get('wifi_interface', 'wlp4s0'),
theme_path=Config.get('wifi_theme_path', '/home/docs/checkouts/readthedocs.org/user_builds/qtile'
'/checkouts/latest/libqtile/resources/battery-icons'),
update_interval=Config.get('wifi_update_interval', 30)
),
utils.PulseVolumeWidget(
kuro.utils.widgets.PulseVolumeWidget(
cardid=Config.get('volume_cardid', None),
channel=Config.get('volume_channel', 'Master'),
device=Config.get('volume_device', None),
@ -358,7 +398,7 @@ class Kuro(BaseTheme):
update_interval=Config.get('volume_update_interval', 0.2)
),
utils.PulseVolumeWidget(
kuro.utils.widgets.PulseVolumeWidget(
cardid=Config.get('bluevol_cardid', None),
channel=Config.get('bluevol_channel', 'Master'),
device=Config.get('bluevol_device', None),
@ -376,14 +416,14 @@ class Kuro(BaseTheme):
)
])
# Systray only on first screen
# Systray can only be on one screen, so put it on the first
if x == 0:
widgets.append(widget.Systray(**self.widget_defaults))
widgets.extend([
utils.KuroCurrentLayoutIcon(custom_icon_paths=Config.get('custom_layout_icon_paths', [])),
kuro.utils.widgets.KuroCurrentLayoutIcon(custom_icon_paths=Config.get('custom_layout_icon_paths', [])),
widget.Clock(format="%a %d %b, %H:%M", **self.widget_defaults),
utils.CheckUpdatesYay(
kuro.utils.widgets.CheckUpdatesYay(
colour_no_updates=Config.get('updates_colour_none', '#ffffff'),
colour_have_updates=Config.get('updates_colour_available', '#ff0000'),
display_format=Config.get('updates_display_format', 'Updates: {updates}'),
@ -456,26 +496,18 @@ class Kuro(BaseTheme):
# Keys for the Wmii layout
self.keys.extend([
Key(
[self.mod, "shift", "control"], "l",
lazy.layout.grow_right()
),
Key(
[self.mod, "shift"], "l",
lazy.layout.shuffle_right()
),
Key(
[self.mod, "shift", "control"], "h",
lazy.layout.grow_left()
),
Key(
[self.mod, "shift"], "h",
lazy.layout.shuffle_left()
),
Key(
[self.mod], "s",
lazy.layout.toggle_split()
)
Key([self.mod, "shift"], "j", lazy.layout.shuffle_down()),
Key([self.mod, "shift"], "k", lazy.layout.shuffle_up()),
Key([self.mod, "shift"], "h", lazy.layout.shuffle_left()),
Key([self.mod, "shift"], "l", lazy.layout.shuffle_right()),
Key([self.mod, "shift", "control"], "j", lazy.layout.grow_down()),
Key([self.mod, "shift", "control"], "k", lazy.layout.grow_up()),
Key([self.mod, "shift", "control"], "h", lazy.layout.grow_left()),
Key([self.mod, "shift", "control"], "l", lazy.layout.grow_right()),
Key([self.mod], "s", lazy.layout.toggle_split()),
Key([self.mod], "n", lazy.layout.normalize()),
])
# Util functions
@ -505,20 +537,68 @@ class Kuro(BaseTheme):
elif laptop_screen is not None and len(screens) > 1:
utils.execute("arandr")
def reinitialize_visualizers(self, qtile=None):
if Config.get("show_audio_visualizer", False):
logger.warning("Reinitializing visualizers...")
for screen in self.qtile.screens:
for widget in screen.top.widgets:
if isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget):
if widget.client is not None:
widget.client.kill()
widget.client = None
widget.screen = None
self.update_visualizers(qtile=qtile)
def update_visualizers(self, qtile=None):
if Config.get("show_audio_visualizer", False):
logger.warning("Updating visualizers..")
for screen in self.qtile.screens:
for widget in screen.top.widgets:
if isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget):
if widget.client is None:
logger.warning("Spawning for screen {}".format(screen))
utils.execute(Config.get('visualizer_app', "glava"))
else:
widget.update_graph()
def show_window_info(self, qtile):
window = qtile.currentWindow if qtile else None
import pprint
if window:
info = window.cmd_inspect() or None
name = window.name
utils.notify(title="Window properties {}".format(name),
content="{}".format(pprint.pformat(vars(window))))
if info:
info = pprint.pformat(info)
utils.notify(title="Window info of {}".format(name),
content="{}".format(info))
# @staticmethod
def toggle_window_static(self, qtile):
window = qtile.currentWindow
if window in self.static_windows:
utils.notify("Unpinned {}".format(window.name), "{} has been unpinned".format(window.name))
self.static_windows.remove(window)
del window.is_static_window
else:
utils.notify("Pinned {}".format(window.name), "{} has been pinned".format(window.name))
self.static_windows.append(window)
window.is_static_window = True
window.floating = True
# QTile base callbacks
def callback_startup_once(self, *args, **kwargs):
pass
#Kuro.update_screens(self.qtile)
def callback_startup(self):
utils.execute("sleep 3")
# self.log_info("Restoring previous wallpaper...")
# utils.execute_once("nitrogen --restore")
self.update_wallpaper(self.qtile)
def callback_startup(self):
if self.current_wallpaper:
utils.execute_once(["wal", "-n", "-i", "{}".format(self.current_wallpaper)])
p = utils.execute_once(["wal", "-n", "-i", "{}".format(self.current_wallpaper)])
p.wait()
else:
wallpaper = None
@ -531,30 +611,29 @@ class Kuro(BaseTheme):
if wallpaper:
Kuro.set_wallpaper(self.qtile, wallpaper)
else:
utils.execute_once("nitrogen --restore")
p = utils.execute_once("nitrogen --restore")
p.wait()
self.log_info("Starting compositor...")
utils.execute_once("compton -b")
# self.log_info("Starting compositor...")
# utils.execute_once("compton -b")
# Update color scheme
self.initialize_colorscheme()
# Update color scheme
Kuro.update_colorscheme(self.qtile)
# display = os.environ['DISPLAY']
#
# if not display:
# display = ":0"
#
# # Start compton for each screen
# for x in range(self.num_screens):
# self.log_info("Launching compton for screen {}.{}".format(display, x))
# utils.execute_once("compton --config ~/.config/compton.conf -b -d {}.{}".format(display, x))
# def callback_screen_change(self, *args, **kwargs):
# self.num_screens = utils.get_screen_count()
# return True
# for window in self.static_windows:
# window.togroup()
def callback_setgroup(self, *args, **kwargs):
for window in self.static_windows:
# Only move if the window is not currently on any screen.
if window.group.screen is None:
try:
window.togroup()
except WindowError as e:
logger.warning("Could not move static window {}, removing from list: {}".format(window.name, e))
del window.is_static_window
self.static_windows.remove(window)
def callback_focus_change(self, *args, **kwargs):
if self.do_keyboard_updates:
@ -599,6 +678,56 @@ class Kuro(BaseTheme):
window.name))
self.log_info(str(self.qtile.dgroups.rules_map))
# Check if it is a visualizer
if Config.get("show_audio_visualizer", False):
client = args[0] if len(args) > 0 else None
if client is not None and client.window.get_name() == "GLava":
placed = False
for screen in self.qtile.screens:
for widget in screen.top.widgets:
if not placed and isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget):
if widget.client is None:
viz_info = widget.info()
pos_x = viz_info['offset'] + widget.margin_x - 1
pos_y = 0 + widget.margin_y - 1
width = viz_info['width'] - (2 * (widget.margin_x - 1))
height = viz_info['height'] - (2 * (widget.margin_y - 1))
screen_index = self.qtile.screens.index(screen)
logger.warning("Attaching {} {} to {} on screen {}".format(client, client.window.wid, type(widget).__name__, screen_index))
c = KuroStatic.create(client, screen, x=pos_x, y=pos_y, width=width, height=height)
c.setOpacity(Config.get("bar_opacity", 1.0))
widget.set_client(c, screen)
placed = True
if not placed:
if Config.get("kill_unnecessary_glava_processes", False):
logger.warning("Killing GLava {} because there is no widget where it can fit".format(client))
utils.notify("Glava", "Killing new GLava process because there is no screen without a visualizer")
client.kill()
else:
logger.warning("Not repositioning GLava {} because there is no widget where it can fit".format(client))
utils.notify("Glava", "Not repisitioning new GLava process because there is no screen without a visualizer")
def callback_client_killed(self, *args, **kwargs):
client = args[0]
logger.warning("Client {} Killed".format(client))
# Detach visualizer from widget if it was a visualizer window
if isinstance(client, KuroStatic):
for screen in self.qtile.screens:
for widget in screen.top.widgets:
if isinstance(widget, kuro.utils.widgets.AudioVisualizerWidget):
if widget.client == client:
screen_index = self.qtile.screens.index(screen)
logger.warning("Detaching {} {} from widget {} on screen {}".format(client, client.window.wid, type(widget).__name__, screen_index))
widget.client = None
widget.screen = None
# If this window was static, remove it from the static window list
if hasattr(client, "is_static_window") and client.is_static_window:
logger.warning("Removing static window {}".format(client.name))
del client.is_static_window
self.static_windows.remove(client)
@staticmethod
def update_wallpaper(qtile):
wallpapers = []
@ -617,7 +746,8 @@ class Kuro(BaseTheme):
@staticmethod
def set_wallpaper(qtile, filename):
utils.execute_once("wal-nitrogen {}".format(filename))
p = utils.execute_once("wal-nitrogen-noupdate {}".format(filename))
p.wait()
qtile.theme_instance.current_wallpaper = filename
Kuro.update_colorscheme(qtile)
@ -644,6 +774,7 @@ class Kuro(BaseTheme):
Config.highlight = colors['color3']
Config.inactive_light = colors['color4']
Config.inactive_dark = colors['color5']
Config.bar_background = colors['color1']
@staticmethod
def update_colorscheme(qtile):
@ -651,7 +782,8 @@ class Kuro(BaseTheme):
:type qtile: libqtile.manager.Qtile
"""
if qtile.theme_instance.current_wallpaper:
utils.execute(["wal", "-n", "-i", "{}".format(qtile.theme_instance.current_wallpaper)])
p = utils.execute(["wal", "-n", "-i", "{}".format(qtile.theme_instance.current_wallpaper)])
p.wait()
colors = None
if os.path.isfile("/home/kevin/.cache/wal/colors.json"):
@ -668,6 +800,7 @@ class Kuro(BaseTheme):
Config.highlight = colors['color3']
Config.inactive_light = colors['color4']
Config.inactive_dark = colors['color5']
Config.bar_background = colors['color1']
# Update border colors in layouts
for group in qtile.groups:
@ -687,7 +820,7 @@ class Kuro(BaseTheme):
try:
w._update_drawer()
except Exception as e:
logger.error("Error while updating drawer: {}".format(e))
logger.error("Error while updating drawer for widget {}: {}".format(w, e))
if hasattr(w, 'foreground'):
w.foreground = colors['color15']
@ -722,12 +855,17 @@ class Kuro(BaseTheme):
if hasattr(w, 'other_screen_border'):
w.other_screen_border = colors['color8']
if isinstance(w, utils.AudioVisualizerWidget):
if isinstance(w, kuro.utils.widgets.AudioVisualizerWidget):
w.graph_color = colors['color15']
w.fill_color = colors['color8']
bar.draw()
# Update colors in visualizers and restart visualizers
with open(Config.get("glava_color_file_path", "~/.config/glava/kurobars_color.glsl"), 'w') as f:
f.write("#define COLOR {}\n#request setbg {}".format(colors['color15'], colors['color1'][1:]))
qtile.theme_instance.reinitialize_visualizers()
utils.notify(
"Updated colorscheme!",
"active: {}, inactive: {}".format(colors['color15'], colors['color1'])

View file

@ -1,30 +1,12 @@
import os
import re
import subprocess
from asyncio import Queue
from threading import Thread
from time import sleep
import cairocffi
import notify2
import numpy
import pyaudio
import six
from libqtile import widget, bar
from libqtile.widget.currentlayout import CurrentLayoutIcon
from libqtile.widget.graph import _Graph
from libqtile import widget
from libqtile.window import Internal
from libqtile.bar import Bar
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.volume import Volume
from libqtile.widget.battery import BatteryIcon
from libqtile.widget.wlan import get_status
from libqtile.log_utils import logger
from notify2 import Notification, URGENCY_NORMAL
notify2.init("QTileWM")
@ -38,6 +20,8 @@ BUTTON_SCROLL_DOWN = 5
def is_running(process):
s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE)
if isinstance(process, list):
process = "".join(process)
for x in s.stdout:
if re.search(process, x.decode('utf-8')):
return True
@ -200,6 +184,26 @@ class KuroTopBar(Bar):
self.window.handle_EnterNotify = self.handle_enter_notify
self.window.handle_LeaveNotify = self.handle_leave_notify
def draw(self):
if self.queued_draws == 0:
self.qtile.call_soon(self._actual_draw)
self.queued_draws += 1
def _actual_draw(self):
self.queued_draws = 0
self._resize(self.length, self.widgets)
for i in self.widgets:
i.draw()
if self.widgets:
end = i.offset + i.length
if end < self.length:
if self.horizontal:
self.drawer.draw(offsetx=end, width=self.length - end)
else:
self.drawer.draw(offsety=end, height=self.length - end)
self.theme.update_visualizers()
def handle_enter_notify(self, e):
# self.theme.log_debug("Bar HandleEnterNotify")
#
@ -237,584 +241,3 @@ class KuroTopBar(Bar):
self.draw()
class AppLauncherIcon(Image):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'")
def handle_hover(self, event):
spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!")
class CheckUpdatesYay(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYay, self).__init__(**config)
# Override command and output with yay command
self.cmd = "yay -Qua".split()
self.status_cmd = "yay -Qua --color never".split()
self.update_cmd = "sudo yay".split()
self.subtr = 0
def _check_updates(self):
#subprocess.check_output(self.update_cmd)
res = super(CheckUpdatesYay, self)._check_updates()
return res
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class KuroBatteryIcon(BatteryIcon):
status_cmd = "acpi"
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8')
notify(
"Battery Status",
output
)
class PulseVolumeWidget(Volume):
defaults = [
("cardid", None, "Card Id"),
("device", "default", "Device Name"),
("channel", "Master", "Channel"),
("padding", 3, "Padding left and right. Calculated if None."),
("theme_path", None, "Path of the icons"),
("update_interval", 0.2, "Update time in seconds."),
("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
"The specified font needs to contain the correct unicode characters."),
("mute_command", None, "Mute command"),
("volume_up_command", None, "Volume up command"),
("volume_down_command", None, "Volume down command"),
("get_volume_command", None, "Command to get the current volume"),
("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
]
_old_length = 0
def __init__(self, **config):
super(PulseVolumeWidget, self).__init__(**config)
self._old_length = self._length
# Augment commands with bluetooth sink ID if this is a bluetooth icon
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
elif self.is_bluetooth_icon:
self.commands_need_reset = True
else:
self.commands_need_reset = False
self._old_length = self._length
def reset_bluetooth_commands(self):
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
def get_volume(self):
try:
get_volume_cmd = "echo 0".split()
if self.get_volume_command:
if self.is_bluetooth_icon and bluetooth_audio_sink() == -1:
pass
else:
get_volume_cmd = self.get_volume_command
mixer_out = self.call_process(get_volume_cmd)
except subprocess.CalledProcessError:
return -1
try:
return int(mixer_out.strip())
except ValueError:
return -1
def _update_drawer(self):
super(PulseVolumeWidget, self)._update_drawer()
self.text = ""
if self.is_bluetooth_icon and not bluetooth_audio_connected():
self._length = 0
def draw(self):
if self.is_bluetooth_icon and not bluetooth_audio_connected():
if not self.commands_need_reset:
logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
self.commands_need_reset = True
base._TextBox.draw(self)
else:
if self.commands_need_reset:
self.reset_bluetooth_commands()
if self.theme_path:
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
base._TextBox.draw(self)
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
volume = self.get_volume()
width = 15
if volume >= 0:
volume_amount = round((volume/100)*width)
else:
volume_amount = 0
msg = "[{}{}]".format(
"".join(["#" for x in range(volume_amount)]),
"".join(["-" for x in range(width-volume_amount)])
)
notify(
"{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
msg
)
else:
super(PulseVolumeWidget, self).button_press(x, y, button)
class WifiIconWidget(base._TextBox):
"""WiFi connection strength indicator widget."""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('interface', 'wlan0', 'The interface to monitor'),
('update_interval', 1, 'The update interval.'),
('theme_path', default_icon_path(), 'Path of the icons'),
('custom_icons', {}, 'dict containing key->filename icon map'),
]
def __init__(self, **config):
super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
self.add_defaults(WifiIconWidget.defaults)
if self.theme_path:
self.length_type = bar.STATIC
self.length = 0
self.surfaces = {}
self.current_icon = 'wireless-disconnected'
self.icons = dict([(x, '{0}.png'.format(x)) for x in (
'wireless-disconnected',
'wireless-none',
'wireless-low',
'wireless-medium',
'wireless-high',
'wireless-full',
)])
self.icons.update(self.custom_icons)
def _get_info(self):
try:
essid, quality = get_status(self.interface)
disconnected = essid is None
if disconnected:
return self.disconnected_message
return {
'error': False,
'essid': essid,
'quality': quality,
'percent': (quality / 70)
}
except EnvironmentError:
logger.error(
'%s: Probably your wlan device is switched off or '
' otherwise not present in your system.',
self.__class__.__name__)
return {'error': True}
def timer_setup(self):
self.update()
self.timeout_add(self.update_interval, self.timer_setup)
def _configure(self, qtile, bar):
super(WifiIconWidget, self)._configure(qtile, bar)
self.setup_images()
def _get_icon_key(self):
key = 'wireless'
info = self._get_info()
if info is False or info.get('error'):
key += '-none'
elif info.get('essid') is None:
key += '-disconnected'
else:
percent = info['percent']
if percent < 0.2:
key += '-low'
elif percent < 0.4:
key += '-medium'
elif percent < 0.8:
key += '-high'
else:
key += '-full'
return key
def update(self):
icon = self._get_icon_key()
if icon != self.current_icon:
self.current_icon = icon
self.draw()
def draw(self):
if self.theme_path:
self.drawer.clear(self.background or self.bar.background)
self.drawer.ctx.set_source(self.surfaces[self.current_icon])
self.drawer.ctx.paint()
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
self.text = self.current_icon[8:]
base._TextBox.draw(self)
def setup_images(self):
for key, name in self.icons.items():
try:
path = os.path.join(self.theme_path, name)
img = cairocffi.ImageSurface.create_from_png(path)
except cairocffi.Error:
self.theme_path = None
logger.warning('Wireless Icon switching to text mode')
return
input_width = img.get_width()
input_height = img.get_height()
sp = input_height / (self.bar.height - 1)
width = input_width / sp
if width > self.length:
# cast to `int` only after handling all potentially-float values
self.length = int(width + self.actual_padding * 2)
imgpat = cairocffi.SurfacePattern(img)
scaler = cairocffi.Matrix()
scaler.scale(sp, sp)
scaler.translate(self.actual_padding * -1, 0)
imgpat.set_matrix(scaler)
imgpat.set_filter(cairocffi.FILTER_BEST)
self.surfaces[key] = imgpat
class ThermalSensorWidget(ThermalSensor):
defaults = [
('metric', True, 'True to use metric/C, False to use imperial/F'),
('show_tag', False, 'Show tag sensor'),
('update_interval', 2, 'Update interval in seconds'),
('tag_sensor', None,
'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
('chip', None, 'Chip argument for sensors command'),
(
'threshold',
70,
'If the current temperature value is above, '
'then change to foreground_alert colour'
),
('foreground_alert', 'ff0000', 'Foreground colour alert'),
]
@catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
def get_temp_sensors(self):
"""calls the unix `sensors` command with `-f` flag if user has specified that
the output should be read in Fahrenheit.
"""
command = ["sensors", ]
if self.chip:
command.append(self.chip)
if not self.metric:
command.append("-f")
sensors_out = self.call_process(command)
return self._format_sensors_output(sensors_out)
class SeparatorWidget(base._TextBox):
def __init__(self):
super(SeparatorWidget, self).__init__(text="|", width=bar.CALCULATED, fontsize=14)
class MediaWidget(base.InLoopPollText):
"""Media Status Widget"""
class Status:
OFFLINE = 0
PLAYING = 1
PAUSED = 2
STOPPED = 3
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('off_text', '', 'The pattern for the text if no players are found.'),
('on_text_play', '{}', 'The pattern for the text if music is playing.'),
('on_text_pause', '{}', 'The pattern for the text if music is paused.'),
('on_text_stop', '{}', 'The pattern for the text if music is stopped.'),
('update_interval', 1, 'The update interval.'),
]
player_icons = {
'spotify': '',
'vlc': '',
'firefox': '',
}
custom_player_data = {
'firefox': {
'showing': False,
'title': '',
'state': Status.STOPPED,
}
}
def __init__(self, **config):
super(MediaWidget, self).__init__(**config)
self.add_defaults(MediaWidget.defaults)
self.surfaces = {}
def cmd_update_custom_player(self, player_name, data):
# Update firefox player
if player_name == "firefox":
if data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED
self.custom_player_data['firefox']['title'] = data['title']
elif data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = False
self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE
self.custom_player_data['firefox']['title'] = data['title']
def _get_players(self):
players = []
# Playerctl players
command = ["playerctl", "-l"]
result = self.call_process(command)
if result:
players.extend([x for x in result.split("\n") if x])
# Custom players - Firefox
if self.custom_player_data['firefox']['showing']:
players.append('firefox')
if players:
return players
else:
return None
def _get_info(self):
players = self._get_players()
if not players:
return {}
else:
result = {}
for player in players:
if player in self.custom_player_data.keys():
# Custom player -- Firefox
if player == "firefox":
result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']]
# Other custom players -- generic attempt with error catching
else:
try:
result[player] = [self.custom_player_data[player]['state'],
self.custom_player_data[player]['title']]
except KeyError:
pass
else:
# PlayerCtl player
command = ["playerctl", "-p", player, "status"]
cmd_result = self.call_process(command).strip()
text = "Unknown"
if cmd_result in ["Playing", "Paused"]:
artist = self.call_process(['playerctl', '-p', player, 'metadata', 'artist']).strip()
title = self.call_process(['playerctl', '-p', player, 'metadata', 'title']).strip()
if artist and title:
text = "{} - {}".format(artist, title)
elif artist:
text = artist
elif title:
text = title
if cmd_result == "Playing":
result[player] = [MediaWidget.Status.PLAYING, text]
elif cmd_result == "Paused":
result[player] = [MediaWidget.Status.PAUSED, text]
elif cmd_result == "Stopped":
result[player] = [MediaWidget.Status.STOPPED, ""]
return result
def _get_formatted_text(self, status):
if status[0] == MediaWidget.Status.PLAYING:
return self.on_text_play.format(status[1])
elif status[0] == MediaWidget.Status.PAUSED:
return self.on_text_pause.format(status[1])
elif status[0] == MediaWidget.Status.STOPPED:
return self.on_text_stop.format(status[1])
else:
return "Unknown"
def poll(self):
text = []
status = self._get_info()
if not status:
return self.off_text
else:
for player in status.keys():
icon = self.player_icons.get(player, player)
logger.warning([player, status[player]])
text.append("{} {}".format(icon, self._get_formatted_text(status[player])))
return " | ".join(text) if text else self.off_text
class AudioVisualizerWidget(_Graph):
"""Display Audio Visualization graph"""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
("audio_channel", "default", "Which audio channel to show"),
]
stream = None
fixed_upper_bound = True
def __init__(self, **config):
_Graph.__init__(self, **config)
self.add_defaults(AudioVisualizerWidget.defaults)
self.maxvalue = 100
self.samples = 1024
self.max_observed = 1
# initialize communication queue
self.q = Queue()
self.t = None
self.stream = None
self.tries = 0
def initialize_stream(self):
# initialize portaudio
p = pyaudio.PyAudio()
try:
self.stream = p.open(format=pyaudio.paInt16, channels=1, rate=44100, input=True, frames_per_buffer=self.samples)
# initialize thread
self.t = Thread(target=self.process, args=[self, self.q])
self.t.start()
except OSError as e:
logger.warning("Could not open audio stream: ".format(e))
self.tries += 1
@staticmethod
def process(widget: 'AudioVisualizerWidget', queue: Queue):
item = queue.get()
if widget.max_observed > 100:
widget.max_observed -= 100
# Discard all available frames
avail = widget.stream.get_read_available()
while avail > 1000:
_ = widget.stream.read(avail)
logger.debug("Discarded {} frames".format(avail))
avail = widget.stream.get_read_available()
if avail > 100:
data = widget.stream.read(widget.samples)
numpydata = numpy.abs(numpy.fromstring(data, dtype=numpy.int16))
if numpy.max(numpydata) > widget.max_observed:
widget.max_observed = numpy.max(numpydata)
numpydata = numpydata * (100 / widget.max_observed)
numpydata = AudioVisualizerWidget.window_rms(numpydata, 25)
widget.values = list(numpydata)
print(widget.values)
else:
widget.values = [0]*1024
@staticmethod
def window_rms(a, window_size):
a2 = numpy.power(a, 2)
window = numpy.ones(window_size) / float(window_size)
return numpy.sqrt(numpy.convolve(a2, window, 'valid'))
def update_graph(self):
if not self.stream and self.tries < 10:
self.initialize_stream()
else:
if self.q.empty():
self.q.put(True)
self.draw()
class KuroCurrentLayoutIcon(CurrentLayoutIcon):
def _get_layout_names(self):
names = super(KuroCurrentLayoutIcon, self)._get_layout_names()
from kuro.utils import layouts as kuro_layouts
from libqtile.layout.base import Layout
klayouts = [
layout_class_name.lower()
for layout_class, layout_class_name
in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts))
if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout)
]
names.extend(klayouts)
return list(set(names))

View file

@ -1,51 +1,82 @@
from libqtile.layout.wmii import Wmii
from libqtile.layout import Floating
from libqtile.layout.columns import Columns
class KuroWmii(Wmii):
def cmd_previous(self):
super(KuroWmii, self).cmd_previous()
class KuroWmii(Columns):
pass
def cmd_next(self):
super(KuroWmii, self).cmd_next()
def add(self, client):
"""
Add a new client window to the layout and focus it. It will be added to either the current column if there
are less rows in the current column than columns on the screen, or to a new row to the right of the current
column if there are less columns than rows in the current column.
:param client: The client window to add.
"""
self.clients.append(client)
c = self.current_column()
if c is None:
if len(self.columns) == 0:
self.columns = [{'active': 0, 'width': 100, 'mode': 'split', 'rows': []}]
c = self.columns[0]
c['rows'].append(client)
class KuroFloating(Floating):
defaults = [
("border_static", "#dddddd", "Border colour for static windows."),
]
def __init__(self, *args, **kwargs):
super(KuroFloating, self).__init__(*args, **kwargs)
self.add_defaults(KuroFloating.defaults)
def configure(self, client, screen):
# 'sun-awt-X11-XWindowPeer' is a dropdown used in Java application,
# don't reposition it anywhere, let Java app to control it
cls = client.window.get_wm_class() or ''
is_java_dropdown = 'sun-awt-X11-XWindowPeer' in cls
if is_java_dropdown:
return
if hasattr(client, "is_static_window") and client.is_static_window:
bc = client.group.qtile.colorPixel(self.border_static)
elif client.has_focus:
bc = client.group.qtile.colorPixel(self.border_focus)
else:
num_cols = len(self.columns)
num_rows_curr_col = len(c['rows'])
if num_rows_curr_col < num_cols:
c['rows'].append(client)
else:
self.add_column_to_right(c, client)
self.focus(client)
def add_column_to_right(self, column, win):
"""
Adds a new column to the right of the given column with the given window in it
:param column: The column that's going to be to the left of the new column
:param win: The window to add to the new column
"""
newwidth = int(100 / (len(self.columns) + 1))
# we are only called if there already is a column, simplifies things
for c in self.columns:
c['width'] = newwidth
c = {'width': newwidth, 'mode': 'split', 'rows': [win]}
bc = client.group.qtile.colorPixel(self.border_normal)
if client.maximized:
bw = self.max_border_width
elif client.fullscreen:
bw = self.fullscreen_border_width
else:
bw = self.border_width
above = False
# We definitely have a screen here, so let's be sure we'll float on screen
try:
index = self.columns.index(column) + 1
except ValueError:
index = 0
client.float_x
client.float_y
except AttributeError:
# this window hasn't been placed before, let's put it in a sensible spot
transient_for = client.window.get_wm_transient_for()
win = client.group.qtile.windowMap.get(transient_for)
if win is not None:
# if transient for a window, place in the center of the window
center_x = win.x + win.width / 2
center_y = win.y + win.height / 2
else:
center_x = screen.x + screen.width / 2
center_y = screen.y + screen.height / 2
above = True
self.columns.insert(index, c)
x = center_x - client.width / 2
y = center_y - client.height / 2
# don't go off the right...
x = min(x, screen.x + screen.width)
# or left...
x = max(x, screen.x)
# or bottom...
y = min(y, screen.y + screen.height)
# or top
y = max(y, screen.y)
if not (self.no_reposition_match and self.no_reposition_match.compare(client)):
client.x = int(round(x))
client.y = int(round(y))
client.place(
client.x,
client.y,
client.width,
client.height,
bw,
bc,
above,
)
client.unhide()

679
kuro/utils/widgets.py Normal file
View file

@ -0,0 +1,679 @@
import os
import subprocess
import cairocffi
import six
from libqtile import bar, pangocffi
from libqtile.log_utils import logger
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import BatteryIcon, default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.currentlayout import CurrentLayoutIcon
from libqtile.widget.graph import _Graph
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.tasklist import TaskList
from libqtile.widget.volume import Volume
from libqtile.widget.wlan import get_status
from libqtile.window import Window
from kuro.utils.general import BUTTON_LEFT, execute, spawn_popup, notify, BUTTON_MIDDLE, bluetooth_audio_connected, \
bluetooth_audio_sink, BUTTON_RIGHT
class AppLauncherIcon(Image):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'")
def handle_hover(self, event):
spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!")
class CheckUpdatesYay(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYay, self).__init__(**config)
# Override command and output with yay command
self.cmd = "yay -Qua".split()
self.status_cmd = "yay -Qua --color never".split()
self.update_cmd = "sudo yay".split()
self.subtr = 0
def _check_updates(self):
#subprocess.check_output(self.update_cmd)
res = super(CheckUpdatesYay, self)._check_updates()
return res
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class KuroBatteryIcon(BatteryIcon):
status_cmd = "acpi"
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8')
notify(
"Battery Status",
output
)
class PulseVolumeWidget(Volume):
defaults = [
("cardid", None, "Card Id"),
("device", "default", "Device Name"),
("channel", "Master", "Channel"),
("padding", 3, "Padding left and right. Calculated if None."),
("theme_path", None, "Path of the icons"),
("update_interval", 0.2, "Update time in seconds."),
("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
"The specified font needs to contain the correct unicode characters."),
("mute_command", None, "Mute command"),
("volume_up_command", None, "Volume up command"),
("volume_down_command", None, "Volume down command"),
("get_volume_command", None, "Command to get the current volume"),
("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
]
_old_length = 0
def __init__(self, **config):
super(PulseVolumeWidget, self).__init__(**config)
self._old_length = self._length
# Augment commands with bluetooth sink ID if this is a bluetooth icon
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
elif self.is_bluetooth_icon:
self.commands_need_reset = True
else:
self.commands_need_reset = False
self._old_length = self._length
def reset_bluetooth_commands(self):
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
def get_volume(self):
try:
get_volume_cmd = "echo 0".split()
if self.get_volume_command:
if self.is_bluetooth_icon and bluetooth_audio_sink() == -1:
pass
else:
get_volume_cmd = self.get_volume_command
mixer_out = self.call_process(get_volume_cmd)
except subprocess.CalledProcessError:
return -1
try:
return int(mixer_out.strip())
except ValueError:
return -1
def _update_drawer(self):
if self.volume is not None:
super(PulseVolumeWidget, self)._update_drawer()
self.text = ""
if self.is_bluetooth_icon and not bluetooth_audio_connected():
self._length = 0
def draw(self):
if self.is_bluetooth_icon and not bluetooth_audio_connected():
if not self.commands_need_reset:
logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
self.commands_need_reset = True
base._TextBox.draw(self)
else:
if self.commands_need_reset:
self.reset_bluetooth_commands()
if self.theme_path:
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
base._TextBox.draw(self)
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
volume = self.get_volume()
width = 15
if volume >= 0:
volume_amount = round((volume/100)*width)
else:
volume_amount = 0
msg = "[{}{}]".format(
"".join(["#" for x in range(volume_amount)]),
"".join(["-" for x in range(width-volume_amount)])
)
notify(
"{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
msg
)
else:
super(PulseVolumeWidget, self).button_press(x, y, button)
class WifiIconWidget(base._TextBox):
"""WiFi connection strength indicator widget."""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('interface', 'wlan0', 'The interface to monitor'),
('update_interval', 1, 'The update interval.'),
('theme_path', default_icon_path(), 'Path of the icons'),
('custom_icons', {}, 'dict containing key->filename icon map'),
]
def __init__(self, **config):
super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
self.add_defaults(WifiIconWidget.defaults)
if self.theme_path:
self.length_type = bar.STATIC
self.length = 0
self.surfaces = {}
self.current_icon = 'wireless-disconnected'
self.icons = dict([(x, '{0}.png'.format(x)) for x in (
'wireless-disconnected',
'wireless-none',
'wireless-low',
'wireless-medium',
'wireless-high',
'wireless-full',
)])
self.icons.update(self.custom_icons)
def _get_info(self):
try:
essid, quality = get_status(self.interface)
disconnected = essid is None
if disconnected:
return self.disconnected_message
return {
'error': False,
'essid': essid,
'quality': quality,
'percent': (quality / 70)
}
except EnvironmentError:
logger.error(
'%s: Probably your wlan device is switched off or '
' otherwise not present in your system.',
self.__class__.__name__)
return {'error': True}
def timer_setup(self):
self.update()
self.timeout_add(self.update_interval, self.timer_setup)
def _configure(self, qtile, bar):
super(WifiIconWidget, self)._configure(qtile, bar)
self.setup_images()
def _get_icon_key(self):
key = 'wireless'
info = self._get_info()
if info is False or info.get('error'):
key += '-none'
elif info.get('essid') is None:
key += '-disconnected'
else:
percent = info['percent']
if percent < 0.2:
key += '-low'
elif percent < 0.4:
key += '-medium'
elif percent < 0.8:
key += '-high'
else:
key += '-full'
return key
def update(self):
icon = self._get_icon_key()
if icon != self.current_icon:
self.current_icon = icon
self.draw()
def draw(self):
if self.theme_path:
self.drawer.clear(self.background or self.bar.background)
self.drawer.ctx.set_source(self.surfaces[self.current_icon])
self.drawer.ctx.paint()
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
self.text = self.current_icon[8:]
base._TextBox.draw(self)
def setup_images(self):
for key, name in self.icons.items():
try:
path = os.path.join(self.theme_path, name)
img = cairocffi.ImageSurface.create_from_png(path)
except cairocffi.Error:
self.theme_path = None
logger.warning('Wireless Icon switching to text mode')
return
input_width = img.get_width()
input_height = img.get_height()
sp = input_height / (self.bar.height - 1)
width = input_width / sp
if width > self.length:
# cast to `int` only after handling all potentially-float values
self.length = int(width + self.actual_padding * 2)
imgpat = cairocffi.SurfacePattern(img)
scaler = cairocffi.Matrix()
scaler.scale(sp, sp)
scaler.translate(self.actual_padding * -1, 0)
imgpat.set_matrix(scaler)
imgpat.set_filter(cairocffi.FILTER_BEST)
self.surfaces[key] = imgpat
class ThermalSensorWidget(ThermalSensor):
defaults = [
('metric', True, 'True to use metric/C, False to use imperial/F'),
('show_tag', False, 'Show tag sensor'),
('update_interval', 2, 'Update interval in seconds'),
('tag_sensor', None,
'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
('chip', None, 'Chip argument for sensors command'),
(
'threshold',
70,
'If the current temperature value is above, '
'then change to foreground_alert colour'
),
('foreground_alert', 'ff0000', 'Foreground colour alert'),
]
@catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
def get_temp_sensors(self):
"""calls the unix `sensors` command with `-f` flag if user has specified that
the output should be read in Fahrenheit.
"""
command = ["sensors", ]
if self.chip:
command.append(self.chip)
if not self.metric:
command.append("-f")
sensors_out = self.call_process(command)
return self._format_sensors_output(sensors_out)
class SeparatorWidget(base._TextBox):
def __init__(self):
super(SeparatorWidget, self).__init__(text="|", width=bar.CALCULATED, fontsize=14)
class MediaWidget(base.InLoopPollText):
"""Media Status Widget"""
class Status:
OFFLINE = 0
PLAYING = 1
PAUSED = 2
STOPPED = 3
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('off_text', '', 'The pattern for the text if no players are found.'),
('on_text_play', '{}', 'The pattern for the text if music is playing.'),
('on_text_pause', '{}', 'The pattern for the text if music is paused.'),
('on_text_stop', '{}', 'The pattern for the text if music is stopped.'),
('update_interval', 1, 'The update interval.'),
]
player_icons = {
'spotify': '',
'vlc': '',
'firefox': '',
}
custom_player_data = {
'firefox': {
'showing': False,
'title': '',
'state': Status.STOPPED,
}
}
image_urls = {}
current_image_url = None
player_to_control = None
def __init__(self, **config):
super(MediaWidget, self).__init__(**config)
self.add_defaults(MediaWidget.defaults)
self.surfaces = {}
self.player_to_control = None
def _player_to_control(self):
info = self._get_info()
players = {}
for player in info.keys():
if player not in self.custom_player_data.keys():
if info[player][0] in [MediaWidget.Status.PLAYING, MediaWidget.Status.PAUSED]:
players[player] = info[player]
if self.player_to_control is not None and self.player_to_control not in players.keys():
self.player_to_control = None
if self.player_to_control is not None:
players = {self.player_to_control: players[self.player_to_control]}
if len(players.keys()) == 1:
player = list(players.keys())[0]
self.player_to_control = player
return player
elif len(players) == 0:
notify("MediaWidget", "Nothing to control!")
else:
notify("MediaWidget", "Multiple players to control, I don't know what you want to do!")
return None
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
player = self._player_to_control()
if player is not None:
command = ["playerctl", "-p", player, "play-pause"]
_ = self.call_process(command)
notify("MediaWidget", "Toggled {}".format(player))
if button == BUTTON_RIGHT:
player = self._player_to_control()
if player is not None:
command = ["playerctl", "-p", player, "next"]
_ = self.call_process(command)
def cmd_update_custom_player(self, player_name, data):
# Update firefox player
if player_name == "firefox":
if data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED
self.custom_player_data['firefox']['title'] = data['title']
elif data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = False
self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE
self.custom_player_data['firefox']['title'] = data['title']
def _get_players(self):
players = []
# Playerctl players
command = ["playerctl", "-l"]
result = self.call_process(command)
if result:
players.extend([x for x in result.split("\n") if x])
# Custom players - Firefox
if self.custom_player_data['firefox']['showing']:
players.append('firefox')
if players:
return players
else:
return None
def _get_info(self):
players = self._get_players()
if not players:
return {}
else:
result = {}
for player in players:
if player in self.custom_player_data.keys():
# Custom player -- Firefox
if player == "firefox":
result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']]
# Other custom players -- generic attempt with error catching
else:
try:
result[player] = [self.custom_player_data[player]['state'],
self.custom_player_data[player]['title']]
except KeyError:
pass
else:
# PlayerCtl player
command = ["playerctl", "-p", player, "status"]
cmd_result = self.call_process(command).strip()
text = "Unknown"
if cmd_result in ["Playing", "Paused"]:
artist = self.call_process(['playerctl', '-p', player, 'metadata', 'artist']).strip()
title = self.call_process(['playerctl', '-p', player, 'metadata', 'title']).strip()
if artist and title:
text = "{} - {}".format(artist, title)
elif artist:
text = artist
elif title:
text = title
if cmd_result == "Playing":
result[player] = [MediaWidget.Status.PLAYING, text]
elif cmd_result == "Paused":
result[player] = [MediaWidget.Status.PAUSED, text]
elif cmd_result == "Stopped":
result[player] = [MediaWidget.Status.STOPPED, ""]
return result
def _get_formatted_text(self, status):
if status[0] == MediaWidget.Status.PLAYING:
return self.on_text_play.format(status[1])
elif status[0] == MediaWidget.Status.PAUSED:
return self.on_text_pause.format(status[1])
elif status[0] == MediaWidget.Status.STOPPED:
return self.on_text_stop.format(status[1])
else:
return "Unknown"
def draw(self):
super(MediaWidget, self).draw()
def poll(self):
text = []
status = self._get_info()
if not status:
return self.off_text
else:
for player in status.keys():
icon = self.player_icons.get(player, player)
text.append("{} {}".format(icon, self._get_formatted_text(status[player])))
return " | ".join(text) if text else self.off_text
class AudioVisualizerWidget(_Graph):
"""Display Audio Visualization graph"""
orientations = base.ORIENTATION_HORIZONTAL
fixed_upper_bound = True
def __init__(self, **config):
_Graph.__init__(self, **config)
self.add_defaults(AudioVisualizerWidget.defaults)
self.client = None
self.screen = None
self.old_position = None
def set_client(self, c, s):
self.client = c
self.screen = s
def update_graph(self):
if self.client is not None:
viz_info = self.info()
pos_x = viz_info['offset'] + self.margin_x - 1 + self.screen.x
pos_y = 0 + self.margin_y - 1 + self.screen.y
if self.old_position != (pos_x, pos_y):
self.old_position = (pos_x, pos_y)
# Check if a window on this screen is full-screen
fullscreen = False
for window in self.screen.group.windows:
if isinstance(window, Window):
if window.fullscreen:
fullscreen = True
break
logger.warning("Repositioning {} {} to {}x{}".format(self.client, self.client.window.wid, pos_x, pos_y))
self.client.reposition(pos_x, pos_y, above=not fullscreen)
self.draw()
class KuroCurrentLayoutIcon(CurrentLayoutIcon):
def _get_layout_names(self):
names = super(KuroCurrentLayoutIcon, self)._get_layout_names()
from kuro.utils import layouts as kuro_layouts
from libqtile.layout.base import Layout
klayouts = [
layout_class_name.lower()
for layout_class, layout_class_name
in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts))
if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout)
]
names.extend(klayouts)
return list(set(names))
class KuroTaskList(TaskList):
defaults = [
(
'txt_pinned',
'P ',
'Text representation of the pinned window state. '
'e.g., "P " or "\U0001F5D7 "'
),
(
'markup_pinned',
None,
'Text markup of the pinned window state. Supports pangomarkup with markup=True.'
'e.g., "{}" or "<span underline="low">{}</span>"'
),
]
def __init__(self, *args, **kwargs):
super(KuroTaskList, self).__init__(*args, **kwargs)
self.add_defaults(KuroTaskList.defaults)
def get_taskname(self, window):
"""
Get display name for given window.
Depending on its state minimized, maximized and floating
appropriate characters are prepended.
"""
state = ''
markup_str = self.markup_normal
# Enforce markup and new string format behaviour when
# at least one markup_* option is used.
# Mixing non markup and markup may cause problems.
if self.markup_minimized or self.markup_maximized\
or self.markup_floating or self.markup_focused or self.markup_pinned:
enforce_markup = True
else:
enforce_markup = False
if window is None:
pass
elif hasattr(window, "is_static_window") and window.is_static_window:
state = self.txt_pinned
markup_str = self.markup_pinned
elif window.minimized:
state = self.txt_minimized
markup_str = self.markup_minimized
elif window.maximized:
state = self.txt_maximized
markup_str = self.markup_maximized
elif window.floating:
state = self.txt_floating
markup_str = self.markup_floating
elif window is window.group.currentWindow:
markup_str = self.markup_focused
window_name = window.name if window and window.name else "?"
# Emulate default widget behavior if markup_str is None
if enforce_markup and markup_str is None:
markup_str = "%s{}" % (state)
if markup_str is not None:
self.markup = True
window_name = pangocffi.markup_escape_text(window_name)
return markup_str.format(window_name)
return "%s%s" % (state, window_name)

69
kuro/utils/windows.py Normal file
View file

@ -0,0 +1,69 @@
from cairocffi.test_xcb import xcffib
from libqtile import hook
from libqtile.window import Window, Static
class KuroStatic(Static):
@staticmethod
def create(window: Window, screen, x=None, y=None, width=None, height=None):
"""Makes this window a static window, attached to a Screen
If any of the arguments are left unspecified, the values given by the
window itself are used instead. So, for a window that's aware of its
appropriate size and location (like dzen), you don't have to specify
anything.
"""
window.defunct = True
if isinstance(screen, int):
screen = window.qtile.screens[screen]
if window.group:
window.group.remove(window)
s = KuroStatic(window.window, window.qtile, screen, x, y, width, height)
window.qtile.windowMap[window.window.wid] = s
hook.fire("client_managed", s)
return s
def __init__(self, win, qtile, screen, x=None, y=None, width=None, height=None):
Static.__init__(self, win, qtile, screen, x=x, y=y, width=width, height=height)
self.above = True
self.placed_x = x
self.placed_y = y
if None not in (x, y, width, height):
self.place(x, y, width, height, 0, 0, above=self.above)
def set_above(self, above: bool):
self.above = above
self.reposition(self.placed_x, self.placed_y)
def reposition(self, x, y, above=None):
self.placed_x = x
self.placed_y = y
if above is not None:
self.above = above
self.place(x, y, self.width, self.height, 0, 0, above=self.above)
def handle_ConfigureRequest(self, e):
cw = xcffib.xproto.ConfigWindow
if self.conf_x is None and e.value_mask & cw.X:
self.x = e.x
if self.conf_y is None and e.value_mask & cw.Y:
self.y = e.y
if self.conf_width is None and e.value_mask & cw.Width:
self.width = e.width
if self.conf_height is None and e.value_mask & cw.Height:
self.height = e.height
self.place(
self.screen.x + self.x,
self.screen.y + self.y,
self.width,
self.height,
self.borderwidth,
self.bordercolor,
above=self.above
)
return False
def __repr__(self):
return "KuroStatic(%r)" % self.name