226 lines
		
	
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			226 lines
		
	
	
	
		
			6.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import re
 | 
						|
import subprocess
 | 
						|
import traceback
 | 
						|
from time import sleep
 | 
						|
from typing import List
 | 
						|
 | 
						|
import notify2
 | 
						|
from dbus import DBusException
 | 
						|
from libqtile import widget
 | 
						|
from notify2 import Notification, URGENCY_NORMAL
 | 
						|
from libqtile.log_utils import logger
 | 
						|
from libqtile import qtile
 | 
						|
 | 
						|
BUTTON_LEFT = 1
 | 
						|
BUTTON_MIDDLE = 2
 | 
						|
BUTTON_RIGHT = 3
 | 
						|
BUTTON_UP = 4
 | 
						|
BUTTON_DOWN = 5
 | 
						|
BUTTON_MUTE = 1
 | 
						|
BUTTON_SCROLL_UP = 4
 | 
						|
BUTTON_SCROLL_DOWN = 5
 | 
						|
 | 
						|
 | 
						|
def is_running(process):
 | 
						|
    s = subprocess.Popen(["ps", "axuw"], stdout=subprocess.PIPE)
 | 
						|
    if isinstance(process, list):
 | 
						|
        process = "".join(process)
 | 
						|
    for x in s.stdout:
 | 
						|
        if re.search(process, x.decode('utf-8')):
 | 
						|
            return True
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
def execute(process):
 | 
						|
    try:
 | 
						|
        if isinstance(process, list):
 | 
						|
            return subprocess.Popen(process)
 | 
						|
        elif isinstance(process, str):
 | 
						|
            return subprocess.Popen(process.split())
 | 
						|
        else:
 | 
						|
            logger.info(f"Failed to execute_once")
 | 
						|
    except FileNotFoundError as e:
 | 
						|
        logger.error(f"Could not execute {process}, FileNotFoundError - {e}")
 | 
						|
 | 
						|
 | 
						|
def execute_once(process):
 | 
						|
    logger.info(f"Attempting to execute_once: {process}")
 | 
						|
    if not is_running(process):
 | 
						|
        return execute(process)
 | 
						|
    logger.info(f"Process was already running: {process}")
 | 
						|
 | 
						|
 | 
						|
def start_in_group(theme, qtile, group: str, command: List[str], floating: bool = False,
 | 
						|
                   intrusive: bool = False, dont_break: bool = False):
 | 
						|
    try:
 | 
						|
        proc = subprocess.Popen(command)
 | 
						|
        match_args = {"net_wm_pid": proc.pid}
 | 
						|
        rule_args = {
 | 
						|
            "float": floating,
 | 
						|
            "intrusive": intrusive,
 | 
						|
            "group": group,
 | 
						|
            "break_on_match": not dont_break,
 | 
						|
        }
 | 
						|
        rule_id = qtile.add_rule(match_args, rule_args)
 | 
						|
        theme.autostart_app_rules[proc.pid] = rule_id
 | 
						|
        return proc
 | 
						|
    except FileNotFoundError as e:
 | 
						|
        logger.error(f"Could not execute {command}, FileNotFoundError - {e}")
 | 
						|
 | 
						|
 | 
						|
def start_in_group_once(theme, qtile, group: str, command: List[str], floating: bool = False,
 | 
						|
                        intrusive: bool = False, dont_break: bool = False):
 | 
						|
    logger.info(f"Attempting to start_in_group_once: {command}")
 | 
						|
    if not is_running(command):
 | 
						|
        return start_in_group(theme=theme, qtile=qtile, group=group, command=command,
 | 
						|
                              floating=floating, intrusive=intrusive, dont_break=dont_break)
 | 
						|
    logger.info(f"Process was already running: {command}")
 | 
						|
 | 
						|
 | 
						|
def call_process(command, **kwargs):
 | 
						|
    """
 | 
						|
    Run the given command and return the string from stdout.
 | 
						|
    """
 | 
						|
    return subprocess.check_output(command, **kwargs).decode()
 | 
						|
 | 
						|
 | 
						|
def get_screen_count():
 | 
						|
    try:
 | 
						|
        if qtile.core.name == "x11":
 | 
						|
            logger.info("Using xrandr to detect screen count")
 | 
						|
            output = subprocess.check_output("xrandr -q".split()).decode('utf-8')
 | 
						|
            output = [x for x in output.split("\n") if " connected" in x]
 | 
						|
            return max(1, len(output))
 | 
						|
        else:
 | 
						|
            return max(1, len(qtile.core.get_screen_info()))
 | 
						|
    except subprocess.CalledProcessError:
 | 
						|
        pass
 | 
						|
    return 1
 | 
						|
 | 
						|
 | 
						|
def bar_separator(config):
 | 
						|
    return widget.Sep(foreground=config.get('colour_spacer_background', '#777777'),
 | 
						|
                      linewidth=config.get('width_spacer', 1),
 | 
						|
                      padding=config.get('padding_spacer', 4),
 | 
						|
                      )
 | 
						|
 | 
						|
def init_notify(qtile):
 | 
						|
    #if qtile and qtile.theme_instance and qtile.theme_instance.startup_completed:
 | 
						|
    try:
 | 
						|
        if not notify2.is_initted():
 | 
						|
            logger.warning("Initializing Notify2")
 | 
						|
            notify2.init("QTileWM")
 | 
						|
    except DBusException:
 | 
						|
        logger.error(f"Failed to initialize Notify2 (DBus error), retrying later.")
 | 
						|
    except Exception:
 | 
						|
        logger.error(f"Failed to initialize Notify2 (Generic error), retrying later.")
 | 
						|
    #else:
 | 
						|
    #    logger.warning(f"Not initializing Notify2 yet, QTile startup not completed.")
 | 
						|
 | 
						|
 | 
						|
def notify(qtile, title, content, urgency=URGENCY_NORMAL, timeout=5000, image=None):
 | 
						|
    if image is not None:
 | 
						|
        notification = Notification(
 | 
						|
            summary=title, message=content,
 | 
						|
            icon=image
 | 
						|
        )
 | 
						|
    else:
 | 
						|
        notification = Notification(
 | 
						|
            summary=title, message=content
 | 
						|
        )
 | 
						|
    notification.set_timeout(timeout)
 | 
						|
    notification.set_urgency(urgency)
 | 
						|
 | 
						|
    init_notify(qtile)
 | 
						|
 | 
						|
    try:
 | 
						|
        try:
 | 
						|
            return notification.show()
 | 
						|
        except notify2.UninittedError:
 | 
						|
            logger.warning("Notify2 is not initialized")
 | 
						|
    except Exception as e:
 | 
						|
        logger.warning("Showing notification failed: {}".format(e))
 | 
						|
        logger.warning(traceback.format_exc())
 | 
						|
 | 
						|
 | 
						|
def spawn_popup(qtile, x, y, text):
 | 
						|
    """
 | 
						|
    :param qtile: The main qtile instance
 | 
						|
    :type qtile: Qtile
 | 
						|
    :param x: x-coordinate
 | 
						|
    :type x: int
 | 
						|
    :param y: y-coordinate
 | 
						|
    :type y: int
 | 
						|
    :param text: String to display
 | 
						|
    :type text: str
 | 
						|
    :return: The popup instance
 | 
						|
    :rtype: Internal
 | 
						|
    """
 | 
						|
    if qtile.core.name == "x11":
 | 
						|
        from libqtile.backend.x11.window import Internal
 | 
						|
    else:
 | 
						|
        from libqtile.backend.wayland.window import Internal
 | 
						|
    popup = Internal.create(
 | 
						|
        qtile, x, y, 100, 100, opacity=1
 | 
						|
    )
 | 
						|
 | 
						|
    # Create textwidget for in window
 | 
						|
    popup.bordercolor = "#000000"
 | 
						|
    popup.borderwidth = 1
 | 
						|
 | 
						|
    popup.focus(False)
 | 
						|
 | 
						|
    #popup.
 | 
						|
 | 
						|
    return popup
 | 
						|
 | 
						|
 | 
						|
def despawn_popup(popup):
 | 
						|
    """
 | 
						|
    :type popup: Internal
 | 
						|
    :param popup: The popup to despawn
 | 
						|
    """
 | 
						|
    popup.kill()
 | 
						|
 | 
						|
 | 
						|
def test_popups(qtile):
 | 
						|
    popup = spawn_popup(qtile, 10, 10, "Hello World!")
 | 
						|
    sleep(3)
 | 
						|
    despawn_popup(popup)
 | 
						|
 | 
						|
 | 
						|
def display_wm_class(qtile):
 | 
						|
    window = qtile.current_window if qtile else None
 | 
						|
 | 
						|
    if window:
 | 
						|
        wm_class = window.get_wm_class() or None
 | 
						|
        name = window.name
 | 
						|
 | 
						|
        if wm_class:
 | 
						|
            notify(qtile=qtile, title="WM_Class of {}".format(name),
 | 
						|
                   content="{}".format(wm_class),
 | 
						|
                   urgency=notify2.URGENCY_CRITICAL)
 | 
						|
 | 
						|
 | 
						|
def bluetooth_audio_sink():
 | 
						|
    try:
 | 
						|
        output = subprocess.check_output("pamixer --list-sinks".split()).decode("utf-8")
 | 
						|
        output = [x for x in output.split('\n') if "blue" in x.lower()]
 | 
						|
    except (subprocess.CalledProcessError, FileNotFoundError):
 | 
						|
        return -1
 | 
						|
 | 
						|
    sink = -1
 | 
						|
    try:
 | 
						|
        sink = int(output[0].split()[0])
 | 
						|
    except IndexError:
 | 
						|
        pass
 | 
						|
    except AttributeError:
 | 
						|
        pass
 | 
						|
    except ValueError:
 | 
						|
        pass
 | 
						|
 | 
						|
    return sink
 | 
						|
 | 
						|
 | 
						|
def bluetooth_audio_connected():
 | 
						|
    return bluetooth_audio_sink() != -1
 |