kuro-qtile-theme/kuro/utils/general.py

821 lines
27 KiB
Python

import os
import re
import subprocess
from asyncio import Queue
from threading import Thread
from time import sleep
import cairocffi
import notify2
import numpy
import pyaudio
import six
from libqtile import widget, bar
from libqtile.widget.currentlayout import CurrentLayoutIcon
from libqtile.widget.graph import _Graph
from libqtile.window import Internal
from libqtile.bar import Bar
from libqtile.utils import catch_exception_and_warn, UnixCommandNotFound
from libqtile.widget import base
from libqtile.widget.battery import default_icon_path
from libqtile.widget.check_updates import CheckUpdates
from libqtile.widget.image import Image
from libqtile.widget.sensors import ThermalSensor
from libqtile.widget.volume import Volume
from libqtile.widget.battery import BatteryIcon
from libqtile.widget.wlan import get_status
from libqtile.log_utils import logger
from notify2 import Notification, URGENCY_NORMAL
notify2.init("QTileWM")
BUTTON_LEFT = 1
BUTTON_MIDDLE = 2
BUTTON_RIGHT = 3
BUTTON_SCROLL_UP = 4
BUTTON_SCROLL_DOWN = 5
def is_running(process):
s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE)
for x in s.stdout:
if re.search(process, x.decode('utf-8')):
return True
return False
def execute(process):
if isinstance(process, list):
return subprocess.Popen(process)
elif isinstance(process, str):
return subprocess.Popen(process.split())
else:
pass
def execute_once(process):
if not is_running(process):
if isinstance(process, list):
return subprocess.Popen(process)
elif isinstance(process, str):
return subprocess.Popen(process.split())
else:
pass
def call_process(command, **kwargs):
"""
This method uses `subprocess.check_output` to run the given command
and return the string from stdout, which is decoded when using
Python 3.
"""
output = subprocess.check_output(command, **kwargs)
if six.PY3:
output = output.decode()
return output
def get_screen_count():
try:
output = subprocess.check_output("xrandr -q".split()).decode('utf-8')
output = [x for x in output.split("\n") if " connected" in x]
except subprocess.CalledProcessError:
return 1
if output:
return len(output)
else:
return 1
def bar_separator(config):
return widget.Sep(foreground=config.get('colour_spacer_background', '#777777'),
linewidth=config.get('width_spacer', 1),
padding=config.get('padding_spacer', 4),
)
def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
if image is not None:
notification = Notification(
summary=title, message=content,
icon=image
)
else:
notification = Notification(
summary=title, message=content
)
notification.set_timeout(timeout)
notification.set_urgency(urgency)
return notification.show()
def spawn_popup(qtile, x, y, text):
"""
:param qtile: The main qtile instance
:type qtile: Qtile
:param x: x-coordinate
:type x: int
:param y: y-coordinate
:type y: int
:param text: String to display
:type text: str
:return: The popup instance
:rtype: Internal
"""
popup = Internal.create(
qtile, x, y, 100, 100, opacity=1
)
# Create textwidget for in window
popup.bordercolor = "#000000"
popup.borderwidth = 1
popup.focus(False)
#popup.
return popup
def despawn_popup(popup):
"""
:type popup: Internal
:param popup: The popup to despawn
"""
popup.kill()
def test_popups(qtile):
popup = spawn_popup(qtile, 10, 10, "Hello World!")
sleep(3)
despawn_popup(popup)
def display_wm_class(qtile):
window = qtile.currentWindow if qtile else None
if window:
wm_class = window.window.get_wm_class() or None
name = window.name
if wm_class:
notify(title="WM_Class of {}".format(name),
content="{}".format(wm_class),
urgency=notify2.URGENCY_CRITICAL)
def bluetooth_audio_sink():
try:
output = subprocess.check_output("pamixer --list-sinks".split()).decode("utf-8")
output = [x for x in output.split('\n') if "blue" in x.lower()]
except subprocess.CalledProcessError:
return -1
sink = -1
try:
sink = int(output[0].split()[0])
except IndexError:
pass
except AttributeError:
pass
except ValueError:
pass
return sink
def bluetooth_audio_connected():
return bluetooth_audio_sink() != -1
class KuroTopBar(Bar):
def __init__(self, theme, widgets, size, **config):
self.theme = theme
super(KuroTopBar, self).__init__(widgets, size, **config)
def _configure(self, qtile, screen):
super(KuroTopBar, self)._configure(qtile, screen)
self.window.handle_EnterNotify = self.handle_enter_notify
self.window.handle_LeaveNotify = self.handle_leave_notify
def handle_enter_notify(self, e):
# self.theme.log_debug("Bar HandleEnterNotify")
#
# self.window.opacity = Config.get('bar_hover_opacity', 1.0)
# print("Bar Hover Enter")
#
# try:
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
# except IndexError:
# hovered_widget = None
#
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
#
# if hasattr(hovered_widget, "handle_hover_enter"):
# hovered_widget.handle_hover_enter(e)
self.draw()
def handle_leave_notify(self, e):
# self.theme.log_debug("Bar HandleLeaveNotify")
#
# self.window.opacity = Config.get('bar_opacity', 1.0)
# print("Bar Hover Leave")
#
# try:
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
# except IndexError:
# hovered_widget = None
#
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
#
# if hasattr(hovered_widget, "handle_hover_leave"):
# hovered_widget.handle_hover_leave(e)
self.draw()
class AppLauncherIcon(Image):
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
execute("dmenu_run -i -p '»' -nb '#000000' -fn 'Noto Sans-11' -nf '#777777' -sb '#1793d0' -sf '#ffffff'")
def handle_hover(self, event):
spawn_popup(self.qtile, self.offsetx, self.offsety, "Hovered over AppLauncherIcon!")
class CheckUpdatesYay(CheckUpdates):
def __init__(self, **config):
super(CheckUpdatesYay, self).__init__(**config)
# Override command and output with yay command
self.cmd = "yay -Qua".split()
self.status_cmd = "yay -Qua --color never".split()
self.update_cmd = "sudo yay".split()
self.subtr = 0
def _check_updates(self):
#subprocess.check_output(self.update_cmd)
res = super(CheckUpdatesYay, self)._check_updates()
return res
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8').split('\n')
num_updates = len(output)-1
msg = "{} updates available.".format(num_updates)
if num_updates > 0:
msg += "\n\n"
for x in range(min(num_updates, 9)):
msg += output[x] + "\n"
if num_updates > 9:
msg += "and {} more...".format(num_updates-9)
notify(
"System updates",
msg
)
elif button == BUTTON_MIDDLE and self.execute is not None:
subprocess.Popen(self.execute, shell=True)
class KuroBatteryIcon(BatteryIcon):
status_cmd = "acpi"
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
output = subprocess.check_output(self.status_cmd).decode('utf-8')
notify(
"Battery Status",
output
)
class PulseVolumeWidget(Volume):
defaults = [
("cardid", None, "Card Id"),
("device", "default", "Device Name"),
("channel", "Master", "Channel"),
("padding", 3, "Padding left and right. Calculated if None."),
("theme_path", None, "Path of the icons"),
("update_interval", 0.2, "Update time in seconds."),
("emoji", False, "Use emoji to display volume states, only if ``theme_path`` is not set."
"The specified font needs to contain the correct unicode characters."),
("mute_command", None, "Mute command"),
("volume_up_command", None, "Volume up command"),
("volume_down_command", None, "Volume down command"),
("get_volume_command", None, "Command to get the current volume"),
("is_bluetooth_icon", False, "Is this icon for a Bluetooth Audio device?"),
]
_old_length = 0
def __init__(self, **config):
super(PulseVolumeWidget, self).__init__(**config)
self._old_length = self._length
# Augment commands with bluetooth sink ID if this is a bluetooth icon
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
elif self.is_bluetooth_icon:
self.commands_need_reset = True
else:
self.commands_need_reset = False
self._old_length = self._length
def reset_bluetooth_commands(self):
if self.is_bluetooth_icon and bluetooth_audio_connected():
bsink = 0 if bluetooth_audio_sink() == -1 else bluetooth_audio_sink()
self.mute_command = " ".join(self._user_config['mute_command']).format(bsink=bsink).split()
self.volume_up_command = " ".join(self._user_config['volume_up_command']).format(bsink=bsink).split()
self.volume_down_command = " ".join(self._user_config['volume_down_command']).format(bsink=bsink).split()
self.get_volume_command = " ".join(self._user_config['get_volume_command']).format(bsink=bsink).split()
logger.info("Updated bluetooth commands with bluetooth sink {}".format(bsink))
self._length = self._old_length
self.commands_need_reset = False
def get_volume(self):
try:
get_volume_cmd = "echo 0".split()
if self.get_volume_command:
if self.is_bluetooth_icon and bluetooth_audio_sink() == -1:
pass
else:
get_volume_cmd = self.get_volume_command
mixer_out = self.call_process(get_volume_cmd)
except subprocess.CalledProcessError:
return -1
try:
return int(mixer_out.strip())
except ValueError:
return -1
def _update_drawer(self):
super(PulseVolumeWidget, self)._update_drawer()
self.text = ""
if self.is_bluetooth_icon and not bluetooth_audio_connected():
self._length = 0
def draw(self):
if self.is_bluetooth_icon and not bluetooth_audio_connected():
if not self.commands_need_reset:
logger.info("Bluetooth device disconnected. Hiding bluetooth audio mixer")
self.commands_need_reset = True
base._TextBox.draw(self)
else:
if self.commands_need_reset:
self.reset_bluetooth_commands()
if self.theme_path:
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
base._TextBox.draw(self)
def button_press(self, x, y, button):
if button == BUTTON_LEFT:
volume = self.get_volume()
width = 15
if volume >= 0:
volume_amount = round((volume/100)*width)
else:
volume_amount = 0
msg = "[{}{}]".format(
"".join(["#" for x in range(volume_amount)]),
"".join(["-" for x in range(width-volume_amount)])
)
notify(
"{}Volume : {}%".format("Bluetooth " if self.is_bluetooth_icon else "", volume),
msg
)
else:
super(PulseVolumeWidget, self).button_press(x, y, button)
class WifiIconWidget(base._TextBox):
"""WiFi connection strength indicator widget."""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('interface', 'wlan0', 'The interface to monitor'),
('update_interval', 1, 'The update interval.'),
('theme_path', default_icon_path(), 'Path of the icons'),
('custom_icons', {}, 'dict containing key->filename icon map'),
]
def __init__(self, **config):
super(WifiIconWidget, self).__init__("WLAN", bar.CALCULATED, **config)
self.add_defaults(WifiIconWidget.defaults)
if self.theme_path:
self.length_type = bar.STATIC
self.length = 0
self.surfaces = {}
self.current_icon = 'wireless-disconnected'
self.icons = dict([(x, '{0}.png'.format(x)) for x in (
'wireless-disconnected',
'wireless-none',
'wireless-low',
'wireless-medium',
'wireless-high',
'wireless-full',
)])
self.icons.update(self.custom_icons)
def _get_info(self):
try:
essid, quality = get_status(self.interface)
disconnected = essid is None
if disconnected:
return self.disconnected_message
return {
'error': False,
'essid': essid,
'quality': quality,
'percent': (quality / 70)
}
except EnvironmentError:
logger.error(
'%s: Probably your wlan device is switched off or '
' otherwise not present in your system.',
self.__class__.__name__)
return {'error': True}
def timer_setup(self):
self.update()
self.timeout_add(self.update_interval, self.timer_setup)
def _configure(self, qtile, bar):
super(WifiIconWidget, self)._configure(qtile, bar)
self.setup_images()
def _get_icon_key(self):
key = 'wireless'
info = self._get_info()
if info is False or info.get('error'):
key += '-none'
elif info.get('essid') is None:
key += '-disconnected'
else:
percent = info['percent']
if percent < 0.2:
key += '-low'
elif percent < 0.4:
key += '-medium'
elif percent < 0.8:
key += '-high'
else:
key += '-full'
return key
def update(self):
icon = self._get_icon_key()
if icon != self.current_icon:
self.current_icon = icon
self.draw()
def draw(self):
if self.theme_path:
self.drawer.clear(self.background or self.bar.background)
self.drawer.ctx.set_source(self.surfaces[self.current_icon])
self.drawer.ctx.paint()
self.drawer.draw(offsetx=self.offset, width=self.length)
else:
self.text = self.current_icon[8:]
base._TextBox.draw(self)
def setup_images(self):
for key, name in self.icons.items():
try:
path = os.path.join(self.theme_path, name)
img = cairocffi.ImageSurface.create_from_png(path)
except cairocffi.Error:
self.theme_path = None
logger.warning('Wireless Icon switching to text mode')
return
input_width = img.get_width()
input_height = img.get_height()
sp = input_height / (self.bar.height - 1)
width = input_width / sp
if width > self.length:
# cast to `int` only after handling all potentially-float values
self.length = int(width + self.actual_padding * 2)
imgpat = cairocffi.SurfacePattern(img)
scaler = cairocffi.Matrix()
scaler.scale(sp, sp)
scaler.translate(self.actual_padding * -1, 0)
imgpat.set_matrix(scaler)
imgpat.set_filter(cairocffi.FILTER_BEST)
self.surfaces[key] = imgpat
class ThermalSensorWidget(ThermalSensor):
defaults = [
('metric', True, 'True to use metric/C, False to use imperial/F'),
('show_tag', False, 'Show tag sensor'),
('update_interval', 2, 'Update interval in seconds'),
('tag_sensor', None,
'Tag of the temperature sensor. For example: "temp1" or "Core 0"'),
('chip', None, 'Chip argument for sensors command'),
(
'threshold',
70,
'If the current temperature value is above, '
'then change to foreground_alert colour'
),
('foreground_alert', 'ff0000', 'Foreground colour alert'),
]
@catch_exception_and_warn(warning=UnixCommandNotFound, excepts=OSError)
def get_temp_sensors(self):
"""calls the unix `sensors` command with `-f` flag if user has specified that
the output should be read in Fahrenheit.
"""
command = ["sensors", ]
if self.chip:
command.append(self.chip)
if not self.metric:
command.append("-f")
sensors_out = self.call_process(command)
return self._format_sensors_output(sensors_out)
class SeparatorWidget(base._TextBox):
def __init__(self):
super(SeparatorWidget, self).__init__(text="|", width=bar.CALCULATED, fontsize=14)
class MediaWidget(base.InLoopPollText):
"""Media Status Widget"""
class Status:
OFFLINE = 0
PLAYING = 1
PAUSED = 2
STOPPED = 3
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
('off_text', '', 'The pattern for the text if no players are found.'),
('on_text_play', '{}', 'The pattern for the text if music is playing.'),
('on_text_pause', '{}', 'The pattern for the text if music is paused.'),
('on_text_stop', '{}', 'The pattern for the text if music is stopped.'),
('update_interval', 1, 'The update interval.'),
]
player_icons = {
'spotify': '',
'vlc': '',
'firefox': '',
}
custom_player_data = {
'firefox': {
'showing': False,
'title': '',
'state': Status.STOPPED,
}
}
def __init__(self, **config):
super(MediaWidget, self).__init__(**config)
self.add_defaults(MediaWidget.defaults)
self.surfaces = {}
def cmd_update_custom_player(self, player_name, data):
# Update firefox player
if player_name == "firefox":
if data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PAUSED
self.custom_player_data['firefox']['title'] = data['title']
elif data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.PLAYING
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and data['muted']:
self.custom_player_data['firefox']['showing'] = True
self.custom_player_data['firefox']['state'] = MediaWidget.Status.STOPPED
self.custom_player_data['firefox']['title'] = data['title']
elif not data['playing'] and not data['muted']:
self.custom_player_data['firefox']['showing'] = False
self.custom_player_data['firefox']['state'] = MediaWidget.Status.OFFLINE
self.custom_player_data['firefox']['title'] = data['title']
def _get_players(self):
players = []
# Playerctl players
command = ["playerctl", "-l"]
result = self.call_process(command)
if result:
players.extend([x for x in result.split("\n") if x])
# Custom players - Firefox
if self.custom_player_data['firefox']['showing']:
players.append('firefox')
if players:
return players
else:
return None
def _get_info(self):
players = self._get_players()
if not players:
return {}
else:
result = {}
for player in players:
if player in self.custom_player_data.keys():
# Custom player -- Firefox
if player == "firefox":
result[player] = [self.custom_player_data['firefox']['state'], self.custom_player_data['firefox']['title']]
# Other custom players -- generic attempt with error catching
else:
try:
result[player] = [self.custom_player_data[player]['state'],
self.custom_player_data[player]['title']]
except KeyError:
pass
else:
# PlayerCtl player
command = ["playerctl", "-p", player, "status"]
cmd_result = self.call_process(command).strip()
text = "Unknown"
if cmd_result in ["Playing", "Paused"]:
artist = self.call_process(['playerctl', '-p', player, 'metadata', 'artist']).strip()
title = self.call_process(['playerctl', '-p', player, 'metadata', 'title']).strip()
if artist and title:
text = "{} - {}".format(artist, title)
elif artist:
text = artist
elif title:
text = title
if cmd_result == "Playing":
result[player] = [MediaWidget.Status.PLAYING, text]
elif cmd_result == "Paused":
result[player] = [MediaWidget.Status.PAUSED, text]
elif cmd_result == "Stopped":
result[player] = [MediaWidget.Status.STOPPED, ""]
return result
def _get_formatted_text(self, status):
if status[0] == MediaWidget.Status.PLAYING:
return self.on_text_play.format(status[1])
elif status[0] == MediaWidget.Status.PAUSED:
return self.on_text_pause.format(status[1])
elif status[0] == MediaWidget.Status.STOPPED:
return self.on_text_stop.format(status[1])
else:
return "Unknown"
def poll(self):
text = []
status = self._get_info()
if not status:
return self.off_text
else:
for player in status.keys():
icon = self.player_icons.get(player, player)
logger.warning([player, status[player]])
text.append("{} {}".format(icon, self._get_formatted_text(status[player])))
return " | ".join(text) if text else self.off_text
class AudioVisualizerWidget(_Graph):
"""Display Audio Visualization graph"""
orientations = base.ORIENTATION_HORIZONTAL
defaults = [
("audio_channel", "default", "Which audio channel to show"),
]
stream = None
fixed_upper_bound = True
def __init__(self, **config):
_Graph.__init__(self, **config)
self.add_defaults(AudioVisualizerWidget.defaults)
self.maxvalue = 100
self.samples = 1024
self.max_observed = 1
# initialize communication queue
self.q = Queue()
self.t = None
self.stream = None
self.tries = 0
def initialize_stream(self):
# initialize portaudio
p = pyaudio.PyAudio()
try:
self.stream = p.open(format=pyaudio.paInt16, channels=1, rate=44100, input=True, frames_per_buffer=self.samples)
# initialize thread
self.t = Thread(target=self.process, args=[self, self.q])
self.t.start()
except OSError as e:
logger.warning("Could not open audio stream: ".format(e))
self.tries += 1
@staticmethod
def process(widget: 'AudioVisualizerWidget', queue: Queue):
item = queue.get()
if widget.max_observed > 100:
widget.max_observed -= 100
# Discard all available frames
avail = widget.stream.get_read_available()
while avail > 1000:
_ = widget.stream.read(avail)
logger.debug("Discarded {} frames".format(avail))
avail = widget.stream.get_read_available()
if avail > 100:
data = widget.stream.read(widget.samples)
numpydata = numpy.abs(numpy.fromstring(data, dtype=numpy.int16))
if numpy.max(numpydata) > widget.max_observed:
widget.max_observed = numpy.max(numpydata)
numpydata = numpydata * (100 / widget.max_observed)
numpydata = AudioVisualizerWidget.window_rms(numpydata, 25)
widget.values = list(numpydata)
print(widget.values)
else:
widget.values = [0]*1024
@staticmethod
def window_rms(a, window_size):
a2 = numpy.power(a, 2)
window = numpy.ones(window_size) / float(window_size)
return numpy.sqrt(numpy.convolve(a2, window, 'valid'))
def update_graph(self):
if not self.stream and self.tries < 10:
self.initialize_stream()
else:
if self.q.empty():
self.q.put(True)
self.draw()
class KuroCurrentLayoutIcon(CurrentLayoutIcon):
def _get_layout_names(self):
names = super(KuroCurrentLayoutIcon, self)._get_layout_names()
from kuro.utils import layouts as kuro_layouts
from libqtile.layout.base import Layout
klayouts = [
layout_class_name.lower()
for layout_class, layout_class_name
in map(lambda x: (getattr(kuro_layouts, x), x), dir(kuro_layouts))
if isinstance(layout_class, six.class_types) and issubclass(layout_class, Layout)
]
names.extend(klayouts)
return list(set(names))