265 lines
7 KiB
Python
265 lines
7 KiB
Python
import re
|
|
import subprocess
|
|
import traceback
|
|
from time import sleep
|
|
|
|
import notify2
|
|
import six
|
|
from dbus import DBusException
|
|
from libqtile import widget
|
|
from libqtile.backend.x11.window import Internal
|
|
from libqtile.bar import Bar
|
|
from notify2 import Notification, URGENCY_NORMAL
|
|
from libqtile.log_utils import logger
|
|
|
|
try:
|
|
notify2.init("QTileWM")
|
|
except DBusException as e:
|
|
logger.error("Could not initialize notify2: {}".format(e))
|
|
|
|
BUTTON_LEFT = 1
|
|
BUTTON_MIDDLE = 2
|
|
BUTTON_RIGHT = 3
|
|
BUTTON_UP = 4
|
|
BUTTON_DOWN = 5
|
|
BUTTON_MUTE = 1
|
|
BUTTON_SCROLL_UP = 4
|
|
BUTTON_SCROLL_DOWN = 5
|
|
|
|
|
|
def is_running(process):
|
|
s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE)
|
|
if isinstance(process, list):
|
|
process = "".join(process)
|
|
for x in s.stdout:
|
|
if re.search(process, x.decode('utf-8')):
|
|
return True
|
|
return False
|
|
|
|
|
|
def execute(process):
|
|
if isinstance(process, list):
|
|
return subprocess.Popen(process)
|
|
elif isinstance(process, str):
|
|
return subprocess.Popen(process.split())
|
|
else:
|
|
pass
|
|
|
|
|
|
def execute_once(process):
|
|
if not is_running(process):
|
|
if isinstance(process, list):
|
|
return subprocess.Popen(process)
|
|
elif isinstance(process, str):
|
|
return subprocess.Popen(process.split())
|
|
else:
|
|
pass
|
|
|
|
|
|
def call_process(command, **kwargs):
|
|
"""
|
|
This method uses `subprocess.check_output` to run the given command
|
|
and return the string from stdout, which is decoded when using
|
|
Python 3.
|
|
"""
|
|
output = subprocess.check_output(command, **kwargs)
|
|
if six.PY3:
|
|
output = output.decode()
|
|
return output
|
|
|
|
|
|
def get_screen_count():
|
|
try:
|
|
output = subprocess.check_output("xrandr -q".split()).decode('utf-8')
|
|
output = [x for x in output.split("\n") if " connected" in x]
|
|
except subprocess.CalledProcessError:
|
|
return 1
|
|
|
|
if output:
|
|
return len(output)
|
|
else:
|
|
return 1
|
|
|
|
|
|
def bar_separator(config):
|
|
return widget.Sep(foreground=config.get('colour_spacer_background', '#777777'),
|
|
linewidth=config.get('width_spacer', 1),
|
|
padding=config.get('padding_spacer', 4),
|
|
)
|
|
|
|
|
|
def notify(title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
|
|
if image is not None:
|
|
notification = Notification(
|
|
summary=title, message=content,
|
|
icon=image
|
|
)
|
|
else:
|
|
notification = Notification(
|
|
summary=title, message=content
|
|
)
|
|
notification.set_timeout(timeout)
|
|
notification.set_urgency(urgency)
|
|
|
|
try:
|
|
return notification.show()
|
|
except notify2.UninittedError:
|
|
logger.warning("Notify2 was uninitialized, initializing...")
|
|
notify2.init("qtile")
|
|
return notification.show()
|
|
except DBusException as e:
|
|
logger.warning("Showing notification failed: {}".format(e))
|
|
logger.warning(traceback.format_exc())
|
|
|
|
|
|
def spawn_popup(qtile, x, y, text):
|
|
"""
|
|
:param qtile: The main qtile instance
|
|
:type qtile: Qtile
|
|
:param x: x-coordinate
|
|
:type x: int
|
|
:param y: y-coordinate
|
|
:type y: int
|
|
:param text: String to display
|
|
:type text: str
|
|
:return: The popup instance
|
|
:rtype: Internal
|
|
"""
|
|
popup = Internal.create(
|
|
qtile, x, y, 100, 100, opacity=1
|
|
)
|
|
|
|
# Create textwidget for in window
|
|
popup.bordercolor = "#000000"
|
|
popup.borderwidth = 1
|
|
|
|
popup.focus(False)
|
|
|
|
#popup.
|
|
|
|
return popup
|
|
|
|
|
|
def despawn_popup(popup):
|
|
"""
|
|
:type popup: Internal
|
|
:param popup: The popup to despawn
|
|
"""
|
|
popup.kill()
|
|
|
|
|
|
def test_popups(qtile):
|
|
popup = spawn_popup(qtile, 10, 10, "Hello World!")
|
|
sleep(3)
|
|
despawn_popup(popup)
|
|
|
|
|
|
def display_wm_class(qtile):
|
|
window = qtile.currentWindow if qtile else None
|
|
|
|
if window:
|
|
wm_class = window.window.get_wm_class() or None
|
|
name = window.name
|
|
|
|
if wm_class:
|
|
notify(title="WM_Class of {}".format(name),
|
|
content="{}".format(wm_class),
|
|
urgency=notify2.URGENCY_CRITICAL)
|
|
|
|
|
|
def bluetooth_audio_sink():
|
|
try:
|
|
output = subprocess.check_output("pamixer --list-sinks".split()).decode("utf-8")
|
|
output = [x for x in output.split('\n') if "blue" in x.lower()]
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
return -1
|
|
|
|
sink = -1
|
|
try:
|
|
sink = int(output[0].split()[0])
|
|
except IndexError:
|
|
pass
|
|
except AttributeError:
|
|
pass
|
|
except ValueError:
|
|
pass
|
|
|
|
return sink
|
|
|
|
|
|
def bluetooth_audio_connected():
|
|
return bluetooth_audio_sink() != -1
|
|
|
|
|
|
class KuroTopBar(Bar):
|
|
def __init__(self, theme, widgets, size, **config):
|
|
self.theme = theme
|
|
super(KuroTopBar, self).__init__(widgets, size, **config)
|
|
|
|
def _configure(self, qtile, screen, *args, **kwargs):
|
|
super(KuroTopBar, self)._configure(qtile, screen)
|
|
self.window.handle_EnterNotify = self.handle_enter_notify
|
|
self.window.handle_LeaveNotify = self.handle_leave_notify
|
|
self.window.window.set_property("_NET_WM_NAME", "KuroTopBar")
|
|
self.window.update_name()
|
|
|
|
def draw(self):
|
|
if not self.widgets:
|
|
return
|
|
if not self._draw_queued:
|
|
self.future = self.qtile.call_soon(self._actual_draw)
|
|
self._draw_queued = True
|
|
|
|
def _actual_draw(self):
|
|
self._draw_queued = False
|
|
self._resize(self._length, self.widgets)
|
|
for i in self.widgets:
|
|
i.draw()
|
|
if self.widgets:
|
|
end = i.offset + i.length
|
|
if end < self._length:
|
|
if self.horizontal:
|
|
self.drawer.draw(offsetx=end, width=self._length - end)
|
|
else:
|
|
self.drawer.draw(offsety=end, height=self._length - end)
|
|
|
|
self.theme.update_visualizers()
|
|
|
|
def handle_enter_notify(self, e):
|
|
# self.theme.log_debug("Bar HandleEnterNotify")
|
|
#
|
|
# self.window.opacity = Config.get('bar_hover_opacity', 1.0)
|
|
# print("Bar Hover Enter")
|
|
#
|
|
# try:
|
|
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
|
|
# except IndexError:
|
|
# hovered_widget = None
|
|
#
|
|
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
|
|
#
|
|
# if hasattr(hovered_widget, "handle_hover_enter"):
|
|
# hovered_widget.handle_hover_enter(e)
|
|
|
|
self.draw()
|
|
|
|
def handle_leave_notify(self, e):
|
|
# self.theme.log_debug("Bar HandleLeaveNotify")
|
|
#
|
|
# self.window.opacity = Config.get('bar_opacity', 1.0)
|
|
# print("Bar Hover Leave")
|
|
#
|
|
# try:
|
|
# hovered_widget = [x for x in self.widgets if (x.offsetx + x.width) >= e.event_x][0]
|
|
# except IndexError:
|
|
# hovered_widget = None
|
|
#
|
|
# self.theme.log_debug("Hovered over {}".format(hovered_widget))
|
|
#
|
|
# if hasattr(hovered_widget, "handle_hover_leave"):
|
|
# hovered_widget.handle_hover_leave(e)
|
|
|
|
self.draw()
|
|
|
|
|