# Minimal sway/i3 IPC client: connects to the compositor's UNIX socket,
# sends commands and queries the output and workspace lists.
import json
import os
import socket
import struct
import sys
from dataclasses import dataclass

# Magic byte string that prefixes every IPC message in both directions.
ipcmag = b"i3-ipc"

# Wire header: 6-byte magic, uint32 payload length, uint32 message type.
# "=" selects native byte order with no padding, so the header is 14 bytes.
_HEADER = struct.Struct("=6sII")

# Message type codes used by this client.
_MSG_RUN_COMMAND = 0
_MSG_GET_WORKSPACES = 1
_MSG_GET_OUTPUTS = 3


@dataclass
class Output:
    """An output as reported by the GET_OUTPUTS reply."""

    name: str
    focused: bool = False
    active: bool = False


@dataclass
class Workspace:
    """A workspace as reported by the GET_WORKSPACES reply."""

    num: int
    output: str
    visible: bool = False


class Swayipc:
    """Blocking client for the sway/i3 IPC socket.

    The socket path is taken from ``$SWAYSOCK`` and, failing that, from
    ``$I3SOCK``. The instance can be used as a context manager so that the
    socket is closed deterministically.
    """

    def __init__(self):
        """Locate the IPC socket through the environment and connect to it.

        Raises:
            RuntimeError: neither ``SWAYSOCK`` nor ``I3SOCK`` is set (or both
                are empty).
            OSError: connecting to the socket failed.
        """
        self.socket = None
        sock_path = None
        for var in ("SWAYSOCK", "I3SOCK"):
            sock_path = os.environ.get(var)
            if sock_path:
                break
        if not sock_path:
            raise RuntimeError("No socket found!")
        self._connect(sock_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self):
        """Close the IPC socket. Safe to call more than once."""
        sock = self.socket
        self.socket = None
        if sock is not None:
            sock.close()

    def _connect(self, path):
        """Open a UNIX stream socket to *path* and store it on the instance."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(path)
        except OSError:
            # Do not leak the descriptor when the connection attempt fails.
            sock.close()
            raise
        self.socket = sock

    def _send(self, cmd_type: int, payload: str):
        """Send one IPC message of type *cmd_type* carrying *payload*."""
        data = payload.encode()
        self.socket.sendall(_HEADER.pack(ipcmag, len(data), cmd_type) + data)

    def _recv_exact(self, size, closed_message):
        """Read exactly *size* bytes from the socket.

        Raises:
            RuntimeError: with *closed_message* if the peer closes the
                connection before *size* bytes have arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:
                raise RuntimeError(closed_message)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _read(self):
        """Read one IPC reply and return its payload decoded as text.

        The message type in the reply header is not checked.

        Raises:
            RuntimeError: the socket closed early or the magic prefix is wrong.
        """
        header = self._recv_exact(_HEADER.size, "Socket closed")
        magic, length, _msg_type = _HEADER.unpack(header)
        if magic != ipcmag:
            raise RuntimeError("Bad ipc response")
        return self._recv_exact(length, "Socket closed during message").decode()

    def cmd(self, command_string):
        """Run *command_string* and report failures on stderr.

        This is deliberately best-effort: any failure (transport, malformed
        reply, or a command rejected by the compositor) is printed to stderr
        and never raised. The reply holds one result per command contained in
        *command_string*, and every one of them is checked.
        """
        try:
            self._send(_MSG_RUN_COMMAND, command_string)
            reply = json.loads(self._read())
            results = reply if isinstance(reply, list) else [reply]
            for result in results:
                if not result.get("success", False):
                    print(f"Sway ipc error (skipped): '{command_string}' -> {result.get('error')}", file=sys.stderr)
        except Exception as e:
            print(f"Exception execute command '{command_string}': {e}", file=sys.stderr)

    def outputs(self) -> "list[Output]":
        """Return the active outputs; inactive ones are filtered out."""
        self._send(_MSG_GET_OUTPUTS, "")
        data = json.loads(self._read())
        return [
            Output(
                name=o["name"],
                focused=o.get("focused", False),
                active=o.get("active", False),
            )
            for o in data
            if o.get("active", False)
        ]

    def workspaces(self) -> "list[Workspace]":
        """Return every workspace reported by the compositor."""
        self._send(_MSG_GET_WORKSPACES, "")
        data = json.loads(self._read())
        return [
            Workspace(
                num=w["num"],
                output=w["output"],
                visible=w.get("visible", False),
            )
            for w in data
        ]