From ca42b7bf2067a1a5033559fde6c60aee40a24ef5 Mon Sep 17 00:00:00 2001 From: GiggioG Date: Sun, 7 Sep 2025 02:27:18 +0300 Subject: [PATCH] make a repo, commit library --- .gitignore | 5 ++ src/commands.py | 167 +++++++++++++++++++++++++++++++++++++++++++++ src/connection.py | 118 ++++++++++++++++++++++++++++++++ src/decode.py | 93 +++++++++++++++++++++++++ src/main.py | 34 +++++++++ src/textToImage.py | 59 ++++++++++++++++ 6 files changed, 476 insertions(+) create mode 100644 .gitignore create mode 100644 src/commands.py create mode 100644 src/connection.py create mode 100644 src/decode.py create mode 100644 src/main.py create mode 100644 src/textToImage.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5949ad7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +./investigate/data +./investigate/decodedImg +./investigate/testImages +./investigate/dataBin +./investigate/dataImg \ No newline at end of file diff --git a/src/commands.py b/src/commands.py new file mode 100644 index 0000000..423a6fe --- /dev/null +++ b/src/commands.py @@ -0,0 +1,167 @@ +from PIL import Image; + +def checkImage(imgPath): + img = Image.open(imgPath).convert("RGBA") + if img.size[0] != 384: + return False # raise ValueError("Width must be 384 pixels") + for y in range(img.size[1]): + for x in range(img.size[0]): + r, g, b, a = img.getpixel((x, y)) + if a != 255 or (r != 0 and r != 255) or (g != 0 and g != 255) or (b != 0 and b != 255): + return False # raise TypeError("Image must be fully opaque and black and white") + return True + +def calcCrc8(data): + crcTable = [0x00, 0x07, 0x0e, 0x09, 0x1c, 0x1b, 0x12, 0x15, 0x38, 0x3f, 0x36, 0x31, 0x24, 0x23, 0x2a, 0x2d, 0x70, 0x77, 0x7e, 0x79, 0x6c, 0x6b, 0x62, 0x65, 0x48, 0x4f, 0x46, 0x41, 0x54, 0x53, 0x5a, 0x5d, 0xe0, 0xe7, 0xee, 0xe9, 0xfc, 0xfb, 0xf2, 0xf5, 0xd8, 0xdf, 0xd6, 0xd1, 0xc4, 0xc3, 0xca, 0xcd, 0x90, 0x97, 0x9e, 0x99, 0x8c, 0x8b, 0x82, 0x85, 0xa8, 0xaf, 0xa6, 0xa1, 0xb4, 0xb3, 0xba, 0xbd, 0xc7, 0xc0, 0xc9, 0xce, 0xdb, 0xdc, 0xd5, 0xd2, 0xff, 0xf8, 0xf1, 0xf6, 0xe3, 0xe4, 0xed, 0xea, 0xb7, 0xb0, 0xb9, 0xbe, 0xab, 0xac, 0xa5, 0xa2, 0x8f, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9d, 0x9a, 0x27, 0x20, 0x29, 0x2e, 0x3b, 0x3c, 0x35, 0x32, 0x1f, 0x18, 0x11, 0x16, 0x03, 0x04, 0x0d, 0x0a, 0x57, 0x50, 0x59, 0x5e, 0x4b, 0x4c, 0x45, 0x42, 0x6f, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7d, 0x7a, 0x89, 0x8e, 0x87, 0x80, 0x95, 0x92, 0x9b, 0x9c, 0xb1, 0xb6, 0xbf, 0xb8, 0xad, 0xaa, 0xa3, 0xa4, 0xf9, 0xfe, 0xf7, 0xf0, 0xe5, 0xe2, 0xeb, 0xec, 0xc1, 0xc6, 0xcf, 0xc8, 0xdd, 0xda, 0xd3, 0xd4, 0x69, 0x6e, 0x67, 0x60, 0x75, 0x72, 0x7b, 0x7c, 0x51, 0x56, 0x5f, 0x58, 0x4d, 0x4a, 0x43, 0x44, 0x19, 0x1e, 0x17, 0x10, 0x05, 0x02, 0x0b, 0x0c, 0x21, 0x26, 0x2f, 0x28, 0x3d, 0x3a, 0x33, 0x34, 0x4e, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5c, 0x5b, 0x76, 0x71, 0x78, 0x7f, 0x6a, 0x6d, 0x64, 0x63, 0x3e, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2c, 0x2b, 0x06, 0x01, 0x08, 0x0f, 0x1a, 0x1d, 0x14, 0x13, 0xae, 0xa9, 0xa0, 0xa7, 0xb2, 0xb5, 0xbc, 0xbb, 0x96, 0x91, 0x98, 0x9f, 0x8a, 0x8d, 0x84, 0x83, 0xde, 0xd9, 0xd0, 0xd7, 0xc2, 0xc5, 0xcc, 0xcb, 0xe6, 0xe1, 0xe8, 0xef, 0xfa, 0xfd, 0xf4, 0xf3] + byte = 0 + for b in data: + byte = (byte ^ b) & 0xff + byte = crcTable[byte] + return byte + +def commandBytes(cmd, data): + dataLen = len(data) + dataLenLow = dataLen & 0xff + dataLenHigh = dataLen & 0xff00 + return bytes([0x51, 0x78, cmd, 0x00, dataLenLow, dataLenHigh] + data + [calcCrc8(data), 0xFF]) + +def getCommandCode(cmdName): + if cmdName == "BLACKENING" or cmdName == "QUALITY": + return 0xA4 + elif cmdName == "ENERGY" or cmdName == "ENERAGY": + return 0xAF + elif cmdName == "PRINT_TYPE": + return 0xBE + elif cmdName == "FEED_PAPER": + return 0xBD + elif cmdName == "DRAW_COMPRESSED": + return 0xBF + elif cmdName == "DRAW_PACKED": + return 0xA2 + elif cmdName == "PAPER": + return 0xA1 + else: + raise ValueError("Unsupported operation") + +def runLength(line): + encoded = [] + def encodeRun(curr, run): + while run > 127: + encoded.append((curr << 7) | 127) + run -= 127 + if run > 0: + encoded.append((curr << 7) | run) + run = 0 + curr = None + for p in line: + if p == curr: + run += 1 + else: + encodeRun(curr, run) + curr = p + run = 1 + encodeRun(curr, run) + + return encoded + +def bitPack(line): + encoded = [] + for p in range(0, len(line), 8): + byte = 0 + for i in range(8): + byte <<= 1 + byte |= line[p+7-i] + encoded.append(byte) + return encoded + +def toArrayLE(num): # to low-endian array + return [num & 0xff, (num >> 8) & 0xff] + +def getEnergy(printDepth): + if printDepth <= 0 or printDepth > 7: + raise ValueError("Invalid print depth") + return int(7500 + (printDepth - 4)*0.15*7500) + +def getTypeByte(typeStr): + if typeStr == "IMAGE": + return 0x00 + elif typeStr == "TEXT": + return 0x01 + elif typeStr == "LABEL": + return 0x03 + else: + raise ValueError("No such print type") + +def getSpeed(typeStr): + if typeStr == "IMAGE": + return 30 + elif typeStr == "TEXT": + return 10 + else: + raise ValueError("No such print speed type") + +def readImage(imgPath): + if not checkImage(imgPath): + raise ValueError("Image is not of width 384 and 1bpp") + img = Image.open(imgPath).convert("1") + return img + +def compressImageToCmds(img): + commands = [] + for y in range(img.size[1]): + line = [img.getpixel((i, y))==False for i in range(img.size[0])] + runs = runLength(line) + bits = bitPack(line) + cmd = [] + if len(runs) < len(bits): + cmd = commandBytes(getCommandCode("DRAW_COMPRESSED"), runs) + else: + cmd = commandBytes(getCommandCode("DRAW_PACKED"), bits) + commands.append(cmd) + return commands + +def saveCommandsToFile(commands, saveToFile=None, saveToFileHuman=None): + if saveToFile: + allCmds = b"".join(commands) + with open(saveToFile, "wb") as f: + f.write(allCmds) + if saveToFileHuman: + humanCmds = [" ".join(hex(b)[2:4].zfill(2).upper() for b in c) for c in commands] + with open(saveToFileHuman, "w") as f: + f.write("\n".join(humanCmds)) + +def genMoveUp(lines=0x60): + cmds = [] + cmds.append(commandBytes(getCommandCode("FEED_PAPER"), [lines])) + # The app for some reason splits it up as two commands with 0x30, thus so do I + while lines > 0x30: + lines -= 0x30 + cmds.append(commandBytes(getCommandCode("PAPER"), [0x30, 0x00])) + if lines > 0: + cmds.append(commandBytes(getCommandCode("PAPER"), [lines, 0x00])) + cmds.append(commandBytes(getCommandCode("FEED_PAPER"), [lines])) + return cmds + +def genCommands(img, type="IMAGE", energy=None, speed=None, moveUpAfter=True, saveToFile=None, saveToFileHuman=None): + if energy == None: + energy = getEnergy(4) if type != "TEXT" else 0 + if speed == None: + speed = getSpeed(type if type != "LABEL" else "IMAGE") + + typeByte = getTypeByte(type) + + commands = [] + commands.append(commandBytes(getCommandCode("BLACKENING"), [0x33])) + if energy > 0: + commands.append(commandBytes(getCommandCode("ENERGY"), toArrayLE(energy))) + commands.append(commandBytes(getCommandCode("PRINT_TYPE"), [typeByte])) + commands.append(commandBytes(getCommandCode("FEED_PAPER"), [speed])) + + commands.extend(compressImageToCmds(img)) + + if moveUpAfter: + commands.extend(genMoveUp()) + + saveCommandsToFile(commands, saveToFile, saveToFileHuman) + + return commands \ No newline at end of file diff --git a/src/connection.py b/src/connection.py new file mode 100644 index 0000000..8c479e6 --- /dev/null +++ b/src/connection.py @@ -0,0 +1,118 @@ +from bleak import BleakClient; +import asyncio +import sys + +class Connection: + _RX_CHAR = "0000AE02-0000-1000-8000-00805F9B34FB" + _TX_CHAR = "0000AE01-0000-1000-8000-00805F9B34FB" + _GET_DEV_STATE = b"\x51\x78\xA3\x00\x01\x00\x00\x00\xFF" + + def _charNotify(self, char, notify): + if char.uuid == Connection._RX_CHAR.lower() and notify[0] == 0x51 and notify[1] == 0x78 and notify[2] == 0xA3: + statusByte = bin(notify[6]) + status = None + if notify[6] == 0x00: + status = "OKAY" + elif statusByte.endswith("1"): + status = "OUT_OF_PAPER" + elif statusByte.endswith("10"): + status = "COMPARTMENT_OPEN" + elif statusByte.endswith("100"): + status = "OVERHEATED" + elif statusByte.endswith("1000"): + status = "LOW_BATTERY" + elif statusByte.endswith("10000"): + status = "CHARGING" + elif statusByte.endswith("10000000"): + status = "PRINTING" + + print(f"Device status: {status}") + + assert status in ["OKAY", "LOW_BATTERY", "CHAR"] + self.notifyStatusEvent.set() # Signal that notification was received + + async def _waitForReady(self): + if not self.client or not self.client.is_connected: + raise RuntimeError("Client not connected") + print("Waiting for device status") + await self.client.write_gatt_char(Connection._TX_CHAR, Connection._GET_DEV_STATE) + await self.notifyStatusEvent.wait() + self.notifyStatusEvent.clear() + + async def connect(self): + if self.client and self.client.is_connected: + raise RuntimeError("Client already connected") + self.client = BleakClient(self.macAddress) + await self.client.connect() + self.client.connection = self + await self.client.start_notify(Connection._RX_CHAR, self._charNotify) + await self._waitForReady() + + async def disconnect(self): + if not self.client or not self.client.is_connected: + raise RuntimeError("Client not connected") + await self.client.disconnect() + + def __init__(self, macAddress): + self.macAddress = macAddress + self.notifyStatusEvent = asyncio.Event() + self.client = None + + async def __aenter__(self): + if not self.client or not self.client.is_connected: + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self.disconnect() + + async def send(self, commands, delay=0): + if not self.client or not self.client.is_connected: + raise RuntimeError("Client not connected") + print("Sending commands...") + for cmd in commands: + await self.client.write_gatt_char(Connection._TX_CHAR, cmd) + print("Sending " + hex(cmd[2])) + if delay > 0: + await asyncio.sleep(delay) + print("Finished sending commands") + await self._waitForReady() + +class FakeConnection: + async def connect(self): + if self.file and not self.file.is_open.closed: + raise RuntimeError("File already opened") + self.file = open(self.logFilePath, "ab") + if self.humanLogFilePath: + self.humanFile = open(self.humanLogFilePath, "a") + + async def disconnect(self): + if not self.file or self.file.closed: + raise RuntimeError("File already closed") + self.file.close() + if self.humanFile: + self.humanFile.close() + + def __init__(self, logFilePath, humanLogFilePath=None): + self.logFilePath = logFilePath + self.humanLogFilePath = humanLogFilePath + self.file = None + self.humanFile = None + + async def __aenter__(self): + if not self.file or self.client.closed: + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self.disconnect() + + async def send(self, commands, delay=0): + if not self.file or self.file.closed: + raise RuntimeError("File not opened") + print("Sending commands...") + for cmd in commands: + self.file.write(cmd) + if self.humanFile: + self.humanFile.write(" ".join([hex(c)[2:].zfill(2).upper() for c in cmd]) + "\n") + print("Finished sending commands") diff --git a/src/decode.py b/src/decode.py new file mode 100644 index 0000000..be0ada8 --- /dev/null +++ b/src/decode.py @@ -0,0 +1,93 @@ +import os +from PIL import Image +import sys +import glob + +def readHumanFile(fileName): + with open(fileName, "r") as file: + data = file.read().split("\n") + data = [d.strip() for d in data if len(d) > 0] + cmdBytes = [] + for d in data: + cmdBytes.extend(bytes.fromhex(d)) + return cmdBytes + +def readFile(fileName): + with open(fileName, "rb") as file: + cmdBytes = [file.read()] + return cmdBytes + +def decodeCommands(cmdBytes, saveFilename=None): + cmds = [] + + idx = -1 + data = [] + cmdCode = 0x00 + dataLen = 0 + crc = 0x00 + for i, b in enumerate(cmdBytes): + idx += 1 + #print(i, idx, hex(b)) + if b == 0x51 and idx == 0: + pass + elif b == 0x78 and idx == 1: + pass + elif idx == 2: + cmdCode = b + elif b == 0x00 and idx == 3: + pass + elif idx == 4: + dataLen = b + elif idx == 5: + if b != 0: + dataLen = dataLen*0x100 + b + elif idx >= 6 and idx < 6+dataLen: + data.append(b) + elif idx == 6+dataLen: + crc = b + elif idx == 6+dataLen+1 and b == 0xFF: + o = { + "cmd": cmdCode, + "data": data, + "crc": crc + } + cmds.append(o) + idx = -1 + crc = 0 + data = [] + cmdCode = 0 + else: + idx = -1 + crc = 0 + data = [] + cmdCode = 0 + #print(cmds) + #for c in cmds: + # print(hex(c["cmd"]), hex(len(c["data"])), hex(c["crc"])) + + rows = [] + for c in cmds: + if c["cmd"] == 0xa2: + s = "".join([bin(b)[2:10].zfill(8)[::-1] for b in c["data"]]) + row = [int(x)==1 for x in s] + rows.append(row) + elif c["cmd"] == 0xbf: + row = [] + for b in c["data"]: + bit = (b & (1<<7)) == 128 + times = b & ~(1<<7) + row.extend([bit] * times) + rows.append(row) + + HEIGHT = len(rows) + WIDTH = max(len(r) for r in rows) + + img = Image.new("1", (WIDTH, HEIGHT)) + for y in range(HEIGHT): + for x in range(WIDTH): + img.putpixel((x,y), not rows[y][x]) + + if saveFilename != None: + img.save(saveFilename) + + return img \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..ac4d214 --- /dev/null +++ b/src/main.py @@ -0,0 +1,34 @@ +import asyncio +from import datetime import datetime + +from commands import * +from connection import Connection, FakeConnection; +from textToImage import textToImage; + +def getDate(): + return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + +async def beginLog(conn): + img = textToImage(f"BEGIN LOG @ [{getDate()}]", fontSize=16, align="center", bold=True) + cmds = genCommands(img, energy=getEnergy(7), moveUpAfter=False) + await conn.send(cmds) + +async def printToLog(conn, text): + img = textToImage(f"[{getDate()}] {text}") + cmds = genCommands(img, energy=getEnergy(7), moveUpAfter=False) + await conn.send(cmds) + +async def finishLog(conn): + img = textToImage(f"END LOG @ [{getDate()}]", fontSize=16, align="center", bold=True) + cmds = genCommands(img, energy=getEnergy(7), moveUpAfter=True) + await conn.send(cmds) + +async def main(): + async with Connection("AA:BB:CC:DD:EE:FF") as conn: + await beginLog(conn) + for i in range(3): + await printToLog(conn, f"Message in log #{i+1}") + await finishLog(conn) + +loop = asyncio.get_event_loop() +loop.run_until_complete(main()) \ No newline at end of file diff --git a/src/textToImage.py b/src/textToImage.py new file mode 100644 index 0000000..62a63e0 --- /dev/null +++ b/src/textToImage.py @@ -0,0 +1,59 @@ +from PIL import Image, ImageDraw, ImageFont +import re + +def textToImage(text, fontSize=12, fontFile="DejaVuSans.ttf", align="left", lineSpacing=2, bold=False, whiteOnBlack=False, saveToFile=None): + font = ImageFont.truetype(fontFile, fontSize) + + WIDTH = 384 + FILL = 0 if not whiteOnBlack else 1 + BACKGROUND = 1 if not whiteOnBlack else 0 + + linesRaw = text.split('\n') + # wrap lines + lines = [] + for line in linesRaw: + while font.getlength(line, mode="1") > WIDTH: # binary search + spaces = [0] + [m.start() for m in re.finditer(' ', line)] + left = 0 + right = len(spaces)-1 + while left < right-1: + mid = (left + right) // 2 + if font.getlength(line[:spaces[mid]], mode="1") > WIDTH: + right = mid-1 + else: + left = mid + splitLine = line[:spaces[left]] + bbox = font.getbbox(splitLine) + lines.append((splitLine, bbox)) + line = line[spaces[left]+1:] + if len(line) > 0: + lines.append((line, font.getbbox(line))) + + # draw image + x = None + anchor = None + match align: + case "left": + x = 0 + anchor = "la" + case "center": + x = WIDTH//2 + anchor = "ma" + case "right": + x = WIDTH + anchor = "ra" + case _: + raise ValueError("Invalid align value") + + img_height = sum(bbox[3]-bbox[1] for (_line, bbox) in lines) + (len(lines)-1)*lineSpacing + img = Image.new('1', (WIDTH, img_height), BACKGROUND) + draw = ImageDraw.Draw(img) + y = 0 + for (line, bbox) in lines: + draw.text((x, y-bbox[1]), line, font=font, fill=FILL, align=align, anchor=anchor, stroke_width=bold) + y += bbox[3]-bbox[1] + lineSpacing + + if saveToFile: + img.save(saveToFile) + + return img \ No newline at end of file