Talking to the system in Scratch

Hello again.

Not much typing today; maybe later.

We run Python code here, but you could do anything.

image

import asyncio
from websockets.server import serve
import json
import base64

# Identifiers attached to every "characteristicDidChange" message the server
# emits. 61445 is 0xF005 in hexadecimal.
# NOTE(review): these look like the service/characteristic IDs that the
# Scratch-side client expects from its peripheral -- confirm against the
# extension that connects to this server.
SERVICE_ID = 61445
CHARACTERISTIC_ID = "5261da01-fa7e-42ab-850b-7c80220097cc"

class Client():
    def __init__(self, ws):
        self.ws = ws
        self.val = "AAAA"
        self.vals = bytearray()
        self.code = ""
   
    def set_val(self, val):
        b = bytearray(3)
        b[2] = val
        self.val = str(base64.b64encode(b), "utf8")

    async def result(self, result):
        await self.ws.send(json.dumps({"jsonrpc": "2.0", "id": self.data["id"], "result": result}))

    async def ok(self):
        await self.result(None)

    async def params(self, method, **params):
        await self.ws.send(json.dumps({"jsonrpc": "2.0", "method": method, "params": params}))

    async def discover(self):
        await self.params("didDiscoverPeripheral", peripheralId=0, name="sys", rssi=None)
        await self.ok()

    async def update_val(self):
        await self.params("characteristicDidChange", serviceId=SERVICE_ID, characteristicId=CHARACTERISTIC_ID, message=self.val, encoding="base64")

    async def notifications(self):
        while True:
            await self.update_val()
            await asyncio.sleep(1)

    async def read(self):
        await self.result({"message": self.val, "encoding": "base64"})
        if self.data["params"].get("startNotifications"):
            asyncio.create_task(self.notifications())

    def print(self, *v, **kv):
        for e in v:
            for c in str(e):
                self.vals.append(ord(c))
            self.vals.append(ord(" "))

    async def write(self):
        message = self.data["params"]["message"]
        if message == "ggAAAAAA":
            if self.code:
                self.vals = bytearray()
                print = self.print
                exec(self.code)
                self.code = ""
            else:
                if self.vals: self.vals = self.vals[1:]
        else:
            d = base64.b64decode(message)
            self.code = self.code + str(d[1:], "utf8").replace(";", "\n")
        await self.result(0)
        self.set_val(self.vals[0] if self.vals and len(self.vals) > 0 else 0)
        await self.update_val()

    async def run(self):
        async for msg in self.ws:
            self.data = json.loads(msg)
            print(self.data)
            match self.data.get("method"):
                case "discover": await self.discover()
                case "read": await self.read()
                case "write": await self.write()
                case _: await self.ok()

class Server():
    """Websocket server that hands every incoming connection to a ``Client``.

    Args:
        host: interface to listen on.  Defaults to the loopback address.
        port: TCP port to listen on.  Defaults to 20111.
    """

    def __init__(self, host="127.0.0.1", port=20111):
        self.host = host
        self.port = port

    async def connection(self, ws):
        """Connection handler: serve *ws* until the peer disconnects."""
        await Client(ws).run()

    async def serve(self):
        """Listen on ``(self.host, self.port)`` and run until cancelled."""
        async with serve(self.connection, self.host, self.port):
            # A future that is never resolved keeps the server context open.
            await asyncio.Future()

if __name__ == "__main__":
    # Script entry point: build the server and block in its serve loop.
    server = Server()
    asyncio.run(server.serve())