INIT!
This commit is contained in:
@@ -0,0 +1,92 @@
|
||||
#sway socket working for working the socket for sending commands
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
|
||||
ipcmag = b"i3-ipc"
|
||||
|
||||
@dataclass
|
||||
class Output:
|
||||
name: str
|
||||
focused: bool = False
|
||||
active: bool = False
|
||||
|
||||
@dataclass
|
||||
class Workspace:
|
||||
num: int
|
||||
output: str
|
||||
visible: bool = False
|
||||
|
||||
class Swayipc:
|
||||
def __init__(self):
|
||||
self.socket = None
|
||||
sock_path = None
|
||||
for var in ("SWAYSOCK", "I3SOCK"):
|
||||
sock_path = os.environ.get(var)
|
||||
if sock_path:
|
||||
break
|
||||
|
||||
if not sock_path:
|
||||
raise RuntimeError("No socket found!")
|
||||
|
||||
self._connect(sock_path)
|
||||
|
||||
def _connect(self, path):
|
||||
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.socket.connect(path)
|
||||
|
||||
def _send(self, cmd_type: int, payload: str):
|
||||
data = payload.encode()
|
||||
header = (
|
||||
ipcmag
|
||||
+ struct.pack("=I", len(data))
|
||||
+ struct.pack("=I", cmd_type)
|
||||
)
|
||||
self.socket.sendall(header + data)
|
||||
|
||||
def _read(self):
|
||||
header = b""
|
||||
while len(header) < 14:
|
||||
chunk = self.socket.recv(14 - len(header))
|
||||
if not chunk:
|
||||
raise RuntimeError("Socket closed")
|
||||
header += chunk
|
||||
|
||||
if header[:6] != ipcmag:
|
||||
raise RuntimeError("Bad ipc response")
|
||||
|
||||
length = struct.unpack("=I", header[6:10])[0]
|
||||
|
||||
payload = b""
|
||||
while len(payload) < length:
|
||||
chunk = self.socket.recv(length - len(payload))
|
||||
if not chunk:
|
||||
raise RuntimeError("Socket closed during message")
|
||||
payload += chunk
|
||||
|
||||
return payload.decode()
|
||||
|
||||
def cmd(self, command_string):
|
||||
try:
|
||||
self._send(0, command_string)
|
||||
resp = json.loads(self._read())
|
||||
if not resp[0].get("success", False):
|
||||
print(f"Sway ipc error (skipped): '{command_string}' -> {resp[0].get('error')}", file=sys.stderr)
|
||||
except Exception as e:
|
||||
print(f"Exception execute command '{command_string}': {e}", file=sys.stderr)
|
||||
|
||||
def outputs(self):
|
||||
self._send(3, "")
|
||||
data = json.loads(self._read())
|
||||
return [
|
||||
Output(name=o["name"], focused=o.get("focused", False), active=o.get("active", False))
|
||||
for o in data if o.get("active", False)
|
||||
]
|
||||
|
||||
def workspaces(self):
|
||||
self._send(1, "")
|
||||
data = json.loads(self._read())
|
||||
return [Workspace(num=w["num"], output=w["output"], visible=w.get("visible", False)) for w in data]
|
||||
Reference in New Issue
Block a user