kuro-qtile-theme/kuro/utils.py

416 lines
14 KiB
Python

import os
import re
import subprocess
import cairocffi
import notify2
from libqtile import widget, bar
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.volume import Volume
from libqtile.widget.wlan import get_status
from libqtile.log_utils import logger
from notify2 import Notification, URGENCY_NORMAL
notify2.init("QTileWM")
BUTTON_LEFT = 1
BUTTON_MIDDLE = 2
BUTTON_RIGHT = 3
BUTTON_SCROLL_UP = 4
BUTTON_SCROLL_DOWN = 5
def is_running(process):
s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE)
for x in s.stdout:
if re.search(process, x.decode('utf-8')):
return True
return False
def execute(process):
return subprocess.Popen(process.split())
def execute_once(process):
if not is_running(process):
return subprocess.Popen(process.split())
def get_screen_count():
try:
output = subprocess.check_output("xrandr -q".split()).decode('utf-8')
output = [x for x in output.split("\n") if " connected" in x]
except subprocess.CalledProcessError:
return 1
if output:
return len(output)
else:
return 1
def bar_separator(config):
return widget.Sep(foreground=config.get('colour_spacer_background', '#777777'),
linewidth=config.get('width_spacer', 1),
padding=config.get('padding_spacer', 4),
)
def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
if image is not None:
notification = Notification(
summary=title, message=content,
icon=image
)
else:
notification = Notification(
summary=title, message=content
)
notification.set_timeout(timeout)
notification.set_urgency(urgency)
return notification.show()
def bluetooth_audio_sink():
try:
output = subprocess.check_output("pamixer --list-sinks".split()).decode("utf-8")
output = [x for x in output.split('\n') if "blue" in x.lower()]
except subprocess.CalledProcessError:
return -1
sink = -1
try:
sink = int(output[0].split()[0])
except IndexError:
pass
except AttributeError:
pass
except ValueError:
pass
return sink
def bluetooth_audio_connected():
return bluetooth_audio_sink() != -1
class AppLauncherIcon(Image):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'")
class CheckUpdatesYaourt(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYaourt, self).__init__(**config)
# Override command and output with yaourt command
self.cmd = "yaourt -Qua".split()
self.status_cmd = "yaourt -Qua".split()
self.update_cmd = "yaourt -Sy"
self.subtr = 0
def _check_updates(self):
subprocess.check_output(self.update_cmd)
super(CheckUpdatesYaourt, self)._check_updates()
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class PulseVolumeWidget(Volume):
defaults = [
("cardid", None, "Card Id"),
("device", "default", "Device Name"),
("channel", "Master", "Channel"),
("padding", 3, "Padding left and right. Calculated if None."),
("theme_path", None, "Path of the icons"),
("update_interval", 0.2, "Update time in seconds."),
("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
"The specified font needs to contain the correct unicode characters."),
("mute_command", None, "Mute command"),
("volume_up_command", None, "Volume up command"),
("volume_down_command", None, "Volume down command"),
("get_volume_command", None, "Command to get the current volume"),
("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
]
_old_length = 0
def __init__(self, **config):
super(PulseVolumeWidget, self).__init__(**config)
self._old_length = self._length
# Augment commands with bluetooth sink ID if this is a bluetooth icon
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
elif self.is_bluetooth_icon:
self.commands_need_reset = True
else:
self.commands_need_reset = False
self._old_length = self._length
def reset_bluetooth_commands(self):
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
def get_volume(self):
try:
get_volume_cmd = "echo 0".split()
if self.get_volume_command:
if self.is_bluetooth_icon and bluetooth_audio_sink() == -1:
pass
else:
get_volume_cmd = self.get_volume_command
mixer_out = self.call_process(get_volume_cmd)
except subprocess.CalledProcessError:
return -1
try:
return int(mixer_out.strip())
except ValueError:
return -1
def _update_drawer(self):
super(PulseVolumeWidget, self)._update_drawer()
self.text = ""
if self.is_bluetooth_icon and not bluetooth_audio_connected():
self._length = 0
def draw(self):
if self.is_bluetooth_icon and not bluetooth_audio_connected():
if not self.commands_need_reset:
logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
self.commands_need_reset = True
base._TextBox.draw(self)
else:
if self.commands_need_reset:
self.reset_bluetooth_commands()
if self.theme_path:
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
base._TextBox.draw(self)
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
volume = self.get_volume()
width = 15
if volume >= 0:
volume_amount = round((volume/100)*width)
else:
volume_amount = 0
msg = "[{}{}]".format(
"".join(["#" for x in range(volume_amount)]),
"".join(["-" for x in range(width-volume_amount)])
)
notify(
"{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
msg
)
else:
super(PulseVolumeWidget, self).button_press(x, y, button)
class WifiIconWidget(base._TextBox):
"""WiFi connection strength indicator widget."""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('interface', 'wlan0', 'The interface to monitor'),
('update_interval', 1, 'The update interval.'),
('theme_path', default_icon_path(), 'Path of the icons'),
('custom_icons', {}, 'dict containing key->filename icon map'),
]
def __init__(self, **config):
super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
self.add_defaults(WifiIconWidget.defaults)
if self.theme_path:
self.length_type = bar.STATIC
self.length = 0
self.surfaces = {}
self.current_icon = 'wireless-disconnected'
self.icons = dict([(x, '{0}.png'.format(x)) for x in (
'wireless-disconnected',
'wireless-none',
'wireless-low',
'wireless-medium',
'wireless-high',
'wireless-full',
)])
self.icons.update(self.custom_icons)
def _get_info(self):
try:
essid, quality = get_status(self.interface)
disconnected = essid is None
if disconnected:
return self.disconnected_message
return {
'error': False,
'essid': essid,
'quality': quality,
'percent': (quality / 70)
}
except EnvironmentError:
logger.error(
'%s: Probably your wlan device is switched off or '
' otherwise not present in your system.',
self.__class__.__name__)
return {'error': True}
def timer_setup(self):
self.update()
self.timeout_add(self.update_interval, self.timer_setup)
def _configure(self, qtile, bar):
super(WifiIconWidget, self)._configure(qtile, bar)
self.setup_images()
def _get_icon_key(self):
key = 'wireless'
info = self._get_info()
if info is False or info.get('error'):
key += '-none'
elif info.get('essid') is None:
key += '-disconnected'
else:
percent = info['percent']
if percent < 0.2:
key += '-low'
elif percent < 0.4:
key += '-medium'
elif percent < 0.8:
key += '-high'
else:
key += '-full'
return key
def update(self):
icon = self._get_icon_key()
if icon != self.current_icon:
self.current_icon = icon
self.draw()
def draw(self):
if self.theme_path:
self.drawer.clear(self.background or self.bar.background)
self.drawer.ctx.set_source(self.surfaces[self.current_icon])
self.drawer.ctx.paint()
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
self.text = self.current_icon[8:]
base._TextBox.draw(self)
def setup_images(self):
for key, name in self.icons.items():
try:
path = os.path.join(self.theme_path, name)
img = cairocffi.ImageSurface.create_from_png(path)
except cairocffi.Error:
self.theme_path = None
logger.warning('Wireless Icon switching to text mode')
return
input_width = img.get_width()
input_height = img.get_height()
sp = input_height / (self.bar.height - 1)
width = input_width / sp
if width > self.length:
# cast to `int` only after handling all potentially-float values
self.length = int(width + self.actual_padding * 2)
imgpat = cairocffi.SurfacePattern(img)
scaler = cairocffi.Matrix()
scaler.scale(sp, sp)
scaler.translate(self.actual_padding * -1, 0)
imgpat.set_matrix(scaler)
imgpat.set_filter(cairocffi.FILTER_BEST)
self.surfaces[key] = imgpat
class ThermalSensorWidget(ThermalSensor):
defaults = [
('metric', True, 'True to use metric/C, False to use imperial/F'),
('show_tag', False, 'Show tag sensor'),
('update_interval', 2, 'Update interval in seconds'),
('tag_sensor', None,
'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
('chip', None, 'Chip argument for sensors command'),
(
'threshold',
70,
'If the current temperature value is above, '
'then change to foreground_alert colour'
),
('foreground_alert', 'ff0000', 'Foreground colour alert'),
]
@catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
def get_temp_sensors(self):
"""calls the unix `sensors` command with `-f` flag if user has specified that
the output should be read in Fahrenheit.
"""
command = ["sensors", ]
if self.chip:
command.append(self.chip)
if not self.metric:
command.append("-f")
sensors_out = self.call_process(command)
return self._format_sensors_output(sensors_out)