Reorganize source files into place.* dir

This commit is contained in:
2023-05-12 20:46:15 -05:00
parent 8ae7fee64a
commit 539fad19c0
6 changed files with 14 additions and 12 deletions

95
place/client.py Normal file
View File

@@ -0,0 +1,95 @@
import asyncio
import io
import os
from typing import Optional, List, Union
import websockets
from PIL import Image
from place.constants import Environment
from place.differencing import get_pixel_differences
from place.network import upload_pixels
from place.pixel_types import Pixel
width, height = int(os.getenv(Environment.CANVAS_HEIGHT)), int(os.getenv(Environment.CANVAS_HEIGHT))
total_pixels = width * height
minimum_tolerance = 5 / total_pixels
class PlaceClient:
"""
Defines a stateful client that manages a 'source of truth' with the image created by incremental changes to the Websocket.
"""
def __init__(self, connection) -> None:
self.connection: websockets.WebSocketClientProtocol = connection
# A lock used to manage the 'source of truth' image while performing read intensive operations.
self.source_lock = asyncio.Lock()
# The 'source of truth' image describing what is currently on the canvas.
self.source: Image = Image.new("RGBA", (width, height), (255, 0, 0, 0))
# The current targeted 'output' image.
self.current_target: Optional[Image] = None
def lock(self) -> asyncio.Lock:
return self.source_lock
async def get_differences(self) -> List[Pixel]:
"""
:return: A list of pixels that must be placed on the canvas to meet the currently set task.
"""
if self.current_target is not None:
async with self.lock():
return get_pixel_differences(self.source, self.current_target)
return []
async def complete(self, tolerance: Union[int, float] = 15, sleep: float = 0.25) -> None:
pixel_tolerance = tolerance if type(tolerance) == int else total_pixels * tolerance
if self.current_target is None:
return
pixels = await self.get_differences()
while len(pixels) > pixel_tolerance:
# Upload all the differences
upload_pixels(pixels)
# Wait a bit for the client to catch up. Is this super necessary?
await asyncio.sleep(sleep)
# Recalculate the difference
pixels = await self.get_differences()
@classmethod
async def connect(cls, address: str):
"""A factory method for connecting to the websocket and instantiating the client."""
connection = await websockets.connect(address)
client = cls(connection)
if connection.open:
message = await connection.recv()
if type(message) != bytes:
raise RuntimeError("Fatal: Initial message from websocket was not 'bytes'")
img = Image.open(io.BytesIO(message))
client.source.paste(img)
return client
async def receive(self):
"""
Receiving all server messages and handling them
"""
while True:
try:
message = await self.connection.recv()
if type(message) == bytes:
img = Image.open(io.BytesIO(message))
async with self.lock():
self.source.paste(img, (0, 0), img)
except websockets.ConnectionClosed:
print('Connection with server closed')
break

5
place/constants.py Normal file
View File

@@ -0,0 +1,5 @@
class Environment:
WEBSOCKET_ADDRESS = "WEBSOCKET_ADDRESS"
CANVAS_WIDTH = "CANVAS_WIDTH"
CANVAS_HEIGHT = "CANVAS_HEIGHT"
SOURCE_FILE = "SOURCE_FILE"

34
place/differencing.py Normal file
View File

@@ -0,0 +1,34 @@
import os
from typing import List, Union
from PIL import Image
from place.constants import Environment
from place.pixel_types import Pixel, AlphaPixel
def is_pixel_equal(a: Union[Pixel, AlphaPixel], b: Union[Pixel, AlphaPixel]) -> bool:
return a[0] == b[0] and a[1] == b[1] and a[2] == b[2]
def get_pixel_differences(source: Image, target: Image) -> List[Pixel]:
"""
Returns a list of pixels (location & color) that must be changed to match `source` to `target`.
:param source: The source image (what we currently have).
:param target: The target image (what we want to have).
:return: A list of pixels in tuples.
"""
width, height = int(os.getenv(Environment.CANVAS_HEIGHT)), int(os.getenv(Environment.CANVAS_HEIGHT))
source_pixels = source.load()
target_pixels = target.load()
results = []
for y in range(width):
for x in range(height):
cur_pixel = source_pixels[x, y]
target_pixel = target_pixels[x, y]
if not is_pixel_equal(cur_pixel, target_pixel):
results.append((x, y, target_pixel))
return results

49
place/network.py Normal file
View File

@@ -0,0 +1,49 @@
from typing import Generator, Any, List, Tuple
from multiping import multi_ping
from progressbar import progressbar
from place.pixel_types import Pixel
# The largest possible chunk that can be given to Multiping
maximum_chunk = (2 ** 16) - 1
def get_ip(x: int, y: int, rgb: Tuple[int, int, int], large: bool = False):
"""
Build the destination IP address given the constants. Arguments are not tested for validity.
:param x: The X coordinate as an integer. [0, 512]
:param y: The Y coordinate as an integer. [0, 512]
:param rgb: The RGB of each pixel described by a tuple of integers. [0, 255]
:param large: If true, will place 2x2 pixels instead. Defaults to False.
:return: The IPv6 address as a string.
"""
return f"2a06:a003:d040:{'2' if large else '1'}{x:03X}:{y:03X}:{rgb[0]:02X}:{rgb[1]:02X}:{rgb[2]:02X}"
def chunkify(sequence: List[Any], size: int) -> Generator[List[Any], None, None]:
"""
:param sequence: The sequence of items.
:param size: The size of each individual chunk, at largest.
:return: A generator of lists, each chunk no larger than `size`.
"""
size = max(1, size)
return (sequence[i:i + size] for i in range(0, len(sequence), size))
def upload_pixels(pixels: List[Pixel], chunk_size: int = None):
"""
Given a list of pixels, upload them with the given chunk size.
"""
ips = [get_ip(x, y, rgb) for x, y, rgb in pixels]
return upload(ips, chunk_size)
def upload(ips: List[str], chunk_size: int = None):
# Default to maximum chunk size
if chunk_size is None: chunk_size = maximum_chunk
# random.shuffle(ips)
chunked = list(chunkify(ips, min(maximum_chunk, chunk_size)))
for i, chunk in progressbar(list(enumerate(chunked, start=1))):
multi_ping(chunk, timeout=0.1, retry=0)

6
place/pixel_types.py Normal file
View File

@@ -0,0 +1,6 @@
from typing import Tuple
RGB = Tuple[int, int, int]
Pixel = Tuple[int, int, RGB]
RGBA = Tuple[int, int, int, int]
AlphaPixel = Tuple[int, int, RGBA]