kuro-qtile-theme/kuro/utils/general.py

538 lines
17 KiB
Python

import os
import re
import subprocess
from time import sleep
import cairocffi
import notify2
from libqtile import widget, bar
from libqtile.window import Internal
from libqtile.bar import Bar
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.volume import Volume
from libqtile.widget.battery import BatteryIcon
from libqtile.widget.wlan import get_status
from libqtile.log_utils import logger
from notify2 import Notification, URGENCY_NORMAL
notify2.init("QTileWM")
BUTTON_LEFT = 1
BUTTON_MIDDLE = 2
BUTTON_RIGHT = 3
BUTTON_SCROLL_UP = 4
BUTTON_SCROLL_DOWN = 5
def is_running(process):
s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE)
for x in s.stdout:
if re.search(process, x.decode('utf-8')):
return True
return False
def execute(process):
return subprocess.Popen(process.split())
def execute_once(process):
if not is_running(process):
return subprocess.Popen(process.split())
def get_screen_count():
try:
output = subprocess.check_output("xrandr -q".split()).decode('utf-8')
output = [x for x in output.split("\n") if " connected" in x]
except subprocess.CalledProcessError:
return 1
if output:
return len(output)
else:
return 1
def bar_separator(config):
return widget.Sep(foreground=config.get('colour_spacer_background', '#777777'),
linewidth=config.get('width_spacer', 1),
padding=config.get('padding_spacer', 4),
)
def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
if image is not None:
notification = Notification(
summary=title, message=content,
icon=image
)
else:
notification = Notification(
summary=title, message=content
)
notification.set_timeout(timeout)
notification.set_urgency(urgency)
return notification.show()
def spawn_popup(qtile, x, y, text):
"""
:param qtile: The main qtile instance
:type qtile: Qtile
:param x: x-coordinate
:type x: int
:param y: y-coordinate
:type y: int
:param text: String to display
:type text: str
:return: The popup instance
:rtype: Internal
"""
popup = Internal.create(
qtile, x, y, 100, 100, opacity=1
)
# Create textwidget for in window
popup.bordercolor = "#000000"
popup.borderwidth = 1
popup.focus(False)
#popup.
return popup
def despawn_popup(popup):
"""
:type popup: Internal
:param popup: The popup to despawn
"""
popup.kill()
def test_popups(qtile):
popup = spawn_popup(qtile, 10, 10, "Hello World!")
sleep(3)
despawn_popup(popup)
def display_wm_class(qtile):
window = qtile.currentWindow if qtile else None
if window:
wm_class = window.window.get_wm_class() or None
name = window.name
if wm_class:
notify(title="WM_Class of {}".format(name),
content="{}".format(wm_class),
urgency=notify2.URGENCY_CRITICAL)
def bluetooth_audio_sink():
try:
output = subprocess.check_output("pamixer --list-sinks".split()).decode("utf-8")
output = [x for x in output.split('\n') if "blue" in x.lower()]
except subprocess.CalledProcessError:
return -1
sink = -1
try:
sink = int(output[0].split()[0])
except IndexError:
pass
except AttributeError:
pass
except ValueError:
pass
return sink
def bluetooth_audio_connected():
return bluetooth_audio_sink() != -1
class KuroTopBar(Bar):
def __init__(self, theme, widgets, size, **config):
self.theme = theme
super(KuroTopBar, self).__init__(widgets, size, **config)
def _configure(self, qtile, screen):
super(KuroTopBar, self)._configure(qtile, screen)
self.window.handle_EnterNotify = self.handle_enter_notify
self.window.handle_LeaveNotify = self.handle_leave_notify
def handle_enter_notify(self, e):
# self.theme.log_debug("Bar HandleEnterNotify")
#
# self.window.opacity = Config.get('bar_hover_opacity', 1.0)
# print("Bar Hover Enter")
#
# try:
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
# except IndexError:
# hovered_widget = None
#
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
#
# if hasattr(hovered_widget, "handle_hover_enter"):
# hovered_widget.handle_hover_enter(e)
self.draw()
def handle_leave_notify(self, e):
# self.theme.log_debug("Bar HandleLeaveNotify")
#
# self.window.opacity = Config.get('bar_opacity', 1.0)
# print("Bar Hover Leave")
#
# try:
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
# except IndexError:
# hovered_widget = None
#
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
#
# if hasattr(hovered_widget, "handle_hover_leave"):
# hovered_widget.handle_hover_leave(e)
self.draw()
class AppLauncherIcon(Image):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'")
def handle_hover(self, event):
spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!")
class CheckUpdatesYaourt(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYaourt, self).__init__(**config)
# Override command and output with yaourt command
self.cmd = "yaourt -Qua".split()
self.status_cmd = "yaourt -Qua".split()
self.update_cmd = "sudo yaourt -Sya".split()
self.subtr = 0
def _check_updates(self):
#subprocess.check_output(self.update_cmd)
res = super(CheckUpdatesYaourt, self)._check_updates()
return res
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class KuroBatteryIcon(BatteryIcon):
status_cmd = "acpi"
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8')
notify(
"Battery Status",
output
)
class PulseVolumeWidget(Volume):
defaults = [
("cardid", None, "Card Id"),
("device", "default", "Device Name"),
("channel", "Master", "Channel"),
("padding", 3, "Padding left and right. Calculated if None."),
("theme_path", None, "Path of the icons"),
("update_interval", 0.2, "Update time in seconds."),
("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
"The specified font needs to contain the correct unicode characters."),
("mute_command", None, "Mute command"),
("volume_up_command", None, "Volume up command"),
("volume_down_command", None, "Volume down command"),
("get_volume_command", None, "Command to get the current volume"),
("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
]
_old_length = 0
def __init__(self, **config):
super(PulseVolumeWidget, self).__init__(**config)
self._old_length = self._length
# Augment commands with bluetooth sink ID if this is a bluetooth icon
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
elif self.is_bluetooth_icon:
self.commands_need_reset = True
else:
self.commands_need_reset = False
self._old_length = self._length
def reset_bluetooth_commands(self):
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
def get_volume(self):
try:
get_volume_cmd = "echo 0".split()
if self.get_volume_command:
if self.is_bluetooth_icon and bluetooth_audio_sink() == -1:
pass
else:
get_volume_cmd = self.get_volume_command
mixer_out = self.call_process(get_volume_cmd)
except subprocess.CalledProcessError:
return -1
try:
return int(mixer_out.strip())
except ValueError:
return -1
def _update_drawer(self):
super(PulseVolumeWidget, self)._update_drawer()
self.text = ""
if self.is_bluetooth_icon and not bluetooth_audio_connected():
self._length = 0
def draw(self):
if self.is_bluetooth_icon and not bluetooth_audio_connected():
if not self.commands_need_reset:
logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
self.commands_need_reset = True
base._TextBox.draw(self)
else:
if self.commands_need_reset:
self.reset_bluetooth_commands()
if self.theme_path:
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
base._TextBox.draw(self)
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
volume = self.get_volume()
width = 15
if volume >= 0:
volume_amount = round((volume/100)*width)
else:
volume_amount = 0
msg = "[{}{}]".format(
"".join(["#" for x in range(volume_amount)]),
"".join(["-" for x in range(width-volume_amount)])
)
notify(
"{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
msg
)
else:
super(PulseVolumeWidget, self).button_press(x, y, button)
class WifiIconWidget(base._TextBox):
"""WiFi connection strength indicator widget."""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('interface', 'wlan0', 'The interface to monitor'),
('update_interval', 1, 'The update interval.'),
('theme_path', default_icon_path(), 'Path of the icons'),
('custom_icons', {}, 'dict containing key->filename icon map'),
]
def __init__(self, **config):
super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
self.add_defaults(WifiIconWidget.defaults)
if self.theme_path:
self.length_type = bar.STATIC
self.length = 0
self.surfaces = {}
self.current_icon = 'wireless-disconnected'
self.icons = dict([(x, '{0}.png'.format(x)) for x in (
'wireless-disconnected',
'wireless-none',
'wireless-low',
'wireless-medium',
'wireless-high',
'wireless-full',
)])
self.icons.update(self.custom_icons)
def _get_info(self):
try:
essid, quality = get_status(self.interface)
disconnected = essid is None
if disconnected:
return self.disconnected_message
return {
'error': False,
'essid': essid,
'quality': quality,
'percent': (quality / 70)
}
except EnvironmentError:
logger.error(
'%s: Probably your wlan device is switched off or '
' otherwise not present in your system.',
self.__class__.__name__)
return {'error': True}
def timer_setup(self):
self.update()
self.timeout_add(self.update_interval, self.timer_setup)
def _configure(self, qtile, bar):
super(WifiIconWidget, self)._configure(qtile, bar)
self.setup_images()
def _get_icon_key(self):
key = 'wireless'
info = self._get_info()
if info is False or info.get('error'):
key += '-none'
elif info.get('essid') is None:
key += '-disconnected'
else:
percent = info['percent']
if percent < 0.2:
key += '-low'
elif percent < 0.4:
key += '-medium'
elif percent < 0.8:
key += '-high'
else:
key += '-full'
return key
def update(self):
icon = self._get_icon_key()
if icon != self.current_icon:
self.current_icon = icon
self.draw()
def draw(self):
if self.theme_path:
self.drawer.clear(self.background or self.bar.background)
self.drawer.ctx.set_source(self.surfaces[self.current_icon])
self.drawer.ctx.paint()
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
self.text = self.current_icon[8:]
base._TextBox.draw(self)
def setup_images(self):
for key, name in self.icons.items():
try:
path = os.path.join(self.theme_path, name)
img = cairocffi.ImageSurface.create_from_png(path)
except cairocffi.Error:
self.theme_path = None
logger.warning('Wireless Icon switching to text mode')
return
input_width = img.get_width()
input_height = img.get_height()
sp = input_height / (self.bar.height - 1)
width = input_width / sp
if width > self.length:
# cast to `int` only after handling all potentially-float values
self.length = int(width + self.actual_padding * 2)
imgpat = cairocffi.SurfacePattern(img)
scaler = cairocffi.Matrix()
scaler.scale(sp, sp)
scaler.translate(self.actual_padding * -1, 0)
imgpat.set_matrix(scaler)
imgpat.set_filter(cairocffi.FILTER_BEST)
self.surfaces[key] = imgpat
class ThermalSensorWidget(ThermalSensor):
defaults = [
('metric', True, 'True to use metric/C, False to use imperial/F'),
('show_tag', False, 'Show tag sensor'),
('update_interval', 2, 'Update interval in seconds'),
('tag_sensor', None,
'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
('chip', None, 'Chip argument for sensors command'),
(
'threshold',
70,
'If the current temperature value is above, '
'then change to foreground_alert colour'
),
('foreground_alert', 'ff0000', 'Foreground colour alert'),
]
@catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
def get_temp_sensors(self):
"""calls the unix `sensors` command with `-f` flag if user has specified that
the output should be read in Fahrenheit.
"""
command = ["sensors", ]
if self.chip:
command.append(self.chip)
if not self.metric:
command.append("-f")
sensors_out = self.call_process(command)
return self._format_sensors_output(sensors_out)