From ced8d3683ab53510d5f65d86d4da8160593be140 Mon Sep 17 00:00:00 2001 From: Gabriel Augendre Date: Fri, 10 Mar 2023 14:17:30 +0100 Subject: [PATCH] Implement a very basic HA client --- .gitignore | 1 - .idea/plant-badge.iml | 1 + .idea/watcherTasks.xml | 8 ++ .pre-commit-config.yaml | 30 +++++ README.md | 40 ++---- requirements-pycharm.txt | 2 +- requirements.txt | 5 +- src/WIFI_CONFIG.py | 5 + src/apps/home_assistant.py | 86 +++++++++++++ src/apps/icon-home-assistant.jpg | Bin 0 -> 1591 bytes src/launcher.py | 184 ++++++++++++++++++++++++++ src/lib/badger2040w.py | 179 ++++++++++++++++++++++++++ src/lib/badger_os.py | 213 +++++++++++++++++++++++++++++++ src/lib/network_manager.py | 117 +++++++++++++++++ src/main.py | 23 +--- tasks.py | 74 +++++++++++ 16 files changed, 913 insertions(+), 55 deletions(-) create mode 100644 .idea/watcherTasks.xml create mode 100644 .pre-commit-config.yaml create mode 100644 src/WIFI_CONFIG.py create mode 100644 src/apps/home_assistant.py create mode 100644 src/apps/icon-home-assistant.jpg create mode 100644 src/launcher.py create mode 100644 src/lib/badger2040w.py create mode 100644 src/lib/badger_os.py create mode 100644 src/lib/network_manager.py create mode 100644 tasks.py diff --git a/.gitignore b/.gitignore index fe6d40c..b2989dc 100644 --- a/.gitignore +++ b/.gitignore @@ -56,7 +56,6 @@ dist/ downloads/ eggs/ .eggs/ -lib/ lib64/ parts/ sdist/ diff --git a/.idea/plant-badge.iml b/.idea/plant-badge.iml index d61a996..507a284 100644 --- a/.idea/plant-badge.iml +++ b/.idea/plant-badge.iml @@ -10,6 +10,7 @@ + diff --git a/.idea/watcherTasks.xml b/.idea/watcherTasks.xml new file mode 100644 index 0000000..539a42a --- /dev/null +++ b/.idea/watcherTasks.xml @@ -0,0 +1,8 @@ + + + + + + + \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..6a0111b --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,30 @@ +exclude: \.min\.(js|css)(\.map)?$|^\.idea/ + +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-ast + - id: check-json + - id: check-toml + - id: check-xml + - id: check-yaml + args: [--allow-multiple-documents] + - id: end-of-file-fixer + - id: check-merge-conflict + - id: debug-statements + - id: detect-private-key + - id: pretty-format-json + args: + - --autofix + - --no-sort-keys + - id: trailing-whitespace + args: + - --markdown-linebreak-ext=md + - id: check-executables-have-shebangs + - id: check-shebang-scripts-are-executable + - repo: https://github.com/psf/black + rev: 23.1.0 + hooks: + - id: black + args: [--target-version, py37] diff --git a/README.md b/README.md index e751037..580f7bf 100644 --- a/README.md +++ b/README.md @@ -12,35 +12,15 @@ pip install -r requirements-pycharm.txt This will install dependencies required by PyCharm to run its MicroPython tools. -## List boards +## Invoke tasks ```shell -mpremote devs -``` - -## Setup / Clean flash - -```shell -# Wipe the board's flash (remove all files) -mpremote connect id:e6614104032e192a \ - exec --no-follow "import os, machine, rp2; os.umount('/'); bdev = rp2.Flash(); os.VfsLfs2.mkfs(bdev, progsize=256); vfs = os.VfsLfs2(bdev, progsize=256); os.mount(vfs, '/'); machine.reset()" -echo "Board clean" -sleep 3 - -# Install dependencies and copy project -cd src/ -mpremote connect id:e6614104032e192a \ - mip install github:miguelgrinberg/microdot/src/microdot.py \ - github:miguelgrinberg/microdot/src/microdot_asyncio.py + \ - cp -r . : + \ - reset -cd .. -``` - -## Update code -```shell -cd src/ -mpremote connect id:e6614104032e192a \ - cp -r . : + \ - reset -cd .. +invoke --list +# Start by getting your board id +inv list +# Then wipe the board +inv wipe +# Then run the initial setup +inv initial-setup +# After that, just update the code when changes are made locally +inv update-code ``` diff --git a/requirements-pycharm.txt b/requirements-pycharm.txt index c9e5f3a..95ead5d 100644 --- a/requirements-pycharm.txt +++ b/requirements-pycharm.txt @@ -1,3 +1,3 @@ +-r requirements.txt docopt>=0.6.2,<0.7 adafruit-ampy>=1.0.5,<1.1 -mpremote diff --git a/requirements.txt b/requirements.txt index a97e503..1c938c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -mpremote \ No newline at end of file +mpremote +invoke>=2.0.0 +black +pre-commit diff --git a/src/WIFI_CONFIG.py b/src/WIFI_CONFIG.py new file mode 100644 index 0000000..99c7b5a --- /dev/null +++ b/src/WIFI_CONFIG.py @@ -0,0 +1,5 @@ +import secrets + +SSID = secrets.SSID +PSK = secrets.PASS +COUNTRY = secrets.COUNTRY diff --git a/src/apps/home_assistant.py b/src/apps/home_assistant.py new file mode 100644 index 0000000..e7f9ece --- /dev/null +++ b/src/apps/home_assistant.py @@ -0,0 +1,86 @@ +import badger2040w as badger2040 +from badger2040w import WIDTH +import urequests +from secrets import HA_BASE_URL, HA_ACCESS_TOKEN + + +display = badger2040.Badger2040W() +display.led(128) +display.set_update_speed(2) + +display.connect() + + +class HAError(Exception): + pass + + +class HAFetchStateError(HAError): + pass + + +class HAPlant: + def __init__(self, entity_id): + self.entity_id = entity_id + self.state = None + self._last_fetched = None + + def fetch_state(self) -> None: + """Fetch state and store in self.state.""" + headers = { + "Authorization": f"Bearer {HA_ACCESS_TOKEN}", + "content-type": "application/json", + } + url = f"{HA_BASE_URL}/states/{self.entity_id}" + print("Fetching state from", url) + res = urequests.get(url, headers=headers) + if res.status_code != 200: + msg = f"Error fetching state for {self.entity_id}: {res.text}" + raise HAFetchStateError(msg) + data = res.json() + self.state = data + res.close() + + def display_state(self): + print(self.state) + + +def draw_page(plant): + print("Drawing page...") + # Clear the display + display.set_pen(15) + display.clear() + display.set_pen(0) + + # Draw the page header + display.set_font("bitmap6") + display.set_pen(0) + display.rectangle(0, 0, WIDTH, 20) + display.set_pen(15) + display.text("Weather", 3, 4) + display.set_pen(0) + + display.set_font("bitmap8") + display.set_pen(0) + display.rectangle(0, 60, WIDTH, 25) + display.set_pen(15) + display.text( + "Found state, check logs", + 5, + 65, + WIDTH, + 1, + ) + + display.update() + plant.display_state() + + +plant = HAPlant("plant.aloe_vera") +plant.fetch_state() +draw_page(plant) + +# Call halt in a loop, on battery this switches off power. +# On USB, the app will exit when A+C is pressed because the launcher picks that up. +while True: + display.halt() diff --git a/src/apps/icon-home-assistant.jpg b/src/apps/icon-home-assistant.jpg new file mode 100644 index 0000000000000000000000000000000000000000..35d3bd0032c2eb93c546df0261b2804873149b99 GIT binary patch literal 1591 zcmb79Ydq5n82Wt2EO>wG%rd^^wketO>LeV^x*zLib@XeURCBLIRR;3F$Q z`VO!Mpb*H9mO)O|Fa;P)P7a2E!{rrF2ows5Kq8ftv1lb_j4~35RzqV{ad06+mK2>b&I ziZFRNLN+6y0f@{N0?Nt4|1AI^viS0F1q>EYRm7=j>*96v%*=P#$;9{{FaH}58I8mI zmj$Gw07~Wv!9X#9El_bP)FYx}f%$@bhPy(rD9T{6<6Q5SFLq$p>z(5uEfMAO#aFr~ zRDaV(E}s?Kan58u{$Q*=H8goXtgv;sBcwdXo=R8o#uPMo{y8}3y|EFzxH)!{wSIjn zbJ*Fle=)|8rTZkBaNgq7FPxPdNnJJ^jSz`b`@%JyS0%^B%q^*c{qu&71xX_wg2v+t zJ0i>{bEg?vUN-!1+T|_2Ie5))0pgcVo)1Fy`ymYouL7zAYlSKUq3-!tnOv*5q9p2M zuHOP8EV$;p|4)ucs(Y`F7?C%nvNWqM`8;{pjC4m(7{rgr?xjhZnO!(b#D-gRvM9|H z>$v7C?@vpeOwqm71LyU9RXo}!;s&YzIbyzQ)UMOg@$0y4_RjBG_boy`H!V0ZB+A}F zGtQQ?M^qc)=H904UH289k_b=rJ!iX{Ol-%!T3zS#<@?mfft+{R*Kz`)sLRcD;mx?0 zik*+1Ribih!>Br0fw~QB|Gig6kB?Z+&@^5Myj@Q=^Pl7qo3382t{YV4gyd>aP}SvV zUXGnyqGp(6*{kNX6sXP*Eq?!Lkv2{^D0CLkg~e{VY07;UwBBIJTIY&eT)M@sse`Lf zUgXYzy3x7bVL$QgvoF+&(Le^9U}Vx18@XXj2WwU6;cDVV3*NsKd+@-88x>}C`W!|= zeSIN)&xws{6~da5;op7Yk{(C_yo-+)cWXv~M^V2Ae469FDAB&))rZdPEP4*^oY%}M zO)48(YA=f&JsXg=CZyQYNlPoYruZ!3k*nz_jeSJ66ks~*#|I@x1@Nd7VaeL>NImiI zs}hnjkY}xheAgHb_a6V9;m^1cW8KE~?pEg4^fLdw;F>>iwiTbN3p3%B`mA4&qzUB< z=}oM-H-pV5A7%%!ka_wUB_G^iwr$0`r9hOXK#6$ZJ%q5o59`H?))%TKBWoPOk716U zQAalxJGfsNlmh;8=r<&lm`52SXR?_E!sE$)aomqqC7EiqKBo=9Ert{J>xI}Qk}^f_ zp4rjRAoaWXqVVpys6OwiLMK6AL0pN_T4DPN zYrP-Krx+0r118a)JEul4RhIuL3W&D#P=H|hg))u`ztI&?kn=S$KD!{yc zd;gtj?cpp_XPx>(o_2Xtb>++ObUi!QW1?xEQ*~M6kVBARCktI@<=^9lHO@$Fk0xw4Xltw5%XVr(mF^(SXFN~DHb$ud)zJDyttd^x=99FUr9ihOEW(&pSh-sC%GG9DK4 sWSkOJ!NJ8!OyPHLr*Ado_MD|shb0D|DK5V4`Z?U);DjbPkt`Mb1%MZyBme*a literal 0 HcmV?d00001 diff --git a/src/launcher.py b/src/launcher.py new file mode 100644 index 0000000..b971307 --- /dev/null +++ b/src/launcher.py @@ -0,0 +1,184 @@ +import gc +import os +import time +import math +import badger2040w as badger2040 +import badger_os +import jpegdec + +APP_DIR = "/apps" +FONT_SIZE = 2 + +changed = False +exited_to_launcher = False +woken_by_button = ( + badger2040.woken_by_button() +) # Must be done before we clear_pressed_to_wake + +if badger2040.pressed_to_wake(badger2040.BUTTON_A) and badger2040.pressed_to_wake( + badger2040.BUTTON_C +): + # Pressing A and C together at start quits app + exited_to_launcher = badger_os.state_clear_running() + badger2040.reset_pressed_to_wake() +else: + # Otherwise restore previously running app + badger_os.state_launch() + + +display = badger2040.Badger2040W() +display.set_font("bitmap8") +display.led(128) + +jpeg = jpegdec.JPEG(display.display) + +state = {"page": 0, "running": "launcher"} + +badger_os.state_load("launcher", state) + +examples = [x[:-3] for x in os.listdir(APP_DIR) if x.endswith(".py")] + +# Approximate center lines for buttons A, B and C +centers = (41, 147, 253) + +MAX_PAGE = math.ceil(len(examples) / 3) + +WIDTH = 296 + + +def map_value(input, in_min, in_max, out_min, out_max): + return (((input - in_min) * (out_max - out_min)) / (in_max - in_min)) + out_min + + +def draw_disk_usage(x): + _, f_used, _ = badger_os.get_disk_usage() + + display.set_pen(15) + display.image( + bytearray( + ( + 0b00000000, + 0b00111100, + 0b00111100, + 0b00111100, + 0b00111000, + 0b00000000, + 0b00000000, + 0b00000001, + ) + ), + 8, + 8, + x, + 4, + ) + display.rectangle(x + 10, 3, 80, 10) + display.set_pen(0) + display.rectangle(x + 11, 4, 78, 8) + display.set_pen(15) + display.rectangle(x + 12, 5, int(76 / 100.0 * f_used), 6) + display.text("{:.2f}%".format(f_used), x + 91, 4, WIDTH, 1.0) + + +def render(): + display.set_pen(15) + display.clear() + display.set_pen(0) + + max_icons = min(3, len(examples[(state["page"] * 3) :])) + + for i in range(max_icons): + x = centers[i] + label = examples[i + (state["page"] * 3)] + icon_label = label.replace("_", "-") + icon = f"{APP_DIR}/icon-{icon_label}.jpg" + label = label.replace("_", " ") + jpeg.open_file(icon) + jpeg.decode(x - 26, 30) + display.set_pen(0) + w = display.measure_text(label, FONT_SIZE) + display.text(label, int(x - (w / 2)), 16 + 80, WIDTH, FONT_SIZE) + + for i in range(MAX_PAGE): + x = 286 + y = int((128 / 2) - (MAX_PAGE * 10 / 2) + (i * 10)) + display.set_pen(0) + display.rectangle(x, y, 8, 8) + if state["page"] != i: + display.set_pen(15) + display.rectangle(x + 1, y + 1, 6, 6) + + display.set_pen(0) + display.rectangle(0, 0, WIDTH, 16) + draw_disk_usage(90) + display.set_pen(15) + display.text("badgerOS", 4, 4, WIDTH, 1.0) + + display.update() + + +def wait_for_user_to_release_buttons(): + while display.pressed_any(): + time.sleep(0.01) + + +def launch_example(index): + wait_for_user_to_release_buttons() + + file = examples[(state["page"] * 3) + index] + file = f"{APP_DIR}/{file}" + + for k in locals().keys(): + if k not in ("gc", "file", "badger_os"): + del locals()[k] + + gc.collect() + + badger_os.launch(file) + + +def button(pin): + global changed + changed = True + + if pin == badger2040.BUTTON_A: + launch_example(0) + if pin == badger2040.BUTTON_B: + launch_example(1) + if pin == badger2040.BUTTON_C: + launch_example(2) + if pin == badger2040.BUTTON_UP: + if state["page"] > 0: + state["page"] -= 1 + render() + if pin == badger2040.BUTTON_DOWN: + if state["page"] < MAX_PAGE - 1: + state["page"] += 1 + render() + + +if exited_to_launcher or not woken_by_button: + wait_for_user_to_release_buttons() + display.set_update_speed(badger2040.UPDATE_MEDIUM) + render() + +display.set_update_speed(badger2040.UPDATE_FAST) + +while True: + if display.pressed(badger2040.BUTTON_A): + button(badger2040.BUTTON_A) + if display.pressed(badger2040.BUTTON_B): + button(badger2040.BUTTON_B) + if display.pressed(badger2040.BUTTON_C): + button(badger2040.BUTTON_C) + + if display.pressed(badger2040.BUTTON_UP): + button(badger2040.BUTTON_UP) + if display.pressed(badger2040.BUTTON_DOWN): + button(badger2040.BUTTON_DOWN) + + if changed: + badger_os.state_save("launcher", state) + changed = False + + display.halt() diff --git a/src/lib/badger2040w.py b/src/lib/badger2040w.py new file mode 100644 index 0000000..c7051d1 --- /dev/null +++ b/src/lib/badger2040w.py @@ -0,0 +1,179 @@ +import machine +import micropython +from picographics import PicoGraphics, DISPLAY_INKY_PACK +import network +from network_manager import NetworkManager +import WIFI_CONFIG +import uasyncio +import time +import gc +import wakeup + + +BUTTON_DOWN = 11 +BUTTON_A = 12 +BUTTON_B = 13 +BUTTON_C = 14 +BUTTON_UP = 15 +BUTTON_USER = None # User button not available on W + +BUTTON_MASK = 0b11111 << 11 + +SYSTEM_VERY_SLOW = 0 +SYSTEM_SLOW = 1 +SYSTEM_NORMAL = 2 +SYSTEM_FAST = 3 +SYSTEM_TURBO = 4 + +UPDATE_NORMAL = 0 +UPDATE_MEDIUM = 1 +UPDATE_FAST = 2 +UPDATE_TURBO = 3 + +LED = 22 +ENABLE_3V3 = 10 +BUSY = 26 + +WIDTH = 296 +HEIGHT = 128 + +SYSTEM_FREQS = [4000000, 12000000, 48000000, 133000000, 250000000] + +BUTTONS = { + BUTTON_DOWN: machine.Pin(BUTTON_DOWN, machine.Pin.IN, machine.Pin.PULL_DOWN), + BUTTON_A: machine.Pin(BUTTON_A, machine.Pin.IN, machine.Pin.PULL_DOWN), + BUTTON_B: machine.Pin(BUTTON_B, machine.Pin.IN, machine.Pin.PULL_DOWN), + BUTTON_C: machine.Pin(BUTTON_C, machine.Pin.IN, machine.Pin.PULL_DOWN), + BUTTON_UP: machine.Pin(BUTTON_UP, machine.Pin.IN, machine.Pin.PULL_DOWN), +} + +WAKEUP_MASK = 0 + + +def woken_by_button(): + return wakeup.get_gpio_state() & BUTTON_MASK > 0 + + +def pressed_to_wake(button): + return wakeup.get_gpio_state() & (1 << button) > 0 + + +def reset_pressed_to_wake(): + wakeup.reset_gpio_state() + + +def pressed_to_wake_get_once(button): + global WAKEUP_MASK + result = (wakeup.get_gpio_state() & ~WAKEUP_MASK & (1 << button)) > 0 + WAKEUP_MASK |= 1 << button + return result + + +def system_speed(speed): + try: + machine.freq(SYSTEM_FREQS[speed]) + except IndexError: + pass + + +class Badger2040W: + def __init__(self): + self.display = PicoGraphics(DISPLAY_INKY_PACK) + self._led = machine.PWM(machine.Pin(LED)) + self._led.freq(1000) + self._led.duty_u16(0) + self._update_speed = 0 + + def __getattr__(self, item): + # Glue to redirect calls to PicoGraphics + return getattr(self.display, item) + + def update(self): + t_start = time.ticks_ms() + self.display.update() + t_elapsed = time.ticks_ms() - t_start + + delay_ms = [4700, 2600, 900, 250][self._update_speed] + + if t_elapsed < delay_ms: + time.sleep((delay_ms - t_elapsed) / 1000) + + def set_update_speed(self, speed): + self.display.set_update_speed(speed) + self._update_speed = speed + + def led(self, brightness): + brightness = max(0, min(255, brightness)) + self._led.duty_u16(int(brightness * 256)) + + def invert(self, invert): + raise RuntimeError("Display invert not supported in PicoGraphics.") + + def thickness(self, thickness): + raise RuntimeError("Thickness not supported in PicoGraphics.") + + def halt(self): + time.sleep(0.05) + enable = machine.Pin(ENABLE_3V3, machine.Pin.OUT) + enable.off() + while not self.pressed_any(): + pass + + def pressed(self, button): + return BUTTONS[button].value() == 1 or pressed_to_wake_get_once(button) + + def pressed_any(self): + for button in BUTTONS.values(): + if button.value(): + return True + return False + + @micropython.native + def icon(self, data, index, data_w, icon_size, x, y): + s_x = (index * icon_size) % data_w + s_y = int((index * icon_size) / data_w) + + for o_y in range(icon_size): + for o_x in range(icon_size): + o = ((o_y + s_y) * data_w) + (o_x + s_x) + bm = 0b10000000 >> (o & 0b111) + if data[o >> 3] & bm: + self.display.pixel(x + o_x, y + o_y) + + def image(self, data, w, h, x, y): + for oy in range(h): + row = data[oy] + for ox in range(w): + if row & 0b1 == 0: + self.display.pixel(x + ox, y + oy) + row >>= 1 + + def status_handler(self, mode, status, ip): + print(mode, status, ip) + self.display.set_pen(15) + self.display.clear() + self.display.set_pen(0) + if status: + self.display.text("Connected!", 10, 10, 300, 0.5) + self.display.text(ip, 10, 30, 300, 0.5) + else: + self.display.text("Connecting...", 10, 10, 300, 0.5) + self.update() + + def isconnected(self): + return network.WLAN(network.STA_IF).isconnected() + + def ip_address(self): + return network.WLAN(network.STA_IF).ifconfig()[0] + + def connect(self): + if WIFI_CONFIG.COUNTRY == "": + raise RuntimeError("You must populate WIFI_CONFIG.py for networking.") + self.display.set_update_speed(2) + network_manager = NetworkManager( + WIFI_CONFIG.COUNTRY, status_handler=self.status_handler + ) + uasyncio.get_event_loop().run_until_complete( + network_manager.client(WIFI_CONFIG.SSID, WIFI_CONFIG.PSK) + ) + gc.collect() diff --git a/src/lib/badger_os.py b/src/lib/badger_os.py new file mode 100644 index 0000000..c818472 --- /dev/null +++ b/src/lib/badger_os.py @@ -0,0 +1,213 @@ +"""Keep track of app state in persistent flash storage.""" + +import os +import gc +import time +import json +import machine +import badger2040w as badger2040 + + +def get_battery_level(): + return 0 + # Battery measurement + vbat_adc = machine.ADC(badger2040.PIN_BATTERY) + vref_adc = machine.ADC(badger2040.PIN_1V2_REF) + vref_en = machine.Pin(badger2040.PIN_VREF_POWER) + vref_en.init(machine.Pin.OUT) + vref_en.value(0) + + # Enable the onboard voltage reference + vref_en.value(1) + + # Calculate the logic supply voltage, as will be lower that the usual 3.3V when running off low batteries + vdd = 1.24 * (65535 / vref_adc.read_u16()) + vbat = ( + (vbat_adc.read_u16() / 65535) * 3 * vdd + ) # 3 in this is a gain, not rounding of 3.3V + + # Disable the onboard voltage reference + vref_en.value(0) + + # Convert the voltage to a level to display onscreen + return vbat + + +def get_disk_usage(): + # f_bfree and f_bavail should be the same? + # f_files, f_ffree, f_favail and f_flag are unsupported. + f_bsize, f_frsize, f_blocks, f_bfree, _, _, _, _, _, f_namemax = os.statvfs("/") + + f_total_size = f_frsize * f_blocks + f_total_free = f_bsize * f_bfree + f_total_used = f_total_size - f_total_free + + f_used = 100 / f_total_size * f_total_used + f_free = 100 / f_total_size * f_total_free + + return f_total_size, f_used, f_free + + +def state_running(): + state = {"running": "launcher"} + state_load("launcher", state) + return state["running"] + + +def state_clear_running(): + running = state_running() + state_modify("launcher", {"running": "launcher"}) + return running != "launcher" + + +def state_set_running(app): + state_modify("launcher", {"running": app}) + + +def state_launch(): + app = state_running() + if app is not None and app != "launcher": + launch(app) + + +def state_delete(app): + try: + os.remove("/state/{}.json".format(app)) + except OSError: + pass + + +def state_save(app, data): + try: + with open("/state/{}.json".format(app), "w") as f: + f.write(json.dumps(data)) + f.flush() + except OSError: + import os + + try: + os.stat("/state") + except OSError: + os.mkdir("/state") + state_save(app, data) + + +def state_modify(app, data): + state = {} + state_load(app, state) + state.update(data) + state_save(app, state) + + +def state_load(app, defaults): + try: + data = json.loads(open("/state/{}.json".format(app), "r").read()) + if type(data) is dict: + defaults.update(data) + return True + except (OSError, ValueError): + pass + + state_save(app, defaults) + return False + + +def launch(file): + state_set_running(file) + + gc.collect() + + button_a = machine.Pin(badger2040.BUTTON_A, machine.Pin.IN, machine.Pin.PULL_DOWN) + button_c = machine.Pin(badger2040.BUTTON_C, machine.Pin.IN, machine.Pin.PULL_DOWN) + + def quit_to_launcher(pin): + if button_a.value() and button_c.value(): + machine.reset() + + button_a.irq(trigger=machine.Pin.IRQ_RISING, handler=quit_to_launcher) + button_c.irq(trigger=machine.Pin.IRQ_RISING, handler=quit_to_launcher) + + try: + __import__(file) + + except ImportError as e: + # If the app doesn't exist, notify the user + warning(None, f"Could not launch: {file}") + print(e) + time.sleep(4.0) + except Exception as e: + # If the app throws an error, catch it and display! + print(e) + warning(None, str(e)) + time.sleep(4.0) + + # If the app exits or errors, do not relaunch! + state_clear_running() + machine.reset() # Exit back to launcher + + +# Draw an overlay box with a given message within it +def warning( + display, + message, + width=badger2040.WIDTH - 20, + height=badger2040.HEIGHT - 20, + line_spacing=20, + text_size=0.6, +): + print(message) + + if display is None: + display = badger2040.Badger2040W() + display.led(128) + + # Draw a light grey background + display.set_pen(12) + display.rectangle( + (badger2040.WIDTH - width) // 2, + (badger2040.HEIGHT - height) // 2, + width, + height, + ) + + width -= 20 + height -= 20 + + display.set_pen(15) + display.rectangle( + (badger2040.WIDTH - width) // 2, + (badger2040.HEIGHT - height) // 2, + width, + height, + ) + + # Take the provided message and split it up into + # lines that fit within the specified width + words = message.split(" ") + + lines = [] + current_line = "" + for word in words: + if display.measure_text(current_line + word + " ", text_size) < width: + current_line += word + " " + else: + lines.append(current_line.strip()) + current_line = word + " " + lines.append(current_line.strip()) + + display.set_pen(0) + + # Display each line of text from the message, centre-aligned + num_lines = len(lines) + for i in range(num_lines): + length = display.measure_text(lines[i], text_size) + current_line = (i * line_spacing) - ((num_lines - 1) * line_spacing) // 2 + display.text( + lines[i], + (badger2040.WIDTH - length) // 2, + (badger2040.HEIGHT // 2) + current_line, + badger2040.WIDTH, + text_size, + ) + + display.update() diff --git a/src/lib/network_manager.py b/src/lib/network_manager.py new file mode 100644 index 0000000..8feffaf --- /dev/null +++ b/src/lib/network_manager.py @@ -0,0 +1,117 @@ +import rp2 +import network +import machine +import uasyncio + + +class NetworkManager: + _ifname = ("Client", "Access Point") + + def __init__( + self, + country="GB", + client_timeout=30, + access_point_timeout=5, + status_handler=None, + error_handler=None, + ): + rp2.country(country) + self._ap_if = network.WLAN(network.AP_IF) + self._sta_if = network.WLAN(network.STA_IF) + + self._mode = network.STA_IF + self._client_timeout = client_timeout + self._access_point_timeout = access_point_timeout + self._status_handler = status_handler + self._error_handler = error_handler + self.UID = ("{:02X}" * 8).format(*machine.unique_id()) + + def isconnected(self): + return self._sta_if.isconnected() or self._ap_if.isconnected() + + def config(self, var): + if self._sta_if.active(): + return self._sta_if.config(var) + else: + if var == "password": + return self.UID + return self._ap_if.config(var) + + def mode(self): + if self._sta_if.isconnected(): + return self._ifname[0] + if self._ap_if.isconnected(): + return self._ifname[1] + return None + + def ifaddress(self): + if self._sta_if.isconnected(): + return self._sta_if.ifconfig()[0] + if self._ap_if.isconnected(): + return self._ap_if.ifconfig()[0] + return "0.0.0.0" + + def disconnect(self): + if self._sta_if.isconnected(): + self._sta_if.disconnect() + if self._ap_if.isconnected(): + self._ap_if.disconnect() + + async def wait(self, mode): + while not self.isconnected(): + self._handle_status(mode, None) + await uasyncio.sleep_ms(1000) + + def _handle_status(self, mode, status): + if callable(self._status_handler): + self._status_handler(self._ifname[mode], status, self.ifaddress()) + + def _handle_error(self, mode, msg): + if callable(self._error_handler): + if self._error_handler(self._ifname[mode], msg): + return + raise RuntimeError(msg) + + async def client(self, ssid, psk): + if self._sta_if.isconnected(): + self._handle_status(network.STA_IF, True) + return + + self._ap_if.disconnect() + self._ap_if.active(False) + + self._sta_if.active(True) + self._sta_if.connect(ssid, psk) + self._sta_if.config(pm=0xA11140) + + try: + await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout) + self._handle_status(network.STA_IF, True) + + except uasyncio.TimeoutError: + self._sta_if.active(False) + self._handle_status(network.STA_IF, False) + self._handle_error(network.STA_IF, "WIFI Client Failed") + + async def access_point(self): + if self._ap_if.isconnected(): + self._handle_status(network.AP_IF, True) + return + + self._sta_if.disconnect() + self._sta_if.active(False) + + self._ap_if.ifconfig(("10.10.1.1", "255.255.255.0", "10.10.1.1", "10.10.1.1")) + self._ap_if.config(password=self.UID) + self._ap_if.active(True) + + try: + await uasyncio.wait_for( + self.wait(network.AP_IF), self._access_point_timeout + ) + self._handle_status(network.AP_IF, True) + + except uasyncio.TimeoutError: + self._sta_if.active(False) + self._handle_status(network.AP_IF, False) + self._handle_error(network.AP_IF, "WIFI Client Failed") diff --git a/src/main.py b/src/main.py index 653a92f..0fa5758 100644 --- a/src/main.py +++ b/src/main.py @@ -1,22 +1 @@ -from picographics import PicoGraphics, DISPLAY_INKY_PACK - -import wifi - - -PEN_BLACK = 0 -PEN_WHITE = 15 - -display = PicoGraphics(DISPLAY_INKY_PACK) -display.set_pen(PEN_WHITE) -display.clear() -display.set_pen(PEN_BLACK) -display.text("Connecting...", 10, 10) -display.update() - -wifi.setup() -display.set_pen(PEN_WHITE) -display.clear() -display.set_pen(PEN_BLACK) -display.text("Connected!", 10, 10) -display.update() - +import launcher # noqa: F401 diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..1c65bb8 --- /dev/null +++ b/tasks.py @@ -0,0 +1,74 @@ +import time +from pathlib import Path + +from invoke import task, Context + +BASE_DIR = Path(__file__).parent.resolve(strict=True) +SRC_DIR = BASE_DIR / "src" + +MICROPYTHON_DEPENDENCIES = [ + # "github:miguelgrinberg/microdot/src/microdot.py", + # "github:miguelgrinberg/microdot/src/microdot_asyncio.py", +] + + +@task +def wipe(c: Context, board_id: str): + """Wipe the board with mpremote.""" + c.run( + f'mpremote connect id:{board_id} exec --no-follow "' + "import os, machine, rp2;" + "os.umount('/');" + "bdev = rp2.Flash();" + "os.VfsLfs2.mkfs(bdev, progsize=256);" + "vfs = os.VfsLfs2(bdev, progsize=256);" + "os.mount(vfs, '/');" + 'machine.reset()"', + pty=True, + echo=True, + ) + print("Board wiped, waiting for it to reboot...") + time.sleep(3) + print("Done!") + + +@task +def list(c: Context): + """List connected boards with mpremote.""" + c.run("mpremote devs", pty=True, echo=True) + + +@task +def initial_setup(c: Context, board_id: str): + """Install dependencies and copy project files to the board.""" + with c.cd(SRC_DIR): + if MICROPYTHON_DEPENDENCIES: + deps = " ".join(MICROPYTHON_DEPENDENCIES) + c.run( + f"mpremote connect id:{board_id} " + f"mip install {deps} + " + "cp -r . : + " + "reset", + pty=True, + echo=True, + ) + else: + c.run( + f"mpremote connect id:{board_id} " "cp -r . : + " "reset", + pty=True, + echo=True, + ) + + +@task +def update_code(c: Context, board_id: str): + """Update code on the board.""" + # mpremote connect id:e6614864d3269c34 \ + # cp -r . : + \ + # reset + with c.cd(SRC_DIR): + c.run( + f"mpremote connect id:{board_id} " "cp -r . : + " "reset", + pty=True, + echo=True, + )