make a repo, commit library

This commit is contained in:
GiggioG 2025-09-07 02:27:18 +03:00
commit ca42b7bf20
6 changed files with 476 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
./investigate/data
./investigate/decodedImg
./investigate/testImages
./investigate/dataBin
./investigate/dataImg

167
src/commands.py Normal file
View File

@ -0,0 +1,167 @@
from PIL import Image;
def checkImage(imgPath):
img = Image.open(imgPath).convert("RGBA")
if img.size[0] != 384:
return False # raise ValueError("Width must be 384 pixels")
for y in range(img.size[1]):
for x in range(img.size[0]):
r, g, b, a = img.getpixel((x, y))
if a != 255 or (r != 0 and r != 255) or (g != 0 and g != 255) or (b != 0 and b != 255):
return False # raise TypeError("Image must be fully opaque and black and white")
return True
def calcCrc8(data):
crcTable = [0x00, 0x07, 0x0e, 0x09, 0x1c, 0x1b, 0x12, 0x15, 0x38, 0x3f, 0x36, 0x31, 0x24, 0x23, 0x2a, 0x2d, 0x70, 0x77, 0x7e, 0x79, 0x6c, 0x6b, 0x62, 0x65, 0x48, 0x4f, 0x46, 0x41, 0x54, 0x53, 0x5a, 0x5d, 0xe0, 0xe7, 0xee, 0xe9, 0xfc, 0xfb, 0xf2, 0xf5, 0xd8, 0xdf, 0xd6, 0xd1, 0xc4, 0xc3, 0xca, 0xcd, 0x90, 0x97, 0x9e, 0x99, 0x8c, 0x8b, 0x82, 0x85, 0xa8, 0xaf, 0xa6, 0xa1, 0xb4, 0xb3, 0xba, 0xbd, 0xc7, 0xc0, 0xc9, 0xce, 0xdb, 0xdc, 0xd5, 0xd2, 0xff, 0xf8, 0xf1, 0xf6, 0xe3, 0xe4, 0xed, 0xea, 0xb7, 0xb0, 0xb9, 0xbe, 0xab, 0xac, 0xa5, 0xa2, 0x8f, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9d, 0x9a, 0x27, 0x20, 0x29, 0x2e, 0x3b, 0x3c, 0x35, 0x32, 0x1f, 0x18, 0x11, 0x16, 0x03, 0x04, 0x0d, 0x0a, 0x57, 0x50, 0x59, 0x5e, 0x4b, 0x4c, 0x45, 0x42, 0x6f, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7d, 0x7a, 0x89, 0x8e, 0x87, 0x80, 0x95, 0x92, 0x9b, 0x9c, 0xb1, 0xb6, 0xbf, 0xb8, 0xad, 0xaa, 0xa3, 0xa4, 0xf9, 0xfe, 0xf7, 0xf0, 0xe5, 0xe2, 0xeb, 0xec, 0xc1, 0xc6, 0xcf, 0xc8, 0xdd, 0xda, 0xd3, 0xd4, 0x69, 0x6e, 0x67, 0x60, 0x75, 0x72, 0x7b, 0x7c, 0x51, 0x56, 0x5f, 0x58, 0x4d, 0x4a, 0x43, 0x44, 0x19, 0x1e, 0x17, 0x10, 0x05, 0x02, 0x0b, 0x0c, 0x21, 0x26, 0x2f, 0x28, 0x3d, 0x3a, 0x33, 0x34, 0x4e, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5c, 0x5b, 0x76, 0x71, 0x78, 0x7f, 0x6a, 0x6d, 0x64, 0x63, 0x3e, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2c, 0x2b, 0x06, 0x01, 0x08, 0x0f, 0x1a, 0x1d, 0x14, 0x13, 0xae, 0xa9, 0xa0, 0xa7, 0xb2, 0xb5, 0xbc, 0xbb, 0x96, 0x91, 0x98, 0x9f, 0x8a, 0x8d, 0x84, 0x83, 0xde, 0xd9, 0xd0, 0xd7, 0xc2, 0xc5, 0xcc, 0xcb, 0xe6, 0xe1, 0xe8, 0xef, 0xfa, 0xfd, 0xf4, 0xf3]
byte = 0
for b in data:
byte = (byte ^ b) & 0xff
byte = crcTable[byte]
return byte
def commandBytes(cmd, data):
dataLen = len(data)
dataLenLow = dataLen & 0xff
dataLenHigh = dataLen & 0xff00
return bytes([0x51, 0x78, cmd, 0x00, dataLenLow, dataLenHigh] + data + [calcCrc8(data), 0xFF])
def getCommandCode(cmdName):
if cmdName == "BLACKENING" or cmdName == "QUALITY":
return 0xA4
elif cmdName == "ENERGY" or cmdName == "ENERAGY":
return 0xAF
elif cmdName == "PRINT_TYPE":
return 0xBE
elif cmdName == "FEED_PAPER":
return 0xBD
elif cmdName == "DRAW_COMPRESSED":
return 0xBF
elif cmdName == "DRAW_PACKED":
return 0xA2
elif cmdName == "PAPER":
return 0xA1
else:
raise ValueError("Unsupported operation")
def runLength(line):
encoded = []
def encodeRun(curr, run):
while run > 127:
encoded.append((curr << 7) | 127)
run -= 127
if run > 0:
encoded.append((curr << 7) | run)
run = 0
curr = None
for p in line:
if p == curr:
run += 1
else:
encodeRun(curr, run)
curr = p
run = 1
encodeRun(curr, run)
return encoded
def bitPack(line):
encoded = []
for p in range(0, len(line), 8):
byte = 0
for i in range(8):
byte <<= 1
byte |= line[p+7-i]
encoded.append(byte)
return encoded
def toArrayLE(num): # to low-endian array
return [num & 0xff, (num >> 8) & 0xff]
def getEnergy(printDepth):
if printDepth <= 0 or printDepth > 7:
raise ValueError("Invalid print depth")
return int(7500 + (printDepth - 4)*0.15*7500)
def getTypeByte(typeStr):
if typeStr == "IMAGE":
return 0x00
elif typeStr == "TEXT":
return 0x01
elif typeStr == "LABEL":
return 0x03
else:
raise ValueError("No such print type")
def getSpeed(typeStr):
if typeStr == "IMAGE":
return 30
elif typeStr == "TEXT":
return 10
else:
raise ValueError("No such print speed type")
def readImage(imgPath):
if not checkImage(imgPath):
raise ValueError("Image is not of width 384 and 1bpp")
img = Image.open(imgPath).convert("1")
return img
def compressImageToCmds(img):
commands = []
for y in range(img.size[1]):
line = [img.getpixel((i, y))==False for i in range(img.size[0])]
runs = runLength(line)
bits = bitPack(line)
cmd = []
if len(runs) < len(bits):
cmd = commandBytes(getCommandCode("DRAW_COMPRESSED"), runs)
else:
cmd = commandBytes(getCommandCode("DRAW_PACKED"), bits)
commands.append(cmd)
return commands
def saveCommandsToFile(commands, saveToFile=None, saveToFileHuman=None):
if saveToFile:
allCmds = b"".join(commands)
with open(saveToFile, "wb") as f:
f.write(allCmds)
if saveToFileHuman:
humanCmds = [" ".join(hex(b)[2:4].zfill(2).upper() for b in c) for c in commands]
with open(saveToFileHuman, "w") as f:
f.write("\n".join(humanCmds))
def genMoveUp(lines=0x60):
cmds = []
cmds.append(commandBytes(getCommandCode("FEED_PAPER"), [lines]))
# The app for some reason splits it up as two commands with 0x30, thus so do I
while lines > 0x30:
lines -= 0x30
cmds.append(commandBytes(getCommandCode("PAPER"), [0x30, 0x00]))
if lines > 0:
cmds.append(commandBytes(getCommandCode("PAPER"), [lines, 0x00]))
cmds.append(commandBytes(getCommandCode("FEED_PAPER"), [lines]))
return cmds
def genCommands(img, type="IMAGE", energy=None, speed=None, moveUpAfter=True, saveToFile=None, saveToFileHuman=None):
if energy == None:
energy = getEnergy(4) if type != "TEXT" else 0
if speed == None:
speed = getSpeed(type if type != "LABEL" else "IMAGE")
typeByte = getTypeByte(type)
commands = []
commands.append(commandBytes(getCommandCode("BLACKENING"), [0x33]))
if energy > 0:
commands.append(commandBytes(getCommandCode("ENERGY"), toArrayLE(energy)))
commands.append(commandBytes(getCommandCode("PRINT_TYPE"), [typeByte]))
commands.append(commandBytes(getCommandCode("FEED_PAPER"), [speed]))
commands.extend(compressImageToCmds(img))
if moveUpAfter:
commands.extend(genMoveUp())
saveCommandsToFile(commands, saveToFile, saveToFileHuman)
return commands

118
src/connection.py Normal file
View File

@ -0,0 +1,118 @@
from bleak import BleakClient;
import asyncio
import sys
class Connection:
_RX_CHAR = "0000AE02-0000-1000-8000-00805F9B34FB"
_TX_CHAR = "0000AE01-0000-1000-8000-00805F9B34FB"
_GET_DEV_STATE = b"\x51\x78\xA3\x00\x01\x00\x00\x00\xFF"
def _charNotify(self, char, notify):
if char.uuid == Connection._RX_CHAR.lower() and notify[0] == 0x51 and notify[1] == 0x78 and notify[2] == 0xA3:
statusByte = bin(notify[6])
status = None
if notify[6] == 0x00:
status = "OKAY"
elif statusByte.endswith("1"):
status = "OUT_OF_PAPER"
elif statusByte.endswith("10"):
status = "COMPARTMENT_OPEN"
elif statusByte.endswith("100"):
status = "OVERHEATED"
elif statusByte.endswith("1000"):
status = "LOW_BATTERY"
elif statusByte.endswith("10000"):
status = "CHARGING"
elif statusByte.endswith("10000000"):
status = "PRINTING"
print(f"Device status: {status}")
assert status in ["OKAY", "LOW_BATTERY", "CHAR"]
self.notifyStatusEvent.set() # Signal that notification was received
async def _waitForReady(self):
if not self.client or not self.client.is_connected:
raise RuntimeError("Client not connected")
print("Waiting for device status")
await self.client.write_gatt_char(Connection._TX_CHAR, Connection._GET_DEV_STATE)
await self.notifyStatusEvent.wait()
self.notifyStatusEvent.clear()
async def connect(self):
if self.client and self.client.is_connected:
raise RuntimeError("Client already connected")
self.client = BleakClient(self.macAddress)
await self.client.connect()
self.client.connection = self
await self.client.start_notify(Connection._RX_CHAR, self._charNotify)
await self._waitForReady()
async def disconnect(self):
if not self.client or not self.client.is_connected:
raise RuntimeError("Client not connected")
await self.client.disconnect()
def __init__(self, macAddress):
self.macAddress = macAddress
self.notifyStatusEvent = asyncio.Event()
self.client = None
async def __aenter__(self):
if not self.client or not self.client.is_connected:
await self.connect()
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self.disconnect()
async def send(self, commands, delay=0):
if not self.client or not self.client.is_connected:
raise RuntimeError("Client not connected")
print("Sending commands...")
for cmd in commands:
await self.client.write_gatt_char(Connection._TX_CHAR, cmd)
print("Sending " + hex(cmd[2]))
if delay > 0:
await asyncio.sleep(delay)
print("Finished sending commands")
await self._waitForReady()
class FakeConnection:
async def connect(self):
if self.file and not self.file.is_open.closed:
raise RuntimeError("File already opened")
self.file = open(self.logFilePath, "ab")
if self.humanLogFilePath:
self.humanFile = open(self.humanLogFilePath, "a")
async def disconnect(self):
if not self.file or self.file.closed:
raise RuntimeError("File already closed")
self.file.close()
if self.humanFile:
self.humanFile.close()
def __init__(self, logFilePath, humanLogFilePath=None):
self.logFilePath = logFilePath
self.humanLogFilePath = humanLogFilePath
self.file = None
self.humanFile = None
async def __aenter__(self):
if not self.file or self.client.closed:
await self.connect()
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self.disconnect()
async def send(self, commands, delay=0):
if not self.file or self.file.closed:
raise RuntimeError("File not opened")
print("Sending commands...")
for cmd in commands:
self.file.write(cmd)
if self.humanFile:
self.humanFile.write(" ".join([hex(c)[2:].zfill(2).upper() for c in cmd]) + "\n")
print("Finished sending commands")

93
src/decode.py Normal file
View File

@ -0,0 +1,93 @@
import os
from PIL import Image
import sys
import glob
def readHumanFile(fileName):
with open(fileName, "r") as file:
data = file.read().split("\n")
data = [d.strip() for d in data if len(d) > 0]
cmdBytes = []
for d in data:
cmdBytes.extend(bytes.fromhex(d))
return cmdBytes
def readFile(fileName):
with open(fileName, "rb") as file:
cmdBytes = [file.read()]
return cmdBytes
def decodeCommands(cmdBytes, saveFilename=None):
cmds = []
idx = -1
data = []
cmdCode = 0x00
dataLen = 0
crc = 0x00
for i, b in enumerate(cmdBytes):
idx += 1
#print(i, idx, hex(b))
if b == 0x51 and idx == 0:
pass
elif b == 0x78 and idx == 1:
pass
elif idx == 2:
cmdCode = b
elif b == 0x00 and idx == 3:
pass
elif idx == 4:
dataLen = b
elif idx == 5:
if b != 0:
dataLen = dataLen*0x100 + b
elif idx >= 6 and idx < 6+dataLen:
data.append(b)
elif idx == 6+dataLen:
crc = b
elif idx == 6+dataLen+1 and b == 0xFF:
o = {
"cmd": cmdCode,
"data": data,
"crc": crc
}
cmds.append(o)
idx = -1
crc = 0
data = []
cmdCode = 0
else:
idx = -1
crc = 0
data = []
cmdCode = 0
#print(cmds)
#for c in cmds:
# print(hex(c["cmd"]), hex(len(c["data"])), hex(c["crc"]))
rows = []
for c in cmds:
if c["cmd"] == 0xa2:
s = "".join([bin(b)[2:10].zfill(8)[::-1] for b in c["data"]])
row = [int(x)==1 for x in s]
rows.append(row)
elif c["cmd"] == 0xbf:
row = []
for b in c["data"]:
bit = (b & (1<<7)) == 128
times = b & ~(1<<7)
row.extend([bit] * times)
rows.append(row)
HEIGHT = len(rows)
WIDTH = max(len(r) for r in rows)
img = Image.new("1", (WIDTH, HEIGHT))
for y in range(HEIGHT):
for x in range(WIDTH):
img.putpixel((x,y), not rows[y][x])
if saveFilename != None:
img.save(saveFilename)
return img

34
src/main.py Normal file
View File

@ -0,0 +1,34 @@
import asyncio
from import datetime import datetime
from commands import *
from connection import Connection, FakeConnection;
from textToImage import textToImage;
def getDate():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async def beginLog(conn):
img = textToImage(f"BEGIN LOG @ [{getDate()}]", fontSize=16, align="center", bold=True)
cmds = genCommands(img, energy=getEnergy(7), moveUpAfter=False)
await conn.send(cmds)
async def printToLog(conn, text):
img = textToImage(f"[{getDate()}] {text}")
cmds = genCommands(img, energy=getEnergy(7), moveUpAfter=False)
await conn.send(cmds)
async def finishLog(conn):
img = textToImage(f"END LOG @ [{getDate()}]", fontSize=16, align="center", bold=True)
cmds = genCommands(img, energy=getEnergy(7), moveUpAfter=True)
await conn.send(cmds)
async def main():
async with Connection("AA:BB:CC:DD:EE:FF") as conn:
await beginLog(conn)
for i in range(3):
await printToLog(conn, f"Message in log #{i+1}")
await finishLog(conn)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

59
src/textToImage.py Normal file
View File

@ -0,0 +1,59 @@
from PIL import Image, ImageDraw, ImageFont
import re
def textToImage(text, fontSize=12, fontFile="DejaVuSans.ttf", align="left", lineSpacing=2, bold=False, whiteOnBlack=False, saveToFile=None):
font = ImageFont.truetype(fontFile, fontSize)
WIDTH = 384
FILL = 0 if not whiteOnBlack else 1
BACKGROUND = 1 if not whiteOnBlack else 0
linesRaw = text.split('\n')
# wrap lines
lines = []
for line in linesRaw:
while font.getlength(line, mode="1") > WIDTH: # binary search
spaces = [0] + [m.start() for m in re.finditer(' ', line)]
left = 0
right = len(spaces)-1
while left < right-1:
mid = (left + right) // 2
if font.getlength(line[:spaces[mid]], mode="1") > WIDTH:
right = mid-1
else:
left = mid
splitLine = line[:spaces[left]]
bbox = font.getbbox(splitLine)
lines.append((splitLine, bbox))
line = line[spaces[left]+1:]
if len(line) > 0:
lines.append((line, font.getbbox(line)))
# draw image
x = None
anchor = None
match align:
case "left":
x = 0
anchor = "la"
case "center":
x = WIDTH//2
anchor = "ma"
case "right":
x = WIDTH
anchor = "ra"
case _:
raise ValueError("Invalid align value")
img_height = sum(bbox[3]-bbox[1] for (_line, bbox) in lines) + (len(lines)-1)*lineSpacing
img = Image.new('1', (WIDTH, img_height), BACKGROUND)
draw = ImageDraw.Draw(img)
y = 0
for (line, bbox) in lines:
draw.text((x, y-bbox[1]), line, font=font, fill=FILL, align=align, anchor=anchor, stroke_width=bold)
y += bbox[3]-bbox[1] + lineSpacing
if saveToFile:
img.save(saveToFile)
return img