diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..abd32e8 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*__pycache__* \ No newline at end of file diff --git a/README.md b/README.md index 64d1278..c3ef2d0 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,41 @@ -# ppa6-python -### Python module for printing on Peripage A6 and A6+ +# peripage-python +### Python module for printing on Peripage printers **This project is a continued development of the [original project](https://github.com/eliasweingaertner/peripage-A6-bluetooth) made by [Elias Weingärtner](https://github.com/eliasweingaertner). This module combined all results of reverse engineering of the Peripage A6/A6+ protocol in a python utility providing interface and CLI tool for printing on this thermal printer.** ## [The original introduction](https://github.com/eliasweingaertner/peripage-A6-bluetooth#introduction) -The Peripage A6 F622 is an inexpensive portable thermal printer. It provides both Bluetooth and USB connectivity. Unlike most other thermo printers it **does not** seem to support ESC/POS or any other standardized printer control language. +The Peripage A6 F622 is an inexpensive portable thermal printer. It provides both Bluetooth and USB connectivity. Unlike most other thermo printers it **does not** seem to support ESC/POS or any other standardized printer control language. So far, the Peripage A6 F622 can be only controlled using a proprietary app (iOS / Anndroid). There is also a driver for Windows with many limitations, most notably the need of defining a page size before printing; this is a huge limitation, as the Peripage prints on continuous form paper. -The script provided here was built based on an analysis of captured Bluetooth traffic between the printer and an Android device. The Peripage A6 uses the serial profile (BTSPP) and RFCOMM. +The script provided here was built based on an analysis of captured Bluetooth traffic between the printer and an Android device. The Peripage A6 uses the serial profile (BTSPP) and RFCOMM. Essentially, the script takes an input images, scales it to the printers native X resolution of 384 pixels, and then sends it to the printer. -## Current abilities +## Deprecation Warning + +**The latest version ot `ppa6-python` module is deprecated due the major update with new models support and better module naming** + +## Features * Printing text of any length encoded in ASCII * Printing Images using PIL library -* Printing raw bytes representing image in binary (black/white) form -* Printing a page break of desired size (in pixels) -* Printing using generator/iterator which returns image row by row in form of bytes or PIL Images +* Printing Images row-by row using binary row representation +* Printing page breaks using paper feed +* Printing using generator/iterator that return bytes for each row, chunks of bytes for each row, images * Requesting printer details (Serial Number, Name, Battery Level, Hardware Info and an option the meaning of which i don't know) -* Configuring print concentration (light, gray, black) +* Configuring print concentration (temperature) +* Changing printer serial number * Configuring printer poweroff timeout +* Supported printers: + * Peripage A6 + * Peripage A6+ + * Peripage A40 + +## Prerequisites -## Prerequirements -* Peripage A6/A6+ printer +* Peripage A6/A6+/A40/e.t.c printer * Python 3 ## Installation @@ -40,35 +50,18 @@ pip install . --user **Install from pypi using pip** ``` -pip install ppa6 +pip install peripage ``` ## Dependencies -* `PyBluez==0.30` -* `Pillow==8.1.2` -* `argparse==1.1` (for CLI) + +* `PyBluez>=0.30` +* `Pillow>=8.1.2` +* `argparse>=1.1` Install dependencies with `pip install -r requirements.txt` -On windows you may need to install PyBluez 0.3 -``` -git clone https://github.com/pybluez/pybluez -cd pybluez -pip install . --user -``` - -On raspberry pi it may require to install additional libraries -``` -sudo apt install libbluetooth-dev libopenjp2-7 libtiff5 -``` - -And in some cases you will have to restart the bluetooth adapter and service on raspberry pi when it fails to connect or device is busy -``` -sudo systemctl restart bluetooth -sudo hciconfig hci0 reset -``` - ## Identify printer Bluetooth MAC address **On linux:** @@ -100,68 +93,84 @@ Scanning bluetooth devices... please wait. 00:15:83:15:bc:5f Imaging PeriPage+BC5F ``` +## Troubleshooting + +> Windows installation requires installing PyBluez from master branch as pypi module is not updated + +``` +pip install git+https://github.com/pybluez/pybluez@master#egg=pybluez --user +``` + +> Raspberry PI installation requires additional libraries + +``` +sudo apt install libbluetooth-dev libopenjp2-7 libtiff5 +``` + +> Some cases may require restarting bluetooth adapter + +``` +sudo systemctl restart bluetooth +sudo hciconfig hci0 reset +``` + ## CLI usage **On linux** Install module and run -`ppa6 ` +`peripage ` **On windows** Install module and run -`python -m ppa6 ` +`python -m peripage ` ### Options ``` -usage: ppa6 [-h] -m MAC [-c [0-2]] [-b [0-255]] [-p {A6,A6p,A6+}] [-n] - (-t TEXT | -s | -i IMAGE | -q QR | -e) +$ python -m peripage -h +usage: __main__.py [-h] -m MAC [-c [0-2]] [-b [0-255]] -p {A6,A6p,A40} (-t TEXT | -s | -i IMAGE | -q QR | -e) -Print on a Peripage A6 / A6+ via bluetooth +Print on a Peripage printer via bluetooth optional arguments: -h, --help show this help message and exit -m MAC, --mac MAC Bluetooth MAC address of the printer -c [0-2], --concentration [0-2] - Concentration value for printing (0, 1, 2) + Concentration value for printing (temperature) -b [0-255], --break [0-255] - Size of the break that should be inserted after the - print (max 255) - -p {A6,A6p,A6+}, --printer {A6,A6p,A6+} - Printer model name (A6 or A6+/A6p (both allowed)) - -n, --newline Force printer to add newline at the end of the printed - text and flush the buffer - -t TEXT, --text TEXT ASCII text that should be printed. Add a line break at - the end of the string to avoid it being cut. String - can be empty, so just page break will be printed - -s, --stream Reads an input from stdin and prints as ASCII text + Size of the break inserted after printed image or text + -p {A6,A6p,A40}, --printer {A6,A6p,A40} + Printer model selection + -t TEXT, --text TEXT ASCII text to print. Text must be ASCII-safe and will be filtered for invalid characters + -s, --stream Print text received from STDIN, line by line. Text must be ASCII-safe and will be filtered for invalid characters -i IMAGE, --image IMAGE - Path to the image that should be printed - -q QR, --qr QR String for QR code print - -e, --introduce Ask the printer to introduce himself + Path to the image for printing + -q QR, --qr QR String to convert into a QR code for printing + -e, --introduce Ask the printer to introduce itself ``` ### Print image example -**Print image from [file](https://github.com/bitrate16/ppa6-python/blob/main/honk.png) with following break for 100px and concentration set to 2 (HIGH) on A6+** +**Print image from [file](https://github.com/bitrate16/peripage-python/blob/main/honk.png) with following break for 100px and concentration set to 2 (HIGH) on A6+** ``` -ppa6 -m 00:15:83:15:bc:5f -p A6p -b 100 -c 2 -i honk.png +peripage -m 00:15:83:15:bc:5f -p A6p -b 100 -c 2 -i honk.png ``` ### Print text example **Print some random text followed by newline and break for 100px on A6+** ``` -ppa6 -m 00:15:83:15:bc:5f -p A6p -b 100 -t "HONK" -n +peripage -m 00:15:83:15:bc:5f -p A6p -b 100 -t "HONK" -n ``` Newline is required to fush the internal printer buffer and force it to print all text without cutting -### Print Service example +## Print Service **Print 50 text tasks on A6+** ```python -import ppa6 +import peripage import print_service # Ping battery every 60 seconds @@ -170,13 +179,14 @@ import print_service # Wait 1 second before send after connecting/reconnecting to printer # Print only after pinging printer and waiting for 1 second service = print_service.PrintService(60, 5, 5, 1, 1) -service.start('00:15:83:15:bc:5f', ppa6.PrinterType.A6p) +service.start('00:15:83:15:bc:5f', peripage.PrinterType.A6p) for i in range(50): service.add_print_ascii(f'number {i}', flush=True) ``` Newline is required to fush the internal printer buffer and force it to print all text without cutting -### Suggestions +## Recommendations + * Don't forget about concentration, this can make print brighter and better visible. * Split long images into multiple print requests with cooldown time for printer (printer may overheat during a long print and will stop printing for a while. This will result in partial print loss because the internal buffer is about 250px height). For example, when you print [looooooooooooooooooooooooooooooongcat.jpg](http://lurkmore.so/images/9/91/Loooooooooooooooooooooooooooooooooooooooooongcat.JPG), split it into at least 20 pieces with 1-2 minutes delay because you will definetly loose something without cooling. Printer gets hot very fast. Yes, it was the first that i've printed. * Be carefull when printing lots of black or using max concentration, as i said, printer heats up very fast. @@ -186,7 +196,9 @@ Newline is required to fush the internal printer buffer and force it to print al ## Code example -View this [python notebook](https://github.com/bitrate16/ppa6-python/blob/main/notebooks/ppa6-tutorial.ipynb) for tutorial +View this [python notebook](https://github.com/bitrate16/peripage-python/blob/main/notebooks/peripage-tutorial.ipynb) for tutorial + +View this [python notebook](https://github.com/bitrate16/peripage-python/blob/main/notebooks/Test-notebook.ipynb) for test ## Printer disassembly @@ -196,20 +208,33 @@ View this [python notebook](https://github.com/bitrate16/ppa6-python/blob/main/n * Fix page sometimes get cutted off for some rows * Fix delays -* Python 2.7 support +* ~~Python 2.7 support~~ (Don't need) * Implement overheat protection * Implement cover open handler -* Tweak wait timings to precisely match the printing speed +* Tweak wait timings to precisely match printing speed * Implement printer renaming * Implement printing stop operation * Reverse-engineer USB driver and add support for it -* **FIX:** Print randomly gets cropped (some images getting cropped) -* **FIX:** 1 type conversion is low quality +* Print randomly gets cropped (some images getting cropped) +* 1 type conversion is low quality + +## Contribution + +> Q: How to contribute? +> +> A: Implement some features and make a pull request in this repo. For example, you could add info about USB communication, write a any-font printing using PIL text drawing, make an additional research in protocol and other cool things. + +> Q: How to get my printer supported? +> +> A: If you own a peripage printer that is currently unsupported, you can reverse-engineer the bluetooth packets captured from the oficial printing app and find out the specs of your printer (the main and the only spec is bytes per row). Another way is to find how many letters can fit in a row when using `printASCII()`. +> +> If you would like to participate, please make an issue and I will guide you on how to obtain required parameters. ## Credits * [Elias Weingärtner](https://github.com/eliasweingaertner) for initial work in reverse-engineering bluetooth protocol * [bitrate16](https://github.com/bitrate16) for additional research and python module +* [henryleonard](https://github.com/henryleonard) for specs of A40 printer ## Disclaimer @@ -223,4 +248,4 @@ SOFTWARE.** ## License -[MIT License](https://github.com/bitrate16/ppa6-python/blob/main/LICENSE) +[MIT License](https://github.com/bitrate16/peripage-python/blob/main/LICENSE) diff --git a/notebooks/Test-notebook.ipynb b/notebooks/Test-notebook.ipynb new file mode 100644 index 0000000..cfdfd4f --- /dev/null +++ b/notebooks/Test-notebook.ipynb @@ -0,0 +1,469 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import peripage\n", + "\n", + "printer = peripage.Printer(\n", + " mac='00:15:83:15:BC:5F',\n", + " printer_type=peripage.PrinterType.A6p\n", + ")\n", + "printer.connect()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "printer.disconnect()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "printer.reconnect()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "printer.reset()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "b'IP-300'\n", + "b'PeriPage+BC5F'\n", + "b'A6491571121'\n", + "b'V2.11_304dpi'\n", + "93\n", + "b'BR2141e-s(A02)_B9_20190815_r3460'\n", + "b'\\x00\\x15\\x83\\x15\\xbc_\\xc0\\x15\\x83\\x15\\xbc_'\n", + "b'PeriPage+BC5F|00:15:83:15:BC:5F|C0:15:83:15:BC:5F|V2.11_304dpi|A6491571121|93'\n" + ] + } + ], + "source": [ + "print(printer.getDeviceIP())\n", + "print(printer.getDeviceName())\n", + "print(printer.getDeviceSerialNumber())\n", + "print(printer.getDeviceFirmware())\n", + "print(printer.getDeviceBattery())\n", + "print(printer.getDeviceHardware())\n", + "print(printer.getDeviceMAC())\n", + "print(printer.getDeviceFull())\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "b'A6491571121'\n", + "b'abobus'\n", + "b'A6491571121'\n" + ] + } + ], + "source": [ + "print(printer.getDeviceSerialNumber())\n", + "printer.reset()\n", + "printer.setDeviceSerialNumber('abobus')\n", + "print(printer.getDeviceSerialNumber())\n", + "printer.reset()\n", + "printer.setDeviceSerialNumber('A6491571121')\n", + "print(printer.getDeviceSerialNumber())\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'OK'" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.setPowerTimeout(1)\n", + "# Will power off in one minute\n" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'OK'" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.setPowerTimeout(60)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "printer.printBreak(100)\n", + "# Should output a break of 100px\n" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "''" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.print_buffer = ''\n", + "printer.printASCII('a' * printer.getRowCharacters())\n", + "printer.print_buffer\n", + "# Shoud print out a full line\n" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.printASCII('a' * (printer.getRowCharacters() - 1))\n", + "printer.print_buffer\n", + "# Shoud not print\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "''" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.print_buffer = 'a' * (printer.getRowCharacters() - 1)\n", + "printer.printASCII('\\n')\n", + "printer.print_buffer\n", + "# Shoud flush the above data\n" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "''" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.print_buffer = ''\n", + "printer.printASCII(('a' * printer.getRowCharacters()) + ('a' * (printer.getRowCharacters() - 1)) + '\\n')\n", + "printer.print_buffer\n", + "# Shoud print two lines (one incomplete)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.print_buffer = ''\n", + "printer.printASCII(('a' * printer.getRowCharacters()) + ('a' * (printer.getRowCharacters() - 1)))\n", + "printer.print_buffer\n", + "# Shoud print one line\n" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "''" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "printer.print_buffer = ''\n", + "printer.printASCII('\\n\\n')\n", + "printer.print_buffer\n", + "# Shoud print two newlines\n" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [], + "source": [ + "for i in range(10):\n", + " printer.printRow(bytes.fromhex('55' * printer.getRowBytes()))\n", + "# Shoud write a single black-white row with pattern 01010101\n", + "\n", + "for i in range(10):\n", + " printer.printRow(bytes.fromhex('aa' * printer.getRowBytes()))\n", + "# Shoud write a single black-white row with pattern 10101010 (inverse)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [], + "source": [ + "chunk = [ bytes.fromhex('55' * printer.getRowBytes()), bytes.fromhex('aa' * printer.getRowBytes()) ] * 20\n", + "printer.printRowBytesIterator(chunk)\n", + "# Should write grid using iterator (very slow)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [], + "source": [ + "half_0 = '00' * (printer.getRowBytes() // 2)\n", + "half_1 = 'ff' * (printer.getRowBytes() // 2)\n", + "\n", + "half_left = bytes.fromhex(half_1 + half_0)\n", + "half_right = bytes.fromhex(half_0 + half_1)\n", + "\n", + "chunk = [ half_left ] * 20 + [ half_right ] * 20\n", + "chunk += chunk\n", + "\n", + "printer.printRowBytesIterator(chunk)\n", + "# Should output a half-black followed by inverse half black using iterator (very slow)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "chunk = [ bytes.fromhex('55' * printer.getRowBytes()), bytes.fromhex('aa' * printer.getRowBytes()) ] * 20\n", + "printer.printRowBytesList(chunk)\n", + "# Should write grid using list\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "half_0 = '00' * (printer.getRowBytes() // 2)\n", + "half_1 = 'ff' * (printer.getRowBytes() // 2)\n", + "\n", + "half_left = bytes.fromhex(half_1 + half_0)\n", + "half_right = bytes.fromhex(half_0 + half_1)\n", + "\n", + "chunk = [ half_left ] * 20 + [ half_right ] * 20\n", + "chunk += chunk\n", + "\n", + "printer.printRowBytesList(chunk)\n", + "# Should output a half-black followed by inverse half black using list\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "half_0 = '00' * (printer.getRowBytes() // 2)\n", + "half_1 = 'ff' * (printer.getRowBytes() // 2)\n", + "\n", + "half_left = bytes.fromhex(half_1 + half_0)\n", + "half_right = bytes.fromhex(half_0 + half_1)\n", + "\n", + "chunk = [ half_left ] * 20 + [ half_right ] * 20\n", + "\n", + "printer.printRowChunksIterator([ chunk, chunk ])\n", + "# Should output a half-black followed by inverse half black using chunk iterator\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "import PIL.Image\n", + "\n", + "honk = PIL.Image.open('honk.png')\n", + "\n", + "printer.printImage(honk)\n", + "# Should print a honk\n" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "import PIL.Image\n", + "\n", + "honk = PIL.Image.open('honk.png')\n", + "\n", + "pieces = [\n", + " honk.crop((0, 0, honk.size[0], honk.size[1] // 2)),\n", + " honk.crop((0, honk.size[1] // 2, honk.size[0], honk.size[1])),\n", + "]\n", + "\n", + "printer.printImageIterator(reversed(pieces))\n", + "# Should print a honk from two pieces using iterator, image is reversed for text\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "import PIL.Image\n", + "\n", + "honk = PIL.Image.open('honk.png')\n", + "\n", + "printer.setConcentration(0)\n", + "printer.printImage(honk)\n", + "\n", + "printer.setConcentration(1)\n", + "printer.printImage(honk)\n", + "\n", + "printer.setConcentration(2)\n", + "printer.printImage(honk)\n", + "# Should print a honk 3 times with different concentration\n" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "printer.printQR('https://www.youtube.com/watch?v=dQw4w9WgXcQ')\n", + "# Never gonna give you up\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/notebooks/ppa6-tutorial.ipynb b/notebooks/ppa6-tutorial.ipynb index 72aa207..57fada2 100644 --- a/notebooks/ppa6-tutorial.ipynb +++ b/notebooks/ppa6-tutorial.ipynb @@ -1,6 +1,7 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -13,14 +14,15 @@ "metadata": {}, "outputs": [], "source": [ - "import ppa6\n", + "import peripage\n", "\n", - "printer = ppa6.Printer('00:15:83:15:bc:5f', ppa6.PrinterType.A6p)\n", + "printer = peripage.Printer('00:15:83:15:bc:5f', peripage.PrinterType.A6p)\n", "printer.connect()\n", "printer.reset()" ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -57,6 +59,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -74,6 +77,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -98,6 +102,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ @@ -131,6 +136,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ diff --git a/ppa6.py b/peripage.py similarity index 68% rename from ppa6.py rename to peripage.py index 2889ec9..f991435 100644 --- a/ppa6.py +++ b/peripage.py @@ -1,10 +1,10 @@ #!/usr/bin/env python3 # MIT License -# +# # Copyright (c) 2021 bitrate16 -from ppa6.__main__ import main +from peripage.__main__ import main if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/peripage/__init__.py b/peripage/__init__.py new file mode 100644 index 0000000..11924ef --- /dev/null +++ b/peripage/__init__.py @@ -0,0 +1,851 @@ +#!/usr/bin/env python3 + +# MIT License +# +# Copyright (c) 2021-2023 bitrate16 + +__title__ = 'Peripage buetooth printing utility' +__version__ = '1.1' +__author__ = 'bitrate16' +__license__ = 'MIT' +__copyright__ = 'Copyright (c) MIT 2021-2023 bitrate16' + + +import time +import qrcode +import typing +import enum +import bluetooth + +import PIL.Image +import PIL.ImageOps + + +class PrinterTypeSpecs: + """ + Specification parameters for each printer model. required for unifying the + printing interface and easy adding new printe models. + + Defines: + * `row_bytes` - bytes per row encoding + * `row_width` - width of single row in pixels + * `row_characters` - width of row in ASCII-mode characters + """ + + def __init__(self, row_bytes: int, row_width: int, row_characters: int): + self.row_bytes = row_bytes + self.row_width = row_width + self.row_characters = row_characters + +class PrinterType(enum.Enum): + """ + Defines names for supported printer types. + Currently supported printers are: Peripage A6, A6+, A40 + """ + + A6 = PrinterTypeSpecs( + row_bytes=48, + row_width=384, + row_characters=32 + ) + + A6p = PrinterTypeSpecs( + row_bytes=72, + row_width=576, + row_characters=48 + ) + + A40 = PrinterTypeSpecs( + row_bytes=216, + row_width=1728, + row_characters=144 + ) + + @classmethod + def names(cls) -> typing.List[str]: + """List available keys from Enum""" + return [ e.name for e in cls ] + + def __new__(cls, *args, **kwds): + value = len(cls.__members__) + 1 + obj = object.__new__(cls) + obj._value_ = value + return obj + + def __init__(self, spec: PrinterTypeSpecs): + self.spec = spec + +class Printer: + """ + This class defines the Peripage interface utility. + It contains methods wrapping requests with special control opcodes. + By default instance of this class is constructed with timeout set + to 1s and printer type A6. + Currently there is no thermal overheat protection opcodes found, so + use printing carefully and avoid overheating of the printer which + may result in hardware break. + Currently there is no stop codes found, so you can not stop printing. + + It is required to perform reset() after connection to the printer. + """ + + @staticmethod + def filter_ascii(text: str) -> str: + """ + Remove non-safe-ascii letters from the string so it can be safety used + in most internal calls. + """ + + return ''.join([ i for i in text if (31 < ord(i) or ord(i) == 10) and ord(i) < 127 ]) + + @staticmethod + def is_safe_ascii(text: str) -> bool: + """ + Check is string does not contain non-safe-ascii letters (like `>0x7f` or `\\0`). + """ + + for i in text: + if (31 < ord(i) or ord(i) == 10) and ord(i) < 127: + return False + return True + + def __init__(self, mac: str, printer_type: PrinterType, timeout: float=1.0): + """ + Create instance of peripage connector. `mac` and `printer_type` are + required for bluetooth connection and printer-specific printing + parameters. + + In order to make printer operate normally, it is required to call + `reset()` after connecting. + + Arguments: + * `mac` - mac address of the printer + * `printer_type` - printer type enum with specification + * `timeout` - socket connection timeout in seconds + """ + + self.mac = mac + self.timeout = timeout + self.printer_type = printer_type + + # buffer used for continuous printing with line wrapping + self.print_buffer = '' + + def isConnected(self) -> bool: + """ + Check if printer is connected (socket alive) + """ + + try: + self.sock.getpeername() + return True + except: + return False + + def connect(self) -> None: + """ + Open a new connection to the printer without checking for existing + connection. In case of malfunction and/or twice connecting to the same + printer, socket descriptor becomes unoperateable. + + In order to make printer operate normally, it is required to call + `reset()` after connecting. + """ + + self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) + self.sock.connect((self.mac, 1)) + self.sock.settimeout(self.timeout) + + def reconnect(self) -> None: + """ + Reconnect to the printer with existing connection check. + + In order to make printer operate normally, it is required to call + `reset()` after connecting. + """ + + if self.isConnected(): + # self.sock.shutdown(socket.SHUT_RDWR) + self.sock.close() + del self.sock + + self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) + self.sock.connect((self.mac, 1)) + self.sock.settimeout(self.timeout) + + def disconnect(self) -> None: + """ + Disconnect from the printer. + """ + + if self.isConnected(): + # self.sock.shutdown(socket.SHUT_RDWR) + self.sock.close() + del self.sock + + def setTimeout(self, timeout) -> None: + """ + Set the bluetooth socket connection recv / send timeout. + """ + + self.timeout = timeout + if self.isConnected(): + self.sock.settimeout(timeout) + + def tellPrinter(self, byteseq: bytes) -> None: + """ + Send `bytes` to the printer without response. + + Arguments: + * `byteseq` - `bytes` data + """ + + self.sock.send(byteseq) + + def askPrinter(self, byteseq: bytes, recv_size: int=1024) -> bytes: + """ + Send `bytes` to the printer with response. + + Arguments: + * `recv_size` - max size of received chunk + * `byteseq` - `bytes` data + """ + + self.sock.send(byteseq) + return self.sock.recv(recv_size) + + def listenPrinter(self, recv_size: int=1024) -> bytes: + """ + Receive data from printer. + + Arguments: + * `recv_size` - max size of received chunk + """ + + return self.sock.recv(recv_size) + + def tellPrinterSeq(self, byteseq: typing.Iterable[bytes]) -> None: + """ + Send list of `bytes` to the printer without response. + + Arguments: + * `byteseq` - `list` of `bytes` + """ + + for s in byteseq: + self.sock.send(s) + + def askPrinterSeq(self, byteseq: typing.Iterable[bytes], recv_size: int=1024) -> bytes: + """ + Send list of `bytes` to the printer with response. + + Arguments: + * `recv_size` - max size of received chunk + * `byteseq` - `list` of `bytes` + """ + + for s in byteseq: + self.sock.send(s) + return self.sock.recv(recv_size) + + def getDeviceIP(self) -> bytes: + """ + Query Unknown Property. + + Request: `10ff20f0`. + + Response: `bytes` with unknown property. + + Example: Peripage A6+ returns `IP-300`. + """ + + return self.askPrinter(bytes.fromhex('10ff20f0')) + + def getDeviceName(self) -> bytes: + """ + Query device name. + + Request: `10ff3011`. + + Response: `bytes` with `device_name+two_bytes_of_mac` + + Example: Peripage A6+ returns `PeriPage+DF7A`. + """ + + return self.askPrinter(bytes.fromhex('10ff3011')) + + def getDeviceSerialNumber(self) -> bytes: + """ + Query serial number. + + Request: `10ff20f2`. + + Response: `bytes` with serial number + + Example: Peripage A6+ returns `A6491571121`. + """ + + return self.askPrinter(bytes.fromhex('10ff20f2')) + + def getDeviceFirmware(self) -> bytes: + """ + Query device firmware version. + + Request: `10ff20f1`. + + Response: `bytes` with firmware version + + Example: Peripage A6+ returns `V2.11_304dpi`. + """ + + return self.askPrinter(bytes.fromhex('10ff20f1')) + + def getDeviceBattery(self) -> int: + """ + Query device battery percentage. + + Request: `10ff50f1`. + + Response: `bytes[2]` with percentage. `bytes[2] = { 0, percentage }` + + Example: Peripage A6+ returns `\\x00@` (equals to `bytes[2] = { 0, 64 }`). + """ + return int(self.askPrinter(bytes.fromhex('10ff50f1'))[1]) + + def getDeviceHardware(self) -> bytes: + """ + Query device hardware info. + + Request: `10ff3010`. + + Response: `bytes` with hw info. + + Example: Peripage A6+ returns `BR2141e-s(A02)_B9_20190815_r3460`. + `BR2141e-s` chip with a pile of ascii letters. + """ + + return self.askPrinter(bytes.fromhex('10ff3010')) + + def getDeviceMAC(self) -> bytes: + """ + Query device mac from device itself. + + Request: `10ff3012`. + + Response: `bytes` with mac address. + + Example: Peripage A6+ returns `\\x00\\xF5\\x73\\x25\\xAC\\x9F_\\x00\\xF5\\x73\\x25\\xAC\\x9F_` + (equals to `00:F5:73:25:AC:9F`). + """ + + return self.askPrinter(bytes.fromhex('10ff3012')) + + def getDeviceFull(self) -> bytes: + """ + Query full device info. + + Request: `10ff70f100`. + + Response: `bytes` with fill info. + + Example: Peripage A6+ returns `PeriPage+DF7A|00:F5:73:25:AC:9F|C5:12:81:19:2C:51|V2.11_304dpi|A6491571121|84` + (`name+mac_slice|device_mac|client_mac|firmware|serial_number|battery_percentage`). + + WARNING: + + This command has a side-effect causing the printed images getting + corrupted by shifting horisontally and adding a █ character to the + in-printer ASCII buffer. + """ + + return self.askPrinter(bytes.fromhex('10ff70f100')) + + def getRowBytes(self) -> int: + """ + Get row_bytes spec for current printer. + + Images are encoded as 1-pixel-per-1-bit, which means that 1 byte can + encode 8 black-white pixels in a line. This property defines the bytes + limit per image row, the overflow is truncated. + """ + + return self.printer_type.spec.row_bytes + + def getRowWidth(self) -> int: + """ + Get row_width spec for current printer. + + Images are encoded as 1-pixel-per-1-bit, which means that 1 byte can + encode 8 black-white pixels in a line. This property defines the pixel + limit per image row, the overflow is truncated. + """ + + return self.printer_type.spec.row_width + + def getRowCharacters(self) -> int: + """ + Get row_characters spec for current printer. + + Internal ASCII printing mode allows printer to output the raw ASCII + letters up to `0x7f` and below to `0x10`. This property defunes the + amount of letters that can fit in a single row. in case of wrapped + printing, the overflow is wrapped using in-class buffer and synchronized + with the in-printer buffer. + """ + + return self.printer_type.spec.row_characters + + def getHeightLimit(self) -> int: + """ + Get the limit of single-image printing chunk. + + Printer protocol allows only 16-bit number as the definition for the + image printing procedire, that requires used to vertically split the + image into multiple chunks. + """ + + return 0xffff + + def setDeviceSerialNumber(self, serial_number: str, wait: bool=True) -> None: + """ + Set device serial number. + + Set a new device serial number explicitly. `serial_number` defines the + new serial number for the device. This serial number must be + ascii-encodable string that match the requirements of + `Printer.is_safe_ascii()` filter in order to work. Serial number string + is additionally filtered with `Printer.filter_ascii` if you haven't read + the previous sentence. + + Request: `10ff20f4+ascii_str+00`. + + Arguments: + * `serial_number` - serial number string that passes the + `Printer.is_safe_ascii()` check. + """ + + request = bytes.fromhex('10ff20f4') + Printer.filter_ascii(serial_number).encode('ascii') + b'\0' + + if wait: + return self.askPrinter(request) + else: + self.tellPrinter(request) + + def setPowerTimeout(self, timeout: int, wait: bool=True) -> None: + """ + Set device poweroff timeout. + + Device standby mode is triggered by any action made with the device. It + can be either a print task, battery lever query and anything else that + envolves ask-answer communication. Power timeout defines the internal + auto poweroff timeout of the device in minutes, up to `0xffff` minutes. + + Request: `10ff12+bytes[2]:big_endian`. + + Arguments: + * `timeout` - new timeout value between `0` and `0xffff`, minutes + """ + + timeout = max(min(0xfff0, timeout), 0x0001) + request = bytes.fromhex('10ff12') + int.to_bytes(timeout, 2, 'big') + + if wait: + return self.askPrinter(request) + else: + self.tellPrinter(request) + + def setConcentration(self, concentration: int, wait: bool=False) -> None: + """ + Set printing concentration level. + + Printer supports multiple temperature concentration modes that allow to + print darker or lighter images with the price of overheating. The more + concentration - the longer lasting image will be. + + Request: `10ff1000+bytes[1]:big_endian`. + + Arguments: + * `concentration` - concentration value from range `(0, 1, 2)` + """ + + if concentration <= 0: + request = bytes.fromhex('10ff100000') + elif concentration == 1: + request = bytes.fromhex('10ff100001') + elif concentration >= 2: + request = bytes.fromhex('10ff100002') + + if wait: + return self.askPrinter(request) + else: + self.tellPrinter(request) + + def reset(self) -> None: + """ + Send reset request, required for initial printer initialization after + connect/reconnect. Without this operation, printer will not print nor + return any data. + + Request: `10fffe01+000000000000000000000000`. + """ + + self.tellPrinter(bytes.fromhex('10fffe01000000000000000000000000')) + + def printBreak(self, size: int=0x40) -> None: + """ + Ask printer to print out a break of fixed size. + + Printer allows user to feed out some paper to wipe away tears of this + module developer. + + Request: `1b4a+bytes[1]:big_endian`. + + Arguments: + * `size` - break size in range `(0, 0xff)` + """ + + size = min(0xff, max(0x01, size)) + request = bytes.fromhex('1b4a') + int.to_bytes(size, 1, 'big') + + self.tellPrinter(request) + + def writeASCII(self, text: str='\n', wait=False) -> None: + """ + WARNING: THIS API IS UNSAFE + + Write text into printer without internal safety-checks and filtering. If + you want to print text with internal checks for non-ascii or + unsafe-ascii characters, use `Printer.printASCII()`. If you need to use + this function, check you text with `Printer.is_safe_ascii` or filter + with `Printer.filter_ascii` and do not leave more than one sequential + `\\n` character. + + Request: `ascii_str`. + + Arguments: + * `text` - text to be printed, should be checked by user before print or + may malfunction and/or damage the printer. String must not contain + repeating `\\n` characters or printer will freeze. + """ + + request = text.encode('ascii') + + if wait: + return self.askPrinter(request) + else: + self.tellPrinter(request) + + def printlnASCII(self, text: str='', delay: float=0.25) -> None: + """ + Safe to use printing method that relies on in-class buffer for wrapping + text. The input is filtered with `Printer.filter_ascii` in order to + exclude all non-safe-ascii characters and later splitted into multiple + chunks over `\\n` in order to prevent freeze caused by twice-newline in + printer buffer. This function is equal to normal `println` in C and + semi-equal to `print(text + '\\n')`. This method relies on in-class + buffer to track printed data and keeping sync with in-printer buffer. + + Request: `impl:Printer.printASCII()`. + + Arguments: + * `text` - text to be printed, automatically filtered with + `Printer.filter_ascii()` and splitted into newline-chunked data. + * `delay` - delay between lines submission, seconds + """ + + self.printASCII(text=text + '\n', delay=delay) + + def printASCII(self, text: str='\n', delay: float=0.25) -> None: + """ + Safe to use printing method that relies on in-class buffer for wrapping + text. The input is filtered with `Printer.filter_ascii` in order to + exclude all non-safe-ascii characters and later splitted into multiple + chunks over `\\n` in order to prevent freeze caused by twice-newline in + printer buffer. This function is equal to normal `print` in C. This + method relies on in-class buffer to track printed data and keeping sync + with in-printer buffer. + + In case when the input text contains two sequential `\\n`, they are + replaced with `Printer.printBreak(30)`. + + Request: `impl:Printer.writeASCII()`. + + Arguments: + * `text` - text to be printed, automatically filtered with + `Printer.filter_ascii()` and splitted into newline-chunked data. + * `delay` - delay between lines submission, seconds + """ + + text = Printer.filter_ascii(text) + + # Check for empty and print out newline + text = self.print_buffer + text + self.print_buffer = '' + if len(text) == 0: + return + + # Special case: \n only, causes duplicating newlines (white-only string) + if len(text.strip()) == 0: + for s in text: + if s == '\n': + self.printBreak(30) + time.sleep(delay) + return + + # Iterlines + lines = text.split('\n') + for l in lines: + + # Flush previuos incomplete line + if len(self.print_buffer) != 0: + self.tellPrinter(self.print_buffer.encode('ascii')) + self.tellPrinter(b'\n') + self.print_buffer = '' + time.sleep(delay) + + # Flush if white-empty, because it is newline + elif len(l.strip()) == 0: + + # Flush in-printer buffer if not empty + if len(self.print_buffer) != 0: + self.tellPrinter(self.print_buffer.encode('ascii')) + self.tellPrinter(b'\n') + self.print_buffer = '' + time.sleep(delay) + + # Trail + else: + self.printBreak(30) + time.sleep(delay) + + # Process normal lines + else: + # Wrap line + parts = [ l[i:i+self.getRowCharacters()] for i in range(0, len(l), self.getRowCharacters()) ] + + for p in parts: + + # Print full line + if len(p) == self.getRowCharacters(): + self.tellPrinter(p.encode('ascii')) + self.tellPrinter(b'\n') + time.sleep(delay) + + # Partial, write to buffer + else: + self.print_buffer = p + + def flushASCII(self, delay: float=0.25) -> None: + """ + Force=print out buffer if it is not empty. Not equal to + `Printer.println()` because does not output empty newline if buffer is + empty. + + Request: `impl:Printer.printASCII()`. + + Arguments: + * `delay` - delay between lines submission, seconds + """ + + if len(self.print_buffer) != 0: + self.tellPrinter(self.print_buffer.encode('ascii')) + self.tellPrinter(b'\n') + self.print_buffer = '' + time.sleep(delay) + + def printRow(self, rowbytes: bytes, delay: float=0.01) -> None: + """ + Send bytes representing a single image row in binary black/white mode. + If amount of bydes exceedes the `Printer.getRowBytes()` constant, input + is truncated. If size of input is under the `Printer.getRowBytes()`, it + will be padded with zeros. + + Request: `1d763000+bytes[2]:big_endian+0100+bytes[Printer.getRowBytes()*1]`. + + Note: In case of A6+, preamble is `1d76300048000100` that can be viewed + as `[ 1d7630, 0030, 0001 ]`, where `1d7630` is printing operation + request, `0030` is big endian bytes per row, `0001` is big endian input + height. + + Arguments: + * `rowbytes` - bytes representing image pixels, 8 pixels per byte, + truncated/padded to fit `Printer.getRowBytes()`. + * `delay` - delay between printing each row of the image. + """ + + expectedLen = self.getRowBytes() + if len(rowbytes) < expectedLen: + rowbytes = rowbytes.ljust(expectedLen, b'\0') + elif len(rowbytes) > expectedLen: + rowbytes = rowbytes[:expectedLen] + + self.reset() + + # Notify printer about incomming $expectedLen bytes row + request = bytes.fromhex('1d763000') + int.to_bytes(self.getRowBytes(), 1, 'big') + bytes.fromhex('000100') + rowbytes + self.tellPrinter(request) + time.sleep(delay) + + # We're done here + + def printRowBytesList(self, rowbytes: typing.Iterable[bytes], delay: float=0.01) -> None: + """ + Send an array of bytes representing a multiple image rows in binary + black/white mode. If amount of bydes per row exceedes the + `Printer.getRowBytes()` constant, input is truncated. If size of input + is under the `Printer.getRowBytes()`, it will be padded with zeros. + + This printer supports pages up to `0xffff` rows, but current + implementation relies on chunked data with height limit of `0xff` and + automatically slices the input into chunks. + + Note: In case of A6+, preamble is `1d76300048000100` that can be viewed + as `[ 1d7630, 0030, 0001 ]`, where `1d7630` is printing operation + request, `0030` is big endian bytes per row, `0001` is big endian input + height. + + Request: chunked `1d763000+bytes[1]:big_endian+00+bytes[1]:big_endian+00+bytes[Printer.getRowBytes()*chunk_height]`. + + Arguments: + * `rowbytes` - list of bytes defining each row of the image. If row + length does not match the `Printer.getRowBytes()`, data is + truncated/padded to match the size. + * `delay` - delay between printing each row of the image. + """ + + if len(rowbytes) == 0: + return + + expectedLen = self.getRowBytes() + chunks = [ rowbytes[i:i+0xff] for i in range(0, len(rowbytes), 0xff) ] + + for chunk in chunks: + + # Reset state before print + self.reset() + + # 1d763000 30 00 01 00 + # Send preamble: `1d763000` + row_bytes:bytes[1] + `00` + chunk_size:bytes[1] + `00` + request = bytes.fromhex('1d763000') + int.to_bytes(self.getRowBytes(), 1, 'big') + bytes.fromhex('00') + int.to_bytes(len(chunk), 1, 'big') + bytes.fromhex('00') + + # Flush preamble + self.tellPrinter(request) + + # Flush rows dith delay + for row in chunk: + # trunc/pad + if len(row) < expectedLen: + row = row.ljust(expectedLen, b'\0') + elif len(row) > expectedLen: + row = row[:expectedLen] + + self.tellPrinter(row) + + time.sleep(delay) + + def printRowBytesIterator(self, rowiterator: typing.Iterable[bytes], delay: float=0.01) -> None: + """ + Iterate over the given iterator and print out all produced rows. This + method is very slow as it required printer to oftenly switch on/off + printing mode and pass a large overhead to set up the printing mode. + + This method uses the `Printer.printRow()` call. + + Arguments: + * `rowiterator` - iterator that returns bytes. + * `delay` - delay between printing each row of the image. + """ + + for r in rowiterator: + self.printRow(r, delay=delay) + + def printRowChunksIterator(self, rowiterator: typing.Iterable[typing.List[bytes]], delay: float=0.01) -> None: + """ + Iterate over the given iterator and print out all produced chunks of + rows. One chunk of rows is a list of bytes where each bytes define the + specific line of the image. This method is better than the use of + `Printer.printRowBytesIterator()` as it passes each chunk of image data + directly into the `Printer.printImageRowBytesList()`. + + Arguments: + * `rowiterator` - iterator that returns list[bytes]. + * `delay` - delay between printing each row of the image. + """ + + for chunk in rowiterator: + self.printRowBytesList(chunk, delay=delay) + + def printImageBytes(self, imagebytes: bytes, delay: float=0.01) -> None: + """ + Send an bytes representing single-line encoded image. For example, + `[0xff000000, 0x00ff0000, 0x0000ff00, 0x000000ff]` is encoded as + `0xff00000000ff00000000ff00000000ff`. + + Image must be valid aligned and sequence size must divide by + `Printer.getRowBytes()`. In case of partial data, the rest of partial + data is padded with zeros. Number of lines is calcualted as + `nlines = ceil(len(imagebytes) / Printer.getRowBytes())`. + + Arguments: + * `imagebytes` - bytes defining concatenated rows of the image. Each + row must be aligned to `Printer.getRowBytes()` in order to display + properly. If length of the last row dows not match + `Printer.getRowBytes()`, data is truncated/padded to match the size. + * `delay` - delay between printing each row of the image. + """ + + if len(imagebytes) == 0: + return + + # Delegate to impl + self.printRowBytesList([ imagebytes[i:i+self.getRowBytes()] for i in range(0, len(imagebytes), self.getRowBytes()) ], delay=delay) + + def printImage(self, img: PIL.Image.Image, delay=0.01, resample=PIL.Image.Resampling.NEAREST) -> None: + """ + Print PIL Image on this printer with automatic internal to-blackwhite + conversion. + + WARNING: In order to prevent the overhead of the printer (and possibly + loose some data but to limitations of the in-printer buffer) it is + suggested to split image into many vertical pieces and wait a + reasonable amount of time to let the printer to cooldown. + + Arguments: + * `img` - your pretty PIL Image. + * `delay` - delay between printing each row of the image. + * `resample` - resampling mode of the image, used to automatically + rescale image to fit the printer width of `Printer.getRowWidth()`. + """ + + img = img.convert('L') + img = PIL.ImageOps.invert(img) + img = img.resize((self.getRowWidth(), int(self.getRowWidth() / img.size[0] * img.size[1])), resample) + img = img.convert('1') + + imgbytes = img.tobytes() + self.printImageBytes(imgbytes, delay=delay) + + def printImageIterator(self, imgiterator: typing.Iterable[PIL.Image.Image], delay: float=0.01): + """ + Iterate over iterator and print out each PIL Image that it returns. + + Arguments: + * `rowiterator` - iterator that returns list[bytes]. + * `delay` - delay between printing each row of the image. + """ + + for img in imgiterator: + self.printImage(img, delay=delay) + + def printQR(self, text: str, delay: float=0.01, resample=PIL.Image.Resampling.NEAREST) -> None: + """ + Generate a QR code from specified string and print it. + + Arguments: + * `text` - your pretty text. + * `delay` - delay between printing each row of the image. + * `resample` - resampling mode of the image, used to automatically + rescale image to fit the printer width of `Printer.getRowWidth()`. + """ + + self.printImage(qrcode.make(text, border=0), delay=delay, resample=resample) diff --git a/peripage/__main__.py b/peripage/__main__.py new file mode 100644 index 0000000..f761aec --- /dev/null +++ b/peripage/__main__.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 + +# MIT License +# +# Copyright (c) 2021 bitrate16 + +def main(): + import argparse + import sys + import peripage + import PIL.Image + + parser = argparse.ArgumentParser(description='Print on a Peripage printer via bluetooth') + parser.add_argument( + '-m', '--mac', + help='Bluetooth MAC address of the printer', + required=True, + type=str + ) + parser.add_argument( + '-c', '--concentration', + help='Concentration value for printing (temperature)', + choices=[0, 1, 2], + metavar='[0-2]', + type=int, + default=0 + ) + parser.add_argument( + '-b', '--break', + dest='break_size', + help='Size of the break inserted after printed image or text', + choices=range(256), + metavar='[0-255]', + type=int, + default=0 + ) + parser.add_argument( + '-p', '--printer', + help='Printer model selection', + choices=peripage.PrinterType.names(), + type=str, + required=True + ) + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + '-t', '--text', + help='ASCII text to print. Text must be ASCII-safe and will be filtered for invalid characters', + type=str + ) + group.add_argument( + '-s', '--stream', + help='Print text received from STDIN, line by line. Text must be ASCII-safe and will be filtered for invalid characters', + action='store_true' + ) + group.add_argument( + '-i', '--image', + help='Path to the image for printing', + type=str + ) + group.add_argument( + '-q', '--qr', + help='String to convert into a QR code for printing', + type=str + ) + group.add_argument( + '-e', '--introduce', + help='Ask the printer to introduce itself', + action='store_true' + ) + + args = parser.parse_args() + + # Open connection + printer = peripage.Printer(args.mac, peripage.PrinterType[args.printer]) + printer.connect() + printer.reset() + + # Act based on args + if 'introduce' in args and args.introduce: + + # print('Hello, my name is Harold..') + print(printer.getDeviceFull().decode('ascii')) + printer.disconnect() + sys.exit(0) + + elif 'stream' in args and args.stream: + + printer.setConcentration(args.concentration) + + while True: + try: + line = input().rstrip() + + printer.printlnASCII(line) + + except EOFError: + # Input closed ^d^d + break + + if args.break_size > 0: + printer.printBreak(args.break_size) + + printer.disconnect() + + sys.exit(0) + + elif 'text' in args and args.text is not None: + + printer.setConcentration(args.concentration) + + text = args.text.rstrip() + + if len(text) > 0: + printer.printASCII(text) + printer.flushASCII() + + if args.break_size > 0: + printer.printBreak(args.break_size) + + printer.disconnect() + + sys.exit(0) + + elif 'image' in args and args.image is not None: + + printer.setConcentration(args.concentration) + + try: + img = PIL.Image.open(args.image) + except: + print(f'Failed to open image { args.image }') + + printer.printImage(img) + + if args.break_size > 0: + printer.printBreak(args.break_size) + + printer.disconnect() + + sys.exit(0) + + elif 'qr' in args and args.qr is not None: + + printer.setConcentration(args.concentration) + + printer.printQR(args.qr) + + if args.break_size > 0: + printer.printBreak(args.break_size) + + printer.disconnect() + + sys.exit(0) + + else: + + print('How did you get there?') + +if __name__ == '__main__': + main() diff --git a/ppa6/__init__.py b/ppa6/__init__.py deleted file mode 100644 index 7bf3fcd..0000000 --- a/ppa6/__init__.py +++ /dev/null @@ -1,829 +0,0 @@ -#!/usr/bin/env python3 - -# MIT License -# -# Copyright (c) 2021 bitrate16 - -__title__ = 'Peripage A6/A6+ buetooth printing utility' -__version__ = '0.5' -__author__ = 'bitrate16' -__license__ = 'MIT' -__copyright__ = 'Copyright (c) 2021 bitrate16' - -import time -import math -import qrcode -import socket -import bluetooth - -from PIL import Image, ImageOps -from enum import Enum - - -class PrinterType(Enum): - """ - Defines names for supported printer types. - Currently supported printers are: Peripage A6, Peripage A6+ - """ - A6 = 6 - A6p = 7 - -class Printer: - """ - This class defines the Peripage A6 / A6+ interface utility. - It contains methods wrapping requests with special control opcodes. - By default instance of this class is constructed with timeout set - to 1s and printer type A6. - Currently there is no thermal overheat protection opcodes found, so - use printing carefully and avoid overheating of the printer which - may result in hardware break. - Currently there is no stop codes found, so you can not stop printing. - - It is required to perform reset() after connection to the printer. - """ - - def __init__(self, mac, printertype=PrinterType.A6, timeout=1.0): - """ - Creates an instance of this wrapper class and stores passed mac - address for future connection (reconnection) to the printer. - By default printer type is set to A6. - By default timeout for connection / request is set to 1sec and - can be changed later with setTimeout(timeout). - - It is required to perform reset() after connection to the printer. - - TODO: Autimatically recognize printer type and avoid using - explicit definition or leave it for explicit compability bypassing. - - :param mac: MAC address of the printer - :type mac: str - :param printerType: type of the printer - :type printerType: PrinterType - :param timeout: timeot for connection - :type timeout: float - """ - - self.mac = mac - self.timeout = timeout - self.printerType = printertype - - # This buffer is used for continuous printing - self.printBuffer = '' - - def isConnected(self): - """ - Checks if printer is connected. - """ - - try: - self.sock.getpeername() - return True - except: - return False - - def connect(self): - """ - Opens a new connection to the printer. Does not perform check if it - was already connected or socket is in use. - - It is required to perform reset() after connection to the printer. - """ - - self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) - self.sock.connect((self.mac, 1)) - self.sock.settimeout(self.timeout) - - def reconnect(self): - """ - Reconnects to the printer. If connection already exists, socket is - being closed first. - - It is required to perform reset() after connection to the printer. - - TODO: Figure out how to reconnect this bluetooth socket - """ - - if self.isConnected(): - # self.sock.shutdown(socket.SHUT_RDWR) - self.sock.close() - del self.sock - - self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) - self.sock.connect((self.mac, 1)) - self.sock.settimeout(self.timeout) - - def disconnect(self): - """ - Disconnects from the printer. - """ - - if self.isConnected(): - # self.sock.shutdown(socket.SHUT_RDWR) - self.sock.close() - del self.sock - - def setTimeout(self, timeout): - """ - Used to set a connection / send / recv timeout for bluetooth socket. - - :param timeout: new timeout value - :type timeout: float - """ - - self.timeout = timeout - self.sock.settimeout(timeout) - - - def tellPrinter(self, byteseq): - """ - Send byte sequence to the printer and return, no response is expected - - :param byteseq: bytes of information to send - :type byteseq: bytes - """ - - self.sock.send(byteseq) - - def askPrinter(self, byteseq, recv_size=1024): - """ - Send byte sequence to the printer and receive recv_size bytes as response - - :param byteseq: bytes of information to send - :type byteseq: bytes - :param recv_size: receiver buffer size - :type recv_size: int - """ - - self.sock.send(byteseq) - return self.sock.recv(recv_size) - - def listenPrinter(self, recv_size=1024): - """ - Receive recv_size bytes from the printer - - :param recv_size: receiver buffer size - :type recv_size: int - """ - - return self.sock.recv(recv_size) - - def tellPrinterSeq(self, byteseq): - """ - Send byte sequence list to the printer and return, no response is expected - - :param byteseq: array or bytes[] - :type byteseq: list - """ - - for s in byteseq: - self.sock.send(s) - - def askPrinterSeq(self, byteseq, recv_size=1024): - """ - Send byte sequence list to the printer and receive recv_size bytes as response - - :param byteseq: array or bytes[] - :type byteseq: list - :param recv_size: receiver buffer size - :type recv_size: int - """ - - for s in byteseq: - self.sock.send(s) - return self.sock.recv(recv_size) - - - def getDeviceIP(self): - """ - Returns Unknown Property as response of 0x10ff20f0. - For example, Peripage A6+ returns 'IP-300' - """ - - return self.askPrinter(bytes.fromhex('10ff20f0')) - - def getDeviceName(self): - """ - Returns Device Name as response of 0x10ff3011. - For example, Peripage A6+ returns 'PeriPage+DF7A' - """ - - return self.askPrinter(bytes.fromhex('10ff3011')) - - def getDeviceSerialNumber(self): - """ - Returns Serial Number as response of 0x10ff20f2. - For example, Peripage A6+ returns 'A6491571121' - """ - - return self.askPrinter(bytes.fromhex('10ff20f2')) - - def getDeviceFirmware(self): - """ - Returns Firmware Version as response of 0x10ff20f1. - For example, Peripage A6+ returns 'V2.11_304dpi' - """ - - return self.askPrinter(bytes.fromhex('10ff20f1')) - - def getDeviceBattery(self): - """ - Returns battery value as response of 0x10ff50f1. - For example, Peripage A6+ returns '\x00@' (sample retult). - Request returns result of type byte[2] where result[1] is charge - lever, result[0] is 0. - """ - return int(self.askPrinter(bytes.fromhex('10ff50f1'))[1]) - - def getDeviceHardware(self): - """ - Returns Hardware Info as response of 0x10ff3010. - For example, Peripage A6+ returns 'BR2141e-s(A02)_B9_20190815_r3460', - that means that it has a BR2141e-s chip inside and a small amount of - additional ascii characters. - """ - - return self.askPrinter(bytes.fromhex('10ff3010')) - - def getDeviceMAC(self): - """ - Returns Device's MAC address as response of 0x10ff3012. - For example, Peripage A6+ returns - '\x00\xF5\x73\x25\xAC\x9F_\x00\xF5\x73\x25\xAC\x9F_', which is equal to - 00:F5:73:25:AC:9F - """ - - return self.askPrinter(bytes.fromhex('10ff3012')) - - def getDeviceFull(self): - """ - Returns Full device information as response of 0x10ff70f1. - For example, Peripage A6+ returns - 'PeriPage+DF7A|00:F5:73:25:AC:9F|C5:12:81:19:2C:51|V2.11_304dpi|A6491571121|84', - which means that it's name is 'PeriPage+DF7A', mac is 00:F5:73:25:AC:9F, - connected to mac C5:12:81:19:2C:51, F/W V2.11_304dpi, S/N A6491571121, battery - level 84%. - Use with care. This command has sub effect causing the printed images to corrupt - shifting horisontally and adding a █ character to ASCII buffer. - """ - - return self.askPrinter(bytes.fromhex('10ff70f100')) - - def getRowBytes(self): - """ - Returns an amount of bytes used to encode a single image row. - Image is encoded as one bit per pixel. For Peripage A6 a single row contains 48 - bytes of biary pixel data. Peripage A6+'s single row contains 72 bytes of binary - pixel data. - For example 0b10101010 encodes an image with alternating black and white pixels, - total 8 pixels and 1 byte to encode. - If you want to work with raw bytes printing, you shout fit your images into this - bytes limits. - """ - - if self.printerType == PrinterType.A6: - return 48 - elif self.printerType == PrinterType.A6p: - return 72 - else: - raise ValueError('Unsupported printer type') - - def getRowWidth(self): - """ - Returns resolution of a single image row that can be printed. - For Peripage A6 it is 384 pixels, for Peripage A6+ it is 576 pixels. - If you want to work with raw bytes printing, you shout fit your images into this - pixel limits. - """ - - if self.printerType == PrinterType.A6: - return 384 - elif self.printerType == PrinterType.A6p: - return 576 - else: - raise ValueError('Unsupported printer type') - - def getRowCharacters(self): - """ - Returns amount of characters that may fit in a single row. - By default A6+ can fit up to 48 characters, A6 can fit up to 32 characters. - """ - - if self.printerType == PrinterType.A6: - # TODO: Measure amount of characters that can fit in the line - return 32 - elif self.printerType == PrinterType.A6p: - return 48 - else: - raise ValueError('Unsupported printer type') - - def getHeightLimit(self): - """ - Returns height limit for image in pixels. If image size exceeds this height, it - should be splitted into multiple different images - """ - return 0xffff - - - def setDeviceSerialNumber(self, snstr, wait=True): - """ - Sets a new Serial Number for the device as snstr using 0x10ff20f4 - Autimatically terminates string with '\0' because '\0' is required to explicitly - send string in C-style to the printer. Missing '\0' may result in internal memory - corruption or S/N read timeouts (which can be fixed by writing a new correct S/N). - Limitations for S/N string are not currently determined. - Call to 0x10ff20f4 returns 'OK' as the result of S/N change. - - :param snstr: new serial number - :type snstr: str - :param wait: wait for response - :type wait: bool - """ - - self.tellPrinter(bytes.fromhex('10ff20f4')) - if wait: - return self.askPrinter(bytes(snstr + '\0', 'ascii')) - else: - self.tellPrinter(bytes(snstr + '\0', 'ascii')) - - def setPowerTimeout(self, timeout, wait=True): - """ - Sets the device timeout (in minutes) to the value (bound between 0x0001 and 0xfff0) - using 0x10ff12. - Call to 0x10ff12 returns 'OK' as the result of timeout change. - - :param timeout: power timeout in range (0x0001, 0xfff0) - :type timeout: int - :param wait: wait for response - :type wait: bool - """ - - timeout = max(min(0xfff0, timeout), 0x0001) - strtimeout = '{0:0{1}X}'.format(timeout, 4) - if wait: - return self.askPrinter(bytes.fromhex('10ff12' + strtimeout)) - else: - self.tellPrinter(bytes.fromhex('10ff12' + strtimeout)) - - def setConcentration(self, cons, wait=False): - """ - Sets the printing concentration using 0x10ff1000 opcode. - Currently allowed values are 0, 1, 2 which represents light, medium, hard - (heating intensivity). Other values are not tested yet. - - :param cons: concentration (0, 1, 2) - :type cons: int - :param wait: wait for response - :type wait: bool - """ - - opcode = '' - - if cons <= 0: - opcode = '10ff100000' - elif cons == 1: - opcode = '10ff100001' - elif cons >= 2: - opcode = '10ff100002' - - if wait: - return self.askPrinter(bytes.fromhex(opcode)) - else: - self.tellPrinter(bytes.fromhex(opcode)) - - - def reset(self): - """ - Performs reset operation (The initial purpose of it is stoll unknown) - required before printing stream of bytes in a binary image. - Opcode for this operation is 0x10fffe01 followed by 0x000000000000000000000000. - This operation has to be performed before any other printing operation and after - connect to printer. - """ - - self.tellPrinter(bytes.fromhex('10fffe01000000000000000000000000')) - - def printBreak(self, size=0x40): - """ - Asks printer to print a line break with specified size (in pixels) using 0x1b4a. - Value expected in range (0x01, 0xff). - - :param text: size of break in range (0x1, 0xff) - :type text: int - """ - - size = min(0xff, max(0x01, size)) - strsize = '{0:0{1}X}'.format(size, 2) - self.tellPrinter(bytes.fromhex('1b4a' + strsize)) - - def writeASCII(self, text='\n', wait=False): - """ - Deprecated. - Write raw ASCII string to the printer. - By default this printer accepts an ascii string for printing it with raw monospace - font. Printer has internal buffer (getRowCharacters()) that will - accumulate the received characters. Printer will print out the buffer if meets a '\n' - character or buffer overflows. - This function expects only ASCII characters without control codes (0x00-0x20, 0xFF). - This function is not recommended to use while printer is in byte stream printing mode - or while it expects arguments for some of it's opcodes. - If string contains sequently repeating '\n' characters, the printer may freeze. So - it's recommended to use printASCII() instead. - - :param text: string containing ASCII characters - :type text: str - :param wait: wait for response - :type wait: bool - """ - - if wait: - return self.askPrinter(bytes(text, 'ascii')) - else: - self.tellPrinter(bytes(text, 'ascii')) - - def printlnASCII(self, text='\n', delay=0.25): - """ - Write raw ASCII string to the printer. - By default this printer accepts an ascii string for printing it with raw monospace - font. Printer has internal buffer (getRowCharacters()) that will - accumulate the received characters. Printer will print out the buffer if meets a '\n' - character or buffer overflows. - This function expects only ASCII characters without control codes (0x00-0x20, 0xFF). - This function is not recommended to use while printer is in byte stream printing mode - or while it expects arguments for some of it's opcodes. - If string contains sequently repeating '\n' characters, they will be replaced with - printBreak(30) which matches the length of the '\n\n'. This function automatically - slices string into pieces of size getRowCharacters() and waits till new piece being - printed. - This function acts as println. This function will print out the data stored in the - buffer of printASCII(). - - :param text: string containing ASCII characters - :type text: str - :param delay: delay between sending each line - :type delay: float - """ - - # Remove non-ASCII & control (except \n) - text = ''.join([i for i in text if (31 < ord(i) or ord(i) == 10) and ord(i) < 127]) - - ## Remove last '\n' to avoid it's duplication - #if len(text) > 0 and text[-1] == '\n': - # text = text[:-1] - # - # Check for empty and print out newline - text = self.printBuffer + text - if len(text) == 0: - self.printBreak(30) - time.sleep(delay) - return - - lines = text.split('\n') - self.printBuffer = '' - - for l in lines: - # Replace every empty line with break matching newline height - if len(l) == 0: - self.printBreak(30) - time.sleep(delay) - else: - # Split to lines - parts = [l[i:i+self.getRowCharacters()] for i in range(0, len(l), self.getRowCharacters())] - - for i, p in enumerate(parts): - self.tellPrinter(bytes(p, 'ascii')) - if i != 0: - time.sleep(delay) - - # Push last line from the buffer - self.tellPrinter(bytes('\n', 'ascii')) - time.sleep(delay) - - def printASCII(self, text='\n', delay=0.25): - """ - Write raw ASCII string to the printer. - By default this printer accepts an ascii string for printing it with raw monospace - font. Printer has internal buffer (getRowCharacters()) that will - accumulate the received characters. Printer will print out the buffer if meets a '\n' - character or buffer overflows. - This function expects only ASCII characters without control codes (0x00-0x20, 0xFF). - This function is not recommended to use while printer is in byte stream printing mode - or while it expects arguments for some of it's opcodes. - If string contains sequently repeating '\n' characters, they will be replaced with - printBreak(30) which matches the length of the '\n\n'. This function automatically - slices string into pieces of size getRowCharacters() and waits till new piece being - printed. - This function uses in class buffer to store tail of the text if text didn't end with - '\n'. - - :param text: string containing ASCII characters - :type text: str - :param delay: delay between sending each line - :type delay: float - """ - - # Remove non-ASCII & control (except \n) - text = ''.join([i for i in text if (31 < ord(i) or ord(i) == 10) and ord(i) < 127]) - - # Check for empty and print out newline - text = self.printBuffer + text - self.printBuffer = '' - if len(text) == 0: - return - - endLineBreak = text[-1] == '\n' - - # Remove last '\n' to avoid it's duplication - if len(text) > 0 and text[-1] == '\n': - if len(text) == 1: - self.printBreak(30) - time.sleep(delay) - return - - text = text[:-1] - - lines = text.split('\n') - - for i, l in enumerate(lines): - # Replace every empty line with break matching newline height - if len(l) == 0: - self.printBreak(30) - time.sleep(delay) - else: - # Split to lines - parts = [l[i:i+self.getRowCharacters()] for i in range(0, len(l), self.getRowCharacters())] - - for j, p in enumerate(parts): - # If this is the last part of the text and it ends with '\n', push it - if j == len(parts)-1: - if i == len(lines)-1: - if endLineBreak: - self.tellPrinter(bytes(p, 'ascii')) - time.sleep(delay) - self.tellPrinter(bytes('\n', 'ascii')) - time.sleep(delay) - else: - self.printBuffer = p - - # Push out the string that is a full row - if len(p) == self.getRowCharacters(): - self.tellPrinter(bytes(p, 'ascii')) - time.sleep(delay) - self.tellPrinter(bytes('\n', 'ascii')) - time.sleep(delay) - self.printBuffer = '' - else: - self.tellPrinter(bytes(p, 'ascii')) - time.sleep(delay) - self.tellPrinter(bytes('\n', 'ascii')) - time.sleep(delay) - else: - self.tellPrinter(bytes(p, 'ascii')) - if j != 0: - time.sleep(delay) - - def flushASCII(self, delay=0.25): - """ - Prints out the buffer used in printASCII() followed by newline. - - :param delay: delay between sending each line - :type delay: float - """ - - if len(self.printBuffer) > 0: - self.tellPrinter(bytes(self.printBuffer, 'ascii')) - time.sleep(delay) - self.tellPrinter(bytes('\n', 'ascii')) - time.sleep(delay) - self.printBuffer = '' - - def printRow(self, rowbytes): - """ - Send array of pixels represented with rowbytes bytes to the printer. - This operation invokes printer image / byte stream printing mode and prints out a single - row. - rowbytes expected to be bytes type with size matching the printer extected row size for - specified printer model (Refer to getRowBytes() for more information). - If amount of bytes exceeeds or under the required by this printer type, bytes array will - be cut or pad with zeros. - - :param rowbytes: bytes array of size getRowBytes() representing a single row - :type rowbytes: bytes - """ - - expectedLen = self.getRowBytes() - if len(rowbytes) < expectedLen: - rowbytes = rowbytes.ljust(expectedLen, bytes.fromhex('00')) - elif len(rowbytes) > expectedLen: - rowbytes = rowbytes[:expectedLen] - - self.reset() - - # Notify printer about incomming $expectedLen bytes row - if self.printerType == PrinterType.A6: - self.tellPrinter(bytes.fromhex('1d76300030000100')) - else: - self.tellPrinter(bytes.fromhex('1d76300048000100')) - - self.tellPrinter(rowbytes) - - # We're done here - - def printImageRowBytesList(self, imagebytes, delay=0.01): - """ - Performs printing of the Image bytes. Image width expected to match getRowBytes(), in other - case it will be cut or pad with bytes. - Input image is being split into multiple pieces if height exceeds 0xff pixels. This should - be done because of the limitation in 0xffff pixels in height for single page, but i limit - by 0xff. - imagebytes defines the list with rows. Each row is defined by bytes. - For example: [0xff000000, 0x00ff0000, 0x0000ff00, 0x000000ff] - - :param imagebytes: array of bytes containing rows of the image - :type imagebytes: list - :param delay: delay between sending each row of the image - :type delay: float - """ - - imgHeight = len(imagebytes) - expectedLen = self.getRowBytes() - nPieces = math.ceil(imgHeight / 0xff) - restPixels = imgHeight % 0xff - - for i in range(nPieces): - # Size of each print is 0xff, but last print has size restPixels - height = 0xff if i < nPieces-1 else restPixels - heightHex = 'ff' if i < nPieces-1 else '{0:0{1}X}'.format(restPixels, 2) - - self.reset() - - # Notify printer about incomming $expectedLen bytes row - if self.printerType == PrinterType.A6: - self.tellPrinter(bytes.fromhex(f'1d7630003000{heightHex}00')) - else: - self.tellPrinter(bytes.fromhex(f'1d7630004800{heightHex}00')) - - for j in range(height): - rowbytes = imagebytes[i*0xff+j] - - if len(rowbytes) < expectedLen: - rowbytes = rowbytes.ljust(expectedLen, bytes.fromhex('00')) - elif len(rowbytes) > expectedLen: - rowbytes = rowbytes[:expectedLen] - - self.tellPrinter(rowbytes) - - time.sleep(delay) - - def printImageBytes(self, imagebytes, delay=0.01): - """ - Performs printing of the Image bytes. Image width expected to match getRowBytes(), in other - case it will shift while printing and the last for not matching length of getRowBytes() - will be cut. - Input image is being split into multiple pieces if height exceeds 0xff pixels. This should - be done because of the limitation in 0xffff pixels in height for single page, but i limit - by 0xff. - imagebytes defines the entime image despite to printImageBytesList argument - For example: [0xff000000, 0x00ff0000, 0x0000ff00, 0x000000ff] will be defined as - 0xff00000000ff00000000ff00000000ff. - - :param imagebytes: bytes containing rows of the image - :type imagebytes: bytes - :param delay: delay between sending each row of the image - :type delay: float - """ - - expectedLen = self.getRowBytes() - imgHeight = math.floor(len(imagebytes) / expectedLen) - nPieces = math.ceil(imgHeight / 0xff) - restPixels = imgHeight % 0xff - - for i in range(nPieces): - self.reset() - - # Size of each print is 0xff, because last part is cut off - height = 0xff if i < nPieces-1 else restPixels - heightHex = 'ff' if i < nPieces-1 else '{0:0{1}X}'.format(restPixels, 2) - - # Notify printer about incomming $expectedLen bytes row - if self.printerType == PrinterType.A6: - self.tellPrinter(bytes.fromhex(f'1d7630003000{heightHex}00')) - else: - self.tellPrinter(bytes.fromhex(f'1d7630004800{heightHex}00')) - - for j in range(height): - self.tellPrinter(imagebytes[(i*0xff+j)*expectedLen:(i*0xff+(j+1))*expectedLen]) - - time.sleep(delay) - - def printImage(self, img, delay=0.01, resample=Image.NEAREST): - """ - Performs printing of PIL image. Image is being rescaled to match width of getRowPixels(). - Result image is converted to '1' binary mode. If image width exceeds limit of 0xff pixels - in heigth, it will be split into multiple parts. - - :param img: image to print - :type img: Image - :param delay: delay between sending each row of the image - :type delay: float - :param resample: image resampling mode (Image.NEAREST, Image.BILINEAR, Image.BICUBIC, Image.ANTIALIAS) - """ - - img = img.convert('L') - img = ImageOps.invert(img) - img = img.resize((self.getRowWidth(), int(self.getRowWidth() / img.size[0] * img.size[1])), resample) - img = img.convert('1') - - imgbytes = img.tobytes() - self.printImageBytes(imgbytes) - - def printQR(self, text, delay=0.01, resample=Image.NEAREST): - """ - Generates QR code from specified string and prints it out. - - :param text: Text for qr code - :tapy text: str - :param delay: delay between sending each row of the image - :type delay: float - :param resample: image resampling mode (Image.NEAREST, Image.BILINEAR, Image.BICUBIC, Image.ANTIALIAS) - """ - - self.printImage(qrcode.make(text, border=0), delay, resample) - - def printRowBytesIterator(self, rowiterator, delay=0.25): - """ - Allows printing image using iterator / generator that should return bytes - of row as result. If amount of returned bytes do not match getRowBytes(), - they will be pad or cut. - - :param rowiterator: iterator or generator returning bytes describing rows of the image. - :param delay: delay between sending each row - :type delay: float - """ - - for r in rowiterator: - self.printRow(r) - time.sleep(delay) - - - def printRowIterator(self, rowiterator, delay=0.25): - """ - Allows printing image using iterator / generator that should return image size of - (getRowWidth(), 1) as result. Returned image is geing resampled and printed as a row. - This function does not perform check if this image is really (getRowWidth(), 1) pixels - size and just passes it to the printImage() repeatly - - :param rowiterator: iterator or generator returning image for each row. - :param delay: delay between sending each row - :type delay: float - """ - - for r in rowiterator: - self.printImage(r) - time.sleep(delay) - - def printRowBytesIteratorOfSize(self, rowiterator, rowcount, delay=0.01): - """ - Allows printing image using iterator / generator that should return bytes - of row as result. If amount of returned bytes do not match getRowBytes(), - they will be pad or cut. - Additional parameter for this function is amount of rows that should be - reserved for printing. This is usefull for printing long procedural - generated pages when printing using single row commit is too slow. - - :param rowiterator: iterator or generator returning bytes describing rows of the image. - :param rowcount: amount of rows to reserve for printing in range (0x1, 0xffff) - :type rowcount: int - :param delay: delay between sending each row - :type delay: float - """ - - rowcount = min(0xffff, max(0x1, rowcount)) - rowcountstr = '{0:0{1}X}'.format(rowcount, 4) - rowcountstr = rowcountstr[2:4] + rowcountstr[0:2] - - self.reset() - - # Notify printer about incomming bytes - if self.printerType == PrinterType.A6: - self.tellPrinter(bytes.fromhex(f'1d7630003000{rowcountstr}')) - elif self.printerType == PrinterType.A6p: - self.tellPrinter(bytes.fromhex(f'1d7630004800{rowcountstr}')) - else: - raise ValueError('Unsupported printer type') - - expectedLen = self.getRowBytes() - - for i, r in enumerate(rowiterator): - rowbytes = r - - if len(rowbytes) < expectedLen: - rowbytes = rowbytes.ljust(expectedLen, bytes.fromhex('00')) - elif len(rowbytes) > expectedLen: - rowbytes = rowbytes[:expectedLen] - - self.tellPrinter(rowbytes) - - time.sleep(delay) - - if i == rowcount - 1: - break diff --git a/ppa6/__main__.py b/ppa6/__main__.py deleted file mode 100644 index d0f0053..0000000 --- a/ppa6/__main__.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python3 - -# MIT License -# -# Copyright (c) 2021 bitrate16 - -def main(): - import argparse - import sys - from ppa6 import Printer, PrinterType - from PIL import Image - - parser = argparse.ArgumentParser(description='Print on a Peripage A6 / A6+ via bluetooth') - parser.add_argument('-m', '--mac', help='Bluetooth MAC address of the printer', required=True, type=str) - parser.add_argument('-c', '--concentration', help='Concentration value for printing (0, 1, 2)', required=False, choices=[0, 1, 2], metavar='[0-2]', type=int, default=0) - parser.add_argument('-b', '--break', dest='breaksize', help='Size of the break that should be inserted after the print (max 255)', required=False, choices=range(256), metavar='[0-255]', type=int, default=0) - parser.add_argument('-p', '--printer', help='Printer model name (A6 or A6+/A6p (both allowed))', required=False, choices=['A6', 'A6p', 'A6+'], type=str, default='A6') - parser.add_argument('-n', '--newline', help='Force printer to add newline at the end of the printed text and flush the buffer', required=False, action='store_true') - - # Selection of the required action: - # 1. Print text - # 2. Print stream from stdin - # 3. Print image from file - # 4. Requist printer information - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('-t', '--text', help='ASCII text that should be printed. Add a line break at the end of the string to avoid it being cut. String can be empty, so just page break will be printed', type=str) - group.add_argument('-s', '--stream', help='Reads an input from stdin and prints as ASCII text', action='store_true') - group.add_argument('-i', '--image', help='Path to the image that should be printed', type=str) - group.add_argument('-q', '--qr', help='String for QR code print', type=str) - group.add_argument('-e', '--introduce', help='Ask the printer to introduce himself', action='store_true') - - args = parser.parse_args() - - # Open connection - - printer = Printer(args.mac, PrinterType.A6 if args.printer == 'A6' else PrinterType.A6p) - - #try: - printer.connect() - #except: - # print('Failed to open connection') - # sys.exit(0) - - if 'introduce' in args and args.introduce: - - # print('Hello, my name is Harold..') - print(printer.getDeviceFull().decode('ascii')) - printer.disconnect() - sys.exit(0) - - elif 'stream' in args and args.stream: - - if 'concentration' in args: - printer.setConcentration(args.concentration) - - printer.reset() - - while True: - try: - line = input() - - printer.printlnASCII(line) - - except EOFError: - # Input closed ^d^d - break - - if 'breaksize' in args and args.breaksize > 0: - printer.printBreak(args.breaksize) - - sys.exit(0) - - elif 'text' in args and args.text is not None: - - if 'concentration' in args: - printer.setConcentration(args.concentration) - - printer.reset() - - line = args.text - if args.newline: - line += '\n' - - if len(line) > 0: - printer.printASCII(line) - - if 'breaksize' in args and args.breaksize > 0: - printer.printBreak(args.breaksize) - - sys.exit(0) - - elif 'image' in args and args.image is not None: - - if 'concentration' in args: - printer.setConcentration(args.concentration) - - printer.reset() - img = None - - try: - img = Image.open(args.image) - except: - print(f'Failed to open image {args.image}') - - printer.printImage(img, resample=Image.ANTIALIAS) - - if 'breaksize' in args and args.breaksize > 0: - printer.printBreak(args.breaksize) - - sys.exit(0) - - elif 'qr' in args and args.qr is not None: - - if 'concentration' in args: - printer.setConcentration(args.concentration) - - printer.reset() - - printer.printQR(args.qr, resample=Image.ANTIALIAS) - - if 'breaksize' in args and args.breaksize > 0: - printer.printBreak(args.breaksize) - - sys.exit(0) - - else: - - print('How did you get there?') - -if __name__ == '__main__': - main() diff --git a/print-server/README.md b/print-server/README.md index a064c50..b9e34c5 100644 --- a/print-server/README.md +++ b/print-server/README.md @@ -7,7 +7,7 @@ Simple utility for creating standalone print server for peripage A6/A6+ thermal Edit config in `__main__.py` ```python # Config -PRINTER_MODEL = ppa6.PrinterType.A6p +PRINTER_MODEL = peripage.PrinterType.A6p PRINTER_MAC = '00:15:83:15:bc:5f' SERVER_PORT = 11001 BREAK_SIZE = 100 @@ -49,7 +49,7 @@ $ sudo apt install bluetooth bluez libbluetooth-dev libopenjp2-7 Packages: ``` -$ pip3 install Pillow aiohttp aiohttp_middlewares ppa6 python-dateutil +$ pip3 install Pillow aiohttp aiohttp_middlewares peripage python-dateutil ``` # Usage diff --git a/print-server/__main__.py b/print-server/__main__.py index bcbba57..823d082 100644 --- a/print-server/__main__.py +++ b/print-server/__main__.py @@ -2,7 +2,7 @@ # Requirements: # $ sudo apt install bluetooth bluez libbluetooth-dev libopenjp2-7 -# $ pip3 install Pillow aiohttp aiohttp_middlewares ppa6 python-dateutil +# $ pip3 install Pillow aiohttp aiohttp_middlewares peripage python-dateutil # Run with # $ python3 -m print-server @@ -13,11 +13,10 @@ import os import aiohttp.web import aiohttp_middlewares -import ppa6 +import peripage import atexit import PIL import sys -import time from dateutil import tz from datetime import datetime @@ -25,7 +24,7 @@ from . import print_service # Config -PRINTER_MODEL = ppa6.PrinterType.A6p +PRINTER_MODEL = peripage.PrinterType.A6p PRINTER_MAC = '00:15:83:15:bc:5f' SERVER_PORT = 11001 BREAK_SIZE = 100 @@ -38,207 +37,211 @@ # Globals service: print_service.PrintService = None app: aiohttp.web.Application = None +file = None # Utils def log(*args): - print(*args) - for a in args: - file.write(str(a)) - file.write(' ') - file.write('\n') - file.flush() + print(*args) + for a in args: + file.write(str(a)) + file.write(' ') + file.write('\n') + file.flush() def print_break(timestamp, date, ip, proxy_ip): - """ - Simple page break of given size - """ - - def wrap_print_break(p: ppa6.Printer): - p.printBreak(BREAK_SIZE) - log(ip, '/', proxy_ip, '#', date, timestamp, 'done', 'BREAK') - - service.add_print_handler(wrap_print_break) + """ + Simple page break of given size + """ + + def wrap_print_break(p: peripage.Printer): + p.printBreak(BREAK_SIZE) + log(ip, '/', proxy_ip, '#', date, timestamp, 'done', 'BREAK') + + service.add_print_handler(wrap_print_break) # Handlers async def post_print_ascii(request: aiohttp.web.Request): - - if request.query.get('secret', None) != SECRET_KEY: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'missing secret key' - }) - - # Get request payload - if not request.body_exists: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'missing request body' - }) - - # Clear, post and return length - text = await request.text() - - # Additinally process string - text.replace('\t', ' ') - ascii_text = ''.join([i for i in text if (31 < ord(i) or ord(i) == 10) and ord(i) < 127]).strip() - - if len(ascii_text) == 0: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'empty ascii string' - }) - - date = datetime.now(tz.gettz(TIMEZONE)) - timestamp = round(date.timestamp() * 1000) - date = date.strftime("%d.%m.%Y %H:%M:%S.%f") - - # Log - log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, '--->', 'ASCII') - - # Save data - if RECEIVE_DIRECTORY is not None: - with open(f'{RECEIVE_DIRECTORY}/{timestamp}_ascii.txt', 'w') as f: - f.write(ascii_text) - - # Decorate string - print_text = ascii_text - if (request.query.get('print_date', None) == 'true') or (request.query.get('print_date', None) == '1'): - print_text = f'{date}\n{print_text}' - - # Get concentration - try: - concenttration = min(2, max(0, int(request.query.get('print_concentration', 0)))) - except: - concenttration = 0 - - # Submit image printing task - def wrap_print_ascii(p: ppa6.Printer): - p.setConcentration(concenttration) - p.printASCII(ascii_text) - p.flushASCII() - log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, 'done', 'ASCII') - - service.add_print_handler(wrap_print_ascii) - - if (request.query.get('print_break', None) == 'true') or (request.query.get('print_break', None) == '1'): - print_break(timestamp, date, request.remote, request.headers.get('X-Forwarded-For', 'None')) - - return aiohttp.web.json_response({ - 'status': 'result', - 'length': len(ascii_text) - }) + + if request.query.get('secret', None) != SECRET_KEY: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'missing secret key' + }) + + # Get request payload + if not request.body_exists: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'missing request body' + }) + + # Clear, post and return length + text = await request.text() + + # Additinally process string + text.replace('\t', ' ') + ascii_text = ''.join([i for i in text if (31 < ord(i) or ord(i) == 10) and ord(i) < 127]).strip() + + if len(ascii_text) == 0: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'empty ascii string' + }) + + date = datetime.now(tz.gettz(TIMEZONE)) + timestamp = round(date.timestamp() * 1000) + date = date.strftime("%d.%m.%Y %H:%M:%S.%f") + + # Log + log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, '--->', 'ASCII') + + # Save data + if RECEIVE_DIRECTORY is not None: + with open(f'{RECEIVE_DIRECTORY}/{timestamp}_ascii.txt', 'w') as f: + f.write(ascii_text) + + # Decorate string + print_text = ascii_text + if (request.query.get('print_date', None) == 'true') or (request.query.get('print_date', None) == '1'): + print_text = f'{date}\n{print_text}' + + # Get concentration + try: + concenttration = min(2, max(0, int(request.query.get('print_concentration', 0)))) + except: + concenttration = 0 + + # Submit image printing task + def wrap_print_ascii(p: peripage.Printer): + p.setConcentration(concenttration) + p.printASCII(ascii_text) + p.flushASCII() + log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, 'done', 'ASCII') + + service.add_print_handler(wrap_print_ascii) + + if (request.query.get('print_break', None) == 'true') or (request.query.get('print_break', None) == '1'): + print_break(timestamp, date, request.remote, request.headers.get('X-Forwarded-For', 'None')) + + return aiohttp.web.json_response({ + 'status': 'result', + 'length': len(ascii_text) + }) async def post_print_image(request: aiohttp.web.Request): - - if request.query.get('secret', None) != SECRET_KEY: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'missing secret key' - }) - - # Get request payload - if not request.body_exists: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'missing request body' - }) - - post = post = await request.post() - image = post.get('image') - - if not image: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'missing request image' - }) - - try: - img_content = image.file.read() - buf = io.BytesIO(img_content) - img = PIL.Image.open(buf) - - date = datetime.now(tz.gettz(TIMEZONE)) - timestamp = round(date.timestamp() * 1000) - date = date.strftime("%d.%m.%Y %H:%M:%S.%f") - - if not img: - return aiohttp.web.json_response({ - 'status': 'error', - 'message': 'invalid request image' - }) - - # Log - log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, '--->', 'Image') - - # Save data - if RECEIVE_DIRECTORY is not None: - img.save(f'{RECEIVE_DIRECTORY}/{timestamp}_image.png', 'PNG') - - # Get concentration - try: - concenttration = min(2, max(0, int(request.query.get('print_concentration', 0)))) - except: - concenttration = 0 - - # Submit image printing task - def wrap_print_image(p: ppa6.Printer): - p.setConcentration(concenttration) - p.printImage(img) - log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, 'done', 'Image') - - service.add_print_handler(wrap_print_image) - - # Add page break - if (request.query.get('print_break', None) == 'true') or (request.query.get('print_break', None) == '1'): - print_break(timestamp, date, request.remote, request.headers.get('X-Forwarded-For', 'None')) - - # Return size of payload - return aiohttp.web.json_response({ - 'status': 'result', - 'length': len(img_content) - }) - - except: - type, value, _ = sys.exc_info() - return aiohttp.web.json_response({ - 'status': 'error', - 'message': str(value), - 'type': str(type) - }) + + if request.query.get('secret', None) != SECRET_KEY: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'missing secret key' + }) + + # Get request payload + if not request.body_exists: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'missing request body' + }) + + post = post = await request.post() + image = post.get('image') + + if not image: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'missing request image' + }) + + try: + img_content = image.file.read() + buf = io.BytesIO(img_content) + img = PIL.Image.open(buf) + + date = datetime.now(tz.gettz(TIMEZONE)) + timestamp = round(date.timestamp() * 1000) + date = date.strftime("%d.%m.%Y %H:%M:%S.%f") + + if not img: + return aiohttp.web.json_response({ + 'status': 'error', + 'message': 'invalid request image' + }) + + # Log + log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, '--->', 'Image') + + # Save data + if RECEIVE_DIRECTORY is not None: + img.save(f'{RECEIVE_DIRECTORY}/{timestamp}_image.png', 'PNG') + + # Get concentration + try: + concenttration = min(2, max(0, int(request.query.get('print_concentration', 0)))) + except: + concenttration = 0 + + # Submit image printing task + def wrap_print_image(p: peripage.Printer): + p.setConcentration(concenttration) + p.printImage(img) + log(request.remote, '/', request.headers.get('X-Forwarded-For', 'None'), '#', date, timestamp, 'done', 'Image') + + service.add_print_handler(wrap_print_image) + + # Add page break + if (request.query.get('print_break', None) == 'true') or (request.query.get('print_break', None) == '1'): + print_break(timestamp, date, request.remote, request.headers.get('X-Forwarded-For', 'None')) + + # Return size of payload + return aiohttp.web.json_response({ + 'status': 'result', + 'length': len(img_content) + }) + + except: + type, value, _ = sys.exc_info() + return aiohttp.web.json_response({ + 'status': 'error', + 'message': str(value), + 'type': str(type) + }) def main(): - - # Create output directory - if RECEIVE_DIRECTORY is not None: - os.makedirs(RECEIVE_DIRECTORY, exist_ok=True) - - # Init app - global app - app = aiohttp.web.Application(middlewares=[ - aiohttp_middlewares.cors_middleware(allow_all=True), - ], client_max_size=MAX_FILE_SIZE) - - # Init printing service - global service - service = print_service.PrintService(60, 1, 5) - service.start(PRINTER_MAC, PRINTER_MODEL) - - # Attach routes - app.router.add_post('/print_ascii', post_print_ascii) - app.router.add_post('/print_image', post_print_image) - - # Register exit handler - atexit.register(dispose) - - # Run - aiohttp.web.run_app(app, port=SERVER_PORT) + + # Create output directory + if RECEIVE_DIRECTORY is not None: + os.makedirs(RECEIVE_DIRECTORY, exist_ok=True) + + global file + file = open(f'print-server.log', 'a', encoding='utf-8') + + # Init app + global app + app = aiohttp.web.Application(middlewares=[ + aiohttp_middlewares.cors_middleware(allow_all=True), + ], client_max_size=MAX_FILE_SIZE) + + # Init printing service + global service + service = print_service.PrintService(60, 1, 5) + service.start(PRINTER_MAC, PRINTER_MODEL) + + # Attach routes + app.router.add_post('/print_ascii', post_print_ascii) + app.router.add_post('/print_image', post_print_image) + + # Register exit handler + atexit.register(dispose) + + # Run + aiohttp.web.run_app(app, port=SERVER_PORT) def dispose(): - service.stop() + service.stop() if __name__ == '__main__': - main() + main() diff --git a/print-server/print_service.py b/print-server/print_service.py index 0153d4b..dc0593d 100644 --- a/print-server/print_service.py +++ b/print-server/print_service.py @@ -1,34 +1,35 @@ # Utility for tracking printing tasks # # MIT License -# +# # Copyright (c) 2022 bitrate16 import time import threading +import PIL -import ppa6 +import peripage class Repeat(): """ Interval-based code execution """ - + def __init__(self, interval: float, handler): self.interval = interval self.running = False self.should_stop = True self.thread = None self.handler = handler - + def start(self): if self.running: return False else: def handler(): self.running = True - + while not self.should_stop: try: self.handler() @@ -36,42 +37,42 @@ def handler(): # XXX: Important: we are ignoring this exception pass time.sleep(self.interval) - + self.running = False - + self.should_stop = False - self.thread = threading.Thread(target=handler, daemon=True).start() + self.thread = threading.Thread(target=handler).start() return True - + def stop(self): if not self.running: return False else: self.should_stop = True - + def set_handler(self, handler): self.handler = handler - + def is_running(self): return self.running class PrintService: """ - This printer task autimatically handler print tasks from internal queue and + This printer task autimatically handler print tasks from internal queue and maintains printer connected state. - - Printer processes events in another thread by proocessing single event per + + Printer processes events in another thread by proocessing single event per event_interval. - - If printer disconnects, this service will automatically reconnect it after - event_interval and print in the same time slot. If reconnect attempts fail, + + If printer disconnects, this service will automatically reconnect it after + event_interval and print in the same time slot. If reconnect attempts fail, it will wait given offline_interval until next connection attempt. - - If ping interval is over, printer will be pinged to return battery level and + + If ping interval is over, printer will be pinged to return battery level and keep connection alive (prevent sleep). """ - + def __init__(self, ping_interval: float = 60, event_interval: float = 1, offline_interval: float = 1, startup_interval: float = 1, guard_ping_interval: float = 1): # Printer keep-alive check interval, seconds self.ping_interval = ping_interval @@ -81,13 +82,13 @@ def __init__(self, ping_interval: float = 60, event_interval: float = 1, offline # Last printer ping timestamp, seconds self.last_ping_timestamp = 0 - + # Interval to wait after printer cconnection established self.startup_interval = startup_interval - + # Time between reconnect attempts self.offline_interval = offline_interval - + # interval between ping and data sending self.guard_ping_interval = guard_ping_interval @@ -95,15 +96,15 @@ def __init__(self, ping_interval: float = 60, event_interval: float = 1, offline self.service: Repeat = None # Instance of printer - self.printer: ppa6.Printer = None + self.printer: peripage.Printer = None # Event queue self.events = [] - + # Indicate service failture self.service_failture = True - - def start(self, printer_mac: str, printer_type: ppa6.PrinterType, timeout: float = 1.0, concentration: int = 1): + + def start(self, printer_mac: str, printer_type: peripage.PrinterType, timeout: float = 1.0, concentration: int = 1): """ Perform startup oof the service without check for previous instance running. """ @@ -112,138 +113,256 @@ def service_handler(): """ Internal event processing handler. """ - + initial_failture = True while self.service.is_running(): try: if not self.printer.isConnected(): raise RuntimeError('not connected') - + # Windows workaround try: self.printer.sock.listen() except: pass - + if not self.printer.isConnected(): raise RuntimeError('not connected') - + # If time is over, perform keep-alive procedure if time.time() > (self.last_ping_timestamp + self.ping_interval): str(self.printer.getDeviceBattery()) self.last_ping_timestamp = time.time() - + # Execute task handler # Task will be deleted only after correct execution without exceptions if len(self.events): - + if self.guard_ping_interval is not None: str(self.printer.getDeviceBattery()) self.last_ping_timestamp = time.time() - + time.sleep(self.guard_ping_interval) - + self.events[0](self.printer) self.events.pop(0) - + # Return on success return - + except: # Connection error, reinitialize connection self.service_failture = True - + # Wait for offline_interval before reconnects if not initial_failture: time.sleep(self.offline_interval) initial_failture = False - + # Disconnect try: if self.printer.isConnected(): self.printer.disconnect() except: pass - + # Connect try: self.printer.connect() self.printer.reset() self.printer.setConcentration(self.concentration) - + time.sleep(self.startup_interval) - + self.last_ping_timestamp = time.time() self.service_failture = False except: pass - + self.concentration = concentration - self.printer = ppa6.Printer(printer_mac, printer_type, timeout) + self.printer = peripage.Printer(printer_mac, printer_type, timeout) self.last_ping_timestamp = time.time() self.events = [] self.service = Repeat(self.event_interval, service_handler) self.service.start() - + def stop(self): try: self.service.stop() self.printer.disconnect() except: pass - + def is_service_failture(self): return self.service_failture - + def add_print_handler(self, print_handler): """ - Adds event handler to the queue. THis handler will be executed with single + Adds event handler to the queue. THis handler will be executed with single arguemnt - printer instance. - + Example: ``` printer_task.add_print_handler(lambda printer: printer.printASCII('hello')) ``` """ - + try: self.events.append(print_handler) return True except: return False - def add_print_ascii(self, ascii_text: str, flush: bool = False): + def add_print_ascii(self, ascii_text: str, concentration: int=None, break_size: int=0, /, flush: bool = False): """ - Adds simple print event to queue, additionally flushes output buffer. - + Adds simple print ASCII event to queue, additionally flushes output + buffer. + + `ascii_text` defines the input text to be printed. + + `concentration` defines the concentration value from range [0, 1, 2]. + Set to None to ignore. + + `break_size` defines the break size to print after the text. Refers to + `peripage.Printer.printBreak()` for value limitations. Set to None or 0 to + ignore. + + `flush` allows force flushing ASCII buffer. Refers to + `peripage.Printer.flushASCII()`. + + Example: ``` - printer_task.add_print_ascii('hello', True) + printer_task.add_print_ascii('hello', concentration=2, break_size=100, flush=True) ``` """ - - def wrap_print(printer: ppa6.Printer): + + def wrap_print(printer: peripage.Printer): + if concentration is not None: + printer.setConcentration(concentration) + printer.printASCII(ascii_text) + if flush: printer.flushASCII() - + + if break_size is not None and break_size > 0: + printer.printBreak(break_size) + try: self.events.append(wrap_print) return True except: return False - + + def add_print_image(self, image: PIL.Image, concentration: int=None, break_size: int=0): + """ + Adds simple print Image event to queue. + + `image` defines the input image to be printed. + + `concentration` defines the concentration value from range [0, 1, 2]. + Set to None to ignore. + + `break_size` defines the break size to print after the image. Refers to + `peripage.Printer.printBreak()` for value limitations. Set to None or 0 to + ignore. + + + Example: + ``` + printer_task.add_print_iamge(PIL.open('image.png'), concentration=2, break_size=100, flush=True) + ``` + """ + + def wrap_print(printer: peripage.Printer): + if concentration is not None: + printer.setConcentration(concentration) + + printer.printImage(image) + + if break_size is not None and break_size > 0: + printer.printBreak(break_size) + + try: + self.events.append(wrap_print) + return True + except: + return False + + def add_print_break(self, break_size: int=0): + """ + Adds simple print break event to queue. + + `break_size` defines the break size to print after the image. Refers to + `peripage.Printer.printBreak()` for value limitations. Set to None or 0 to + ignore. + + + Example: + ``` + printer_task.add_print_break(200) + ``` + """ + + if break_size is not None and break_size > 0: + try: + self.events.append(lambda p: p.printBreak(break_size)) + return True + except: + return False + return False + + def add_print_flush_ascii(self): + """ + Adds simple flush ASCII buffer event to queue. + + + Example: + ``` + printer_task.add_print_flush_ascii(200) + ``` + """ + + try: + self.events.append(lambda p: p.flushASCII()) + return True + except: + return False + + def add_print_concentration(self, concentration: int=None): + """ + Adds concentration change event to queue. + + `concentration` defines the concentration value from range [0, 1, 2]. + Set to None to ignore. + + + Example: + ``` + printer_task.add_print_concentration(1) + ``` + """ + + if concentration is not None: + try: + self.events.append(lambda p: p.setConcentration(concentration)) + return True + except: + return False + return False + def clear_tasks(self): """ Remove all tasks from queue """ - + if self.events: self.events.clear() - + def get_task_count(self): """ Returns rest task count """ - - return len(self.events) if self.events else 0 + + return len(self.events) if self.events else 0 diff --git a/print-server/scripts/print_ascii_clipboard.py b/print-server/scripts/print_ascii_clipboard.py index eb2d495..5d86e61 100644 --- a/print-server/scripts/print_ascii_clipboard.py +++ b/print-server/scripts/print_ascii_clipboard.py @@ -2,7 +2,7 @@ import requests import sys -SERVER_ADDR = 'http://192.168.1.101:11001' +SERVER_ADDR = 'http://127.0.0.1:11001' BREAK = 1 # Enable/disable CONCENTRATION = 2 # Value (0-2) SECRET_KEY = '1234567890' @@ -12,7 +12,7 @@ print(s) r = requests.post( - url=f'{SERVER_ADDR}/print_ascii?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', + url=f'{SERVER_ADDR}/print_ascii?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', data=s ) diff --git a/print-server/scripts/print_image_clipboard.py b/print-server/scripts/print_image_clipboard.py index 92487d0..181de4d 100644 --- a/print-server/scripts/print_image_clipboard.py +++ b/print-server/scripts/print_image_clipboard.py @@ -6,7 +6,7 @@ import traceback import os -SERVER_ADDR = 'http://192.168.1.101:11001' +SERVER_ADDR = 'http://127.0.0.1:11001' BREAK = 1 # Enable/disable CONCENTRATION = 2 # Value (0-2) SECRET_KEY = '1234567890' @@ -17,18 +17,18 @@ if isinstance(im, list): im = im[0] - + if not (im.endswith('.png') or im.endswith('.jpg') or im.endswith('.jpeg')): raise RuntimeError('Invalid file type') - + try: r = requests.post( - url=f'{SERVER_ADDR}/print_image?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', + url=f'{SERVER_ADDR}/print_image?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', files={ 'image': open(im, 'rb') } ) - + print(r.status_code, r.text) except: traceback.print_exc() @@ -36,15 +36,15 @@ else: temp_name = f'{secrets.token_bytes(16).hex()}-temp.png' im.save(temp_name, 'PNG') - + try: r = requests.post( - url=f'{SERVER_ADDR}/print_image?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', + url=f'{SERVER_ADDR}/print_image?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', files={ 'image': open(temp_name, 'rb') } ) - + print(r.status_code, r.text) except: traceback.print_exc() diff --git a/print-server/scripts/print_image_drag_and_drop.py b/print-server/scripts/print_image_drag_and_drop.py index 7531006..565203b 100644 --- a/print-server/scripts/print_image_drag_and_drop.py +++ b/print-server/scripts/print_image_drag_and_drop.py @@ -1,13 +1,13 @@ import requests import sys -SERVER_ADDR = 'http://192.168.1.101:11001' +SERVER_ADDR = 'http://127.0.0.1:11001' BREAK = 1 # Enable/disable CONCENTRATION = 2 # Value (0-2) SECRET_KEY = '1234567890' r = requests.post( - url=f'{SERVER_ADDR}/print_image?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', + url=f'{SERVER_ADDR}/print_image?print_break={BREAK}&print_concentration={CONCENTRATION}&secret={SECRET_KEY}', files={ 'image': open(sys.argv[1], 'rb') } diff --git a/print_service.py b/print_service.py index 2b46e01..dc0593d 100644 --- a/print_service.py +++ b/print_service.py @@ -1,35 +1,35 @@ # Utility for tracking printing tasks # # MIT License -# +# # Copyright (c) 2022 bitrate16 import time import threading import PIL -import ppa6 +import peripage class Repeat(): """ Interval-based code execution """ - + def __init__(self, interval: float, handler): self.interval = interval self.running = False self.should_stop = True self.thread = None self.handler = handler - + def start(self): if self.running: return False else: def handler(): self.running = True - + while not self.should_stop: try: self.handler() @@ -37,42 +37,42 @@ def handler(): # XXX: Important: we are ignoring this exception pass time.sleep(self.interval) - + self.running = False - + self.should_stop = False self.thread = threading.Thread(target=handler).start() return True - + def stop(self): if not self.running: return False else: self.should_stop = True - + def set_handler(self, handler): self.handler = handler - + def is_running(self): return self.running class PrintService: """ - This printer task autimatically handler print tasks from internal queue and + This printer task autimatically handler print tasks from internal queue and maintains printer connected state. - - Printer processes events in another thread by proocessing single event per + + Printer processes events in another thread by proocessing single event per event_interval. - - If printer disconnects, this service will automatically reconnect it after - event_interval and print in the same time slot. If reconnect attempts fail, + + If printer disconnects, this service will automatically reconnect it after + event_interval and print in the same time slot. If reconnect attempts fail, it will wait given offline_interval until next connection attempt. - - If ping interval is over, printer will be pinged to return battery level and + + If ping interval is over, printer will be pinged to return battery level and keep connection alive (prevent sleep). """ - + def __init__(self, ping_interval: float = 60, event_interval: float = 1, offline_interval: float = 1, startup_interval: float = 1, guard_ping_interval: float = 1): # Printer keep-alive check interval, seconds self.ping_interval = ping_interval @@ -82,13 +82,13 @@ def __init__(self, ping_interval: float = 60, event_interval: float = 1, offline # Last printer ping timestamp, seconds self.last_ping_timestamp = 0 - + # Interval to wait after printer cconnection established self.startup_interval = startup_interval - + # Time between reconnect attempts self.offline_interval = offline_interval - + # interval between ping and data sending self.guard_ping_interval = guard_ping_interval @@ -96,15 +96,15 @@ def __init__(self, ping_interval: float = 60, event_interval: float = 1, offline self.service: Repeat = None # Instance of printer - self.printer: ppa6.Printer = None + self.printer: peripage.Printer = None # Event queue self.events = [] - + # Indicate service failture self.service_failture = True - - def start(self, printer_mac: str, printer_type: ppa6.PrinterType, timeout: float = 1.0, concentration: int = 1): + + def start(self, printer_mac: str, printer_type: peripage.PrinterType, timeout: float = 1.0, concentration: int = 1): """ Perform startup oof the service without check for previous instance running. """ @@ -113,100 +113,100 @@ def service_handler(): """ Internal event processing handler. """ - + initial_failture = True while self.service.is_running(): try: if not self.printer.isConnected(): raise RuntimeError('not connected') - + # Windows workaround try: self.printer.sock.listen() except: pass - + if not self.printer.isConnected(): raise RuntimeError('not connected') - + # If time is over, perform keep-alive procedure if time.time() > (self.last_ping_timestamp + self.ping_interval): str(self.printer.getDeviceBattery()) self.last_ping_timestamp = time.time() - + # Execute task handler # Task will be deleted only after correct execution without exceptions if len(self.events): - + if self.guard_ping_interval is not None: str(self.printer.getDeviceBattery()) self.last_ping_timestamp = time.time() - + time.sleep(self.guard_ping_interval) - + self.events[0](self.printer) self.events.pop(0) - + # Return on success return - + except: # Connection error, reinitialize connection self.service_failture = True - + # Wait for offline_interval before reconnects if not initial_failture: time.sleep(self.offline_interval) initial_failture = False - + # Disconnect try: if self.printer.isConnected(): self.printer.disconnect() except: pass - + # Connect try: self.printer.connect() self.printer.reset() self.printer.setConcentration(self.concentration) - + time.sleep(self.startup_interval) - + self.last_ping_timestamp = time.time() self.service_failture = False except: pass - + self.concentration = concentration - self.printer = ppa6.Printer(printer_mac, printer_type, timeout) + self.printer = peripage.Printer(printer_mac, printer_type, timeout) self.last_ping_timestamp = time.time() self.events = [] self.service = Repeat(self.event_interval, service_handler) self.service.start() - + def stop(self): try: self.service.stop() self.printer.disconnect() except: pass - + def is_service_failture(self): return self.service_failture - + def add_print_handler(self, print_handler): """ - Adds event handler to the queue. THis handler will be executed with single + Adds event handler to the queue. THis handler will be executed with single arguemnt - printer instance. - + Example: ``` printer_task.add_print_handler(lambda printer: printer.printASCII('hello')) ``` """ - + try: self.events.append(print_handler) return True @@ -215,96 +215,96 @@ def add_print_handler(self, print_handler): def add_print_ascii(self, ascii_text: str, concentration: int=None, break_size: int=0, /, flush: bool = False): """ - Adds simple print ASCII event to queue, additionally flushes output + Adds simple print ASCII event to queue, additionally flushes output buffer. - + `ascii_text` defines the input text to be printed. - - `concentration` defines the concentration value from range [0, 1, 2]. + + `concentration` defines the concentration value from range [0, 1, 2]. Set to None to ignore. - - `break_size` defines the break size to print after the text. Refers to - `ppa6.Printer.printBreak()` for value limitations. Set to None or 0 to + + `break_size` defines the break size to print after the text. Refers to + `peripage.Printer.printBreak()` for value limitations. Set to None or 0 to ignore. - - `flush` allows force flushing ASCII buffer. Refers to - `ppa6.Printer.flushASCII()`. - - + + `flush` allows force flushing ASCII buffer. Refers to + `peripage.Printer.flushASCII()`. + + Example: ``` printer_task.add_print_ascii('hello', concentration=2, break_size=100, flush=True) ``` """ - - def wrap_print(printer: ppa6.Printer): + + def wrap_print(printer: peripage.Printer): if concentration is not None: printer.setConcentration(concentration) - + printer.printASCII(ascii_text) - + if flush: printer.flushASCII() - + if break_size is not None and break_size > 0: printer.printBreak(break_size) - + try: self.events.append(wrap_print) return True except: return False - + def add_print_image(self, image: PIL.Image, concentration: int=None, break_size: int=0): """ Adds simple print Image event to queue. - + `image` defines the input image to be printed. - - `concentration` defines the concentration value from range [0, 1, 2]. + + `concentration` defines the concentration value from range [0, 1, 2]. Set to None to ignore. - - `break_size` defines the break size to print after the image. Refers to - `ppa6.Printer.printBreak()` for value limitations. Set to None or 0 to + + `break_size` defines the break size to print after the image. Refers to + `peripage.Printer.printBreak()` for value limitations. Set to None or 0 to ignore. - - + + Example: ``` printer_task.add_print_iamge(PIL.open('image.png'), concentration=2, break_size=100, flush=True) ``` """ - - def wrap_print(printer: ppa6.Printer): + + def wrap_print(printer: peripage.Printer): if concentration is not None: printer.setConcentration(concentration) - + printer.printImage(image) - + if break_size is not None and break_size > 0: printer.printBreak(break_size) - + try: self.events.append(wrap_print) return True except: return False - + def add_print_break(self, break_size: int=0): """ Adds simple print break event to queue. - - `break_size` defines the break size to print after the image. Refers to - `ppa6.Printer.printBreak()` for value limitations. Set to None or 0 to + + `break_size` defines the break size to print after the image. Refers to + `peripage.Printer.printBreak()` for value limitations. Set to None or 0 to ignore. - - + + Example: ``` printer_task.add_print_break(200) ``` """ - + if break_size is not None and break_size > 0: try: self.events.append(lambda p: p.printBreak(break_size)) @@ -312,57 +312,57 @@ def add_print_break(self, break_size: int=0): except: return False return False - + def add_print_flush_ascii(self): """ Adds simple flush ASCII buffer event to queue. - - + + Example: ``` printer_task.add_print_flush_ascii(200) ``` """ - + try: self.events.append(lambda p: p.flushASCII()) return True except: return False - + def add_print_concentration(self, concentration: int=None): """ Adds concentration change event to queue. - - `concentration` defines the concentration value from range [0, 1, 2]. + + `concentration` defines the concentration value from range [0, 1, 2]. Set to None to ignore. - - + + Example: ``` printer_task.add_print_concentration(1) ``` """ - - if concentration is not None: + + if concentration is not None: try: self.events.append(lambda p: p.setConcentration(concentration)) return True except: return False return False - + def clear_tasks(self): """ Remove all tasks from queue """ - + if self.events: self.events.clear() - + def get_task_count(self): """ Returns rest task count """ - - return len(self.events) if self.events else 0 + + return len(self.events) if self.events else 0 diff --git a/requirements.txt b/requirements.txt index 06ee2ac..b35abff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ Pillow>=8.2.0 -argparse==1.1 -PyBluez>=0.23 -qrcode>=6.1 \ No newline at end of file +argparse>=1.1 +PyBluez @ git+https://github.com/pybluez/pybluez@master#egg=pybluez +qrcode>=6.1 diff --git a/setup.py b/setup.py index ca5775c..495bff2 100644 --- a/setup.py +++ b/setup.py @@ -1,35 +1,38 @@ from setuptools import setup setup( - name = 'ppa6', - packages = ['ppa6'], - version = '0.5', - license='MIT', - description = 'Utility for printing on Peripage A6/A6+ via bluetooth', - author = 'bitrate16', - author_email = 'bitrate16@gmail.com', - url = 'https://github.com/bitrate16/ppa6-python', - download_url = 'https://github.com/bitrate16/ppa6-python/archive/v0.5.tar.gz', - keywords = ['PERIPAGE', 'BLUETOOTH', 'THERMAL PRINTER', 'PRINTER'], - install_requires=[ - 'PyBluez>=0.23', - 'Pillow>=8.2.0', - 'argparse>=1.1', - 'qrcode>=6.1', - ], - classifiers=[ - 'Development Status :: 4 - Beta', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package - 'Intended Audience :: Developers', - 'Topic :: Software Development :: Libraries :: Python Modules', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - ], - entry_points={ - 'console_scripts': [ - 'ppa6 = ppa6.__main__:main' - ] - } -) \ No newline at end of file + name = 'peripage', + packages = ['peripage'], + version = '1.1', + license='MIT', + description = 'Utility for printing on Peripage printers via bluetooth', + author = 'bitrate16', + author_email = 'bitrate16@gmail.com', + url = 'https://github.com/bitrate16/peripage-python', + download_url = 'https://github.com/bitrate16/peripage-python/archive/v1.1.tar.gz', + keywords = ['PERIPAGE', 'BLUETOOTH', 'THERMAL PRINTER', 'PRINTER'], + install_requires=[ + 'PyBluez>=0.23', + 'Pillow>=8.2.0', + 'argparse>=1.1', + 'qrcode>=6.1', + ], + classifiers=[ + 'Development Status :: 5 - Production/Stable', # Chose either "3 - Alpha", "4 - Beta" or "5 - Production/Stable" as the current state of your package + 'Intended Audience :: Developers', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + ], + entry_points={ + 'console_scripts': [ + 'peripage = peripage.__main__:main' + ] + } +)