mirror of
https://github.com/Crocmagnon/plant-badger.git
synced 2024-11-23 09:58:02 +01:00
Implement a very basic HA client
This commit is contained in:
parent
2b24d473b2
commit
ced8d3683a
16 changed files with 913 additions and 55 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -56,7 +56,6 @@ dist/
|
||||||
downloads/
|
downloads/
|
||||||
eggs/
|
eggs/
|
||||||
.eggs/
|
.eggs/
|
||||||
lib/
|
|
||||||
lib64/
|
lib64/
|
||||||
parts/
|
parts/
|
||||||
sdist/
|
sdist/
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
<component name="NewModuleRootManager">
|
<component name="NewModuleRootManager">
|
||||||
<content url="file://$MODULE_DIR$">
|
<content url="file://$MODULE_DIR$">
|
||||||
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
|
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/.direnv" />
|
||||||
</content>
|
</content>
|
||||||
<orderEntry type="jdk" jdkName="Python 3.10 (plant-badge)" jdkType="Python SDK" />
|
<orderEntry type="jdk" jdkName="Python 3.10 (plant-badge)" jdkType="Python SDK" />
|
||||||
<orderEntry type="sourceFolder" forTests="false" />
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
|
|
8
.idea/watcherTasks.xml
Normal file
8
.idea/watcherTasks.xml
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="ProjectTasksOptions">
|
||||||
|
<enabled-global>
|
||||||
|
<option value="pre-commit" />
|
||||||
|
</enabled-global>
|
||||||
|
</component>
|
||||||
|
</project>
|
30
.pre-commit-config.yaml
Normal file
30
.pre-commit-config.yaml
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
exclude: \.min\.(js|css)(\.map)?$|^\.idea/
|
||||||
|
|
||||||
|
repos:
|
||||||
|
- repo: https://github.com/pre-commit/pre-commit-hooks
|
||||||
|
rev: v4.4.0
|
||||||
|
hooks:
|
||||||
|
- id: check-ast
|
||||||
|
- id: check-json
|
||||||
|
- id: check-toml
|
||||||
|
- id: check-xml
|
||||||
|
- id: check-yaml
|
||||||
|
args: [--allow-multiple-documents]
|
||||||
|
- id: end-of-file-fixer
|
||||||
|
- id: check-merge-conflict
|
||||||
|
- id: debug-statements
|
||||||
|
- id: detect-private-key
|
||||||
|
- id: pretty-format-json
|
||||||
|
args:
|
||||||
|
- --autofix
|
||||||
|
- --no-sort-keys
|
||||||
|
- id: trailing-whitespace
|
||||||
|
args:
|
||||||
|
- --markdown-linebreak-ext=md
|
||||||
|
- id: check-executables-have-shebangs
|
||||||
|
- id: check-shebang-scripts-are-executable
|
||||||
|
- repo: https://github.com/psf/black
|
||||||
|
rev: 23.1.0
|
||||||
|
hooks:
|
||||||
|
- id: black
|
||||||
|
args: [--target-version, py37]
|
40
README.md
40
README.md
|
@ -12,35 +12,15 @@ pip install -r requirements-pycharm.txt
|
||||||
|
|
||||||
This will install dependencies required by PyCharm to run its MicroPython tools.
|
This will install dependencies required by PyCharm to run its MicroPython tools.
|
||||||
|
|
||||||
## List boards
|
## Invoke tasks
|
||||||
```shell
|
```shell
|
||||||
mpremote devs
|
invoke --list
|
||||||
```
|
# Start by getting your board id
|
||||||
|
inv list
|
||||||
## Setup / Clean flash
|
# Then wipe the board
|
||||||
|
inv wipe <board_id>
|
||||||
```shell
|
# Then run the initial setup
|
||||||
# Wipe the board's flash (remove all files)
|
inv initial-setup <board_id>
|
||||||
mpremote connect id:e6614104032e192a \
|
# After that, just update the code when changes are made locally
|
||||||
exec --no-follow "import os, machine, rp2; os.umount('/'); bdev = rp2.Flash(); os.VfsLfs2.mkfs(bdev, progsize=256); vfs = os.VfsLfs2(bdev, progsize=256); os.mount(vfs, '/'); machine.reset()"
|
inv update-code <board_id>
|
||||||
echo "Board clean"
|
|
||||||
sleep 3
|
|
||||||
|
|
||||||
# Install dependencies and copy project
|
|
||||||
cd src/
|
|
||||||
mpremote connect id:e6614104032e192a \
|
|
||||||
mip install github:miguelgrinberg/microdot/src/microdot.py \
|
|
||||||
github:miguelgrinberg/microdot/src/microdot_asyncio.py + \
|
|
||||||
cp -r . : + \
|
|
||||||
reset
|
|
||||||
cd ..
|
|
||||||
```
|
|
||||||
|
|
||||||
## Update code
|
|
||||||
```shell
|
|
||||||
cd src/
|
|
||||||
mpremote connect id:e6614104032e192a \
|
|
||||||
cp -r . : + \
|
|
||||||
reset
|
|
||||||
cd ..
|
|
||||||
```
|
```
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
|
-r requirements.txt
|
||||||
docopt>=0.6.2,<0.7
|
docopt>=0.6.2,<0.7
|
||||||
adafruit-ampy>=1.0.5,<1.1
|
adafruit-ampy>=1.0.5,<1.1
|
||||||
mpremote
|
|
||||||
|
|
|
@ -1 +1,4 @@
|
||||||
mpremote
|
mpremote
|
||||||
|
invoke>=2.0.0
|
||||||
|
black
|
||||||
|
pre-commit
|
||||||
|
|
5
src/WIFI_CONFIG.py
Normal file
5
src/WIFI_CONFIG.py
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
import secrets
|
||||||
|
|
||||||
|
SSID = secrets.SSID
|
||||||
|
PSK = secrets.PASS
|
||||||
|
COUNTRY = secrets.COUNTRY
|
86
src/apps/home_assistant.py
Normal file
86
src/apps/home_assistant.py
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
import badger2040w as badger2040
|
||||||
|
from badger2040w import WIDTH
|
||||||
|
import urequests
|
||||||
|
from secrets import HA_BASE_URL, HA_ACCESS_TOKEN
|
||||||
|
|
||||||
|
|
||||||
|
display = badger2040.Badger2040W()
|
||||||
|
display.led(128)
|
||||||
|
display.set_update_speed(2)
|
||||||
|
|
||||||
|
display.connect()
|
||||||
|
|
||||||
|
|
||||||
|
class HAError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HAFetchStateError(HAError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HAPlant:
|
||||||
|
def __init__(self, entity_id):
|
||||||
|
self.entity_id = entity_id
|
||||||
|
self.state = None
|
||||||
|
self._last_fetched = None
|
||||||
|
|
||||||
|
def fetch_state(self) -> None:
|
||||||
|
"""Fetch state and store in self.state."""
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"Bearer {HA_ACCESS_TOKEN}",
|
||||||
|
"content-type": "application/json",
|
||||||
|
}
|
||||||
|
url = f"{HA_BASE_URL}/states/{self.entity_id}"
|
||||||
|
print("Fetching state from", url)
|
||||||
|
res = urequests.get(url, headers=headers)
|
||||||
|
if res.status_code != 200:
|
||||||
|
msg = f"Error fetching state for {self.entity_id}: {res.text}"
|
||||||
|
raise HAFetchStateError(msg)
|
||||||
|
data = res.json()
|
||||||
|
self.state = data
|
||||||
|
res.close()
|
||||||
|
|
||||||
|
def display_state(self):
|
||||||
|
print(self.state)
|
||||||
|
|
||||||
|
|
||||||
|
def draw_page(plant):
|
||||||
|
print("Drawing page...")
|
||||||
|
# Clear the display
|
||||||
|
display.set_pen(15)
|
||||||
|
display.clear()
|
||||||
|
display.set_pen(0)
|
||||||
|
|
||||||
|
# Draw the page header
|
||||||
|
display.set_font("bitmap6")
|
||||||
|
display.set_pen(0)
|
||||||
|
display.rectangle(0, 0, WIDTH, 20)
|
||||||
|
display.set_pen(15)
|
||||||
|
display.text("Weather", 3, 4)
|
||||||
|
display.set_pen(0)
|
||||||
|
|
||||||
|
display.set_font("bitmap8")
|
||||||
|
display.set_pen(0)
|
||||||
|
display.rectangle(0, 60, WIDTH, 25)
|
||||||
|
display.set_pen(15)
|
||||||
|
display.text(
|
||||||
|
"Found state, check logs",
|
||||||
|
5,
|
||||||
|
65,
|
||||||
|
WIDTH,
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
display.update()
|
||||||
|
plant.display_state()
|
||||||
|
|
||||||
|
|
||||||
|
plant = HAPlant("plant.aloe_vera")
|
||||||
|
plant.fetch_state()
|
||||||
|
draw_page(plant)
|
||||||
|
|
||||||
|
# Call halt in a loop, on battery this switches off power.
|
||||||
|
# On USB, the app will exit when A+C is pressed because the launcher picks that up.
|
||||||
|
while True:
|
||||||
|
display.halt()
|
BIN
src/apps/icon-home-assistant.jpg
Normal file
BIN
src/apps/icon-home-assistant.jpg
Normal file
Binary file not shown.
After Width: | Height: | Size: 1.6 KiB |
184
src/launcher.py
Normal file
184
src/launcher.py
Normal file
|
@ -0,0 +1,184 @@
|
||||||
|
import gc
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import math
|
||||||
|
import badger2040w as badger2040
|
||||||
|
import badger_os
|
||||||
|
import jpegdec
|
||||||
|
|
||||||
|
APP_DIR = "/apps"
|
||||||
|
FONT_SIZE = 2
|
||||||
|
|
||||||
|
changed = False
|
||||||
|
exited_to_launcher = False
|
||||||
|
woken_by_button = (
|
||||||
|
badger2040.woken_by_button()
|
||||||
|
) # Must be done before we clear_pressed_to_wake
|
||||||
|
|
||||||
|
if badger2040.pressed_to_wake(badger2040.BUTTON_A) and badger2040.pressed_to_wake(
|
||||||
|
badger2040.BUTTON_C
|
||||||
|
):
|
||||||
|
# Pressing A and C together at start quits app
|
||||||
|
exited_to_launcher = badger_os.state_clear_running()
|
||||||
|
badger2040.reset_pressed_to_wake()
|
||||||
|
else:
|
||||||
|
# Otherwise restore previously running app
|
||||||
|
badger_os.state_launch()
|
||||||
|
|
||||||
|
|
||||||
|
display = badger2040.Badger2040W()
|
||||||
|
display.set_font("bitmap8")
|
||||||
|
display.led(128)
|
||||||
|
|
||||||
|
jpeg = jpegdec.JPEG(display.display)
|
||||||
|
|
||||||
|
state = {"page": 0, "running": "launcher"}
|
||||||
|
|
||||||
|
badger_os.state_load("launcher", state)
|
||||||
|
|
||||||
|
examples = [x[:-3] for x in os.listdir(APP_DIR) if x.endswith(".py")]
|
||||||
|
|
||||||
|
# Approximate center lines for buttons A, B and C
|
||||||
|
centers = (41, 147, 253)
|
||||||
|
|
||||||
|
MAX_PAGE = math.ceil(len(examples) / 3)
|
||||||
|
|
||||||
|
WIDTH = 296
|
||||||
|
|
||||||
|
|
||||||
|
def map_value(input, in_min, in_max, out_min, out_max):
|
||||||
|
return (((input - in_min) * (out_max - out_min)) / (in_max - in_min)) + out_min
|
||||||
|
|
||||||
|
|
||||||
|
def draw_disk_usage(x):
|
||||||
|
_, f_used, _ = badger_os.get_disk_usage()
|
||||||
|
|
||||||
|
display.set_pen(15)
|
||||||
|
display.image(
|
||||||
|
bytearray(
|
||||||
|
(
|
||||||
|
0b00000000,
|
||||||
|
0b00111100,
|
||||||
|
0b00111100,
|
||||||
|
0b00111100,
|
||||||
|
0b00111000,
|
||||||
|
0b00000000,
|
||||||
|
0b00000000,
|
||||||
|
0b00000001,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
8,
|
||||||
|
8,
|
||||||
|
x,
|
||||||
|
4,
|
||||||
|
)
|
||||||
|
display.rectangle(x + 10, 3, 80, 10)
|
||||||
|
display.set_pen(0)
|
||||||
|
display.rectangle(x + 11, 4, 78, 8)
|
||||||
|
display.set_pen(15)
|
||||||
|
display.rectangle(x + 12, 5, int(76 / 100.0 * f_used), 6)
|
||||||
|
display.text("{:.2f}%".format(f_used), x + 91, 4, WIDTH, 1.0)
|
||||||
|
|
||||||
|
|
||||||
|
def render():
|
||||||
|
display.set_pen(15)
|
||||||
|
display.clear()
|
||||||
|
display.set_pen(0)
|
||||||
|
|
||||||
|
max_icons = min(3, len(examples[(state["page"] * 3) :]))
|
||||||
|
|
||||||
|
for i in range(max_icons):
|
||||||
|
x = centers[i]
|
||||||
|
label = examples[i + (state["page"] * 3)]
|
||||||
|
icon_label = label.replace("_", "-")
|
||||||
|
icon = f"{APP_DIR}/icon-{icon_label}.jpg"
|
||||||
|
label = label.replace("_", " ")
|
||||||
|
jpeg.open_file(icon)
|
||||||
|
jpeg.decode(x - 26, 30)
|
||||||
|
display.set_pen(0)
|
||||||
|
w = display.measure_text(label, FONT_SIZE)
|
||||||
|
display.text(label, int(x - (w / 2)), 16 + 80, WIDTH, FONT_SIZE)
|
||||||
|
|
||||||
|
for i in range(MAX_PAGE):
|
||||||
|
x = 286
|
||||||
|
y = int((128 / 2) - (MAX_PAGE * 10 / 2) + (i * 10))
|
||||||
|
display.set_pen(0)
|
||||||
|
display.rectangle(x, y, 8, 8)
|
||||||
|
if state["page"] != i:
|
||||||
|
display.set_pen(15)
|
||||||
|
display.rectangle(x + 1, y + 1, 6, 6)
|
||||||
|
|
||||||
|
display.set_pen(0)
|
||||||
|
display.rectangle(0, 0, WIDTH, 16)
|
||||||
|
draw_disk_usage(90)
|
||||||
|
display.set_pen(15)
|
||||||
|
display.text("badgerOS", 4, 4, WIDTH, 1.0)
|
||||||
|
|
||||||
|
display.update()
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_user_to_release_buttons():
|
||||||
|
while display.pressed_any():
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
def launch_example(index):
|
||||||
|
wait_for_user_to_release_buttons()
|
||||||
|
|
||||||
|
file = examples[(state["page"] * 3) + index]
|
||||||
|
file = f"{APP_DIR}/{file}"
|
||||||
|
|
||||||
|
for k in locals().keys():
|
||||||
|
if k not in ("gc", "file", "badger_os"):
|
||||||
|
del locals()[k]
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
badger_os.launch(file)
|
||||||
|
|
||||||
|
|
||||||
|
def button(pin):
|
||||||
|
global changed
|
||||||
|
changed = True
|
||||||
|
|
||||||
|
if pin == badger2040.BUTTON_A:
|
||||||
|
launch_example(0)
|
||||||
|
if pin == badger2040.BUTTON_B:
|
||||||
|
launch_example(1)
|
||||||
|
if pin == badger2040.BUTTON_C:
|
||||||
|
launch_example(2)
|
||||||
|
if pin == badger2040.BUTTON_UP:
|
||||||
|
if state["page"] > 0:
|
||||||
|
state["page"] -= 1
|
||||||
|
render()
|
||||||
|
if pin == badger2040.BUTTON_DOWN:
|
||||||
|
if state["page"] < MAX_PAGE - 1:
|
||||||
|
state["page"] += 1
|
||||||
|
render()
|
||||||
|
|
||||||
|
|
||||||
|
if exited_to_launcher or not woken_by_button:
|
||||||
|
wait_for_user_to_release_buttons()
|
||||||
|
display.set_update_speed(badger2040.UPDATE_MEDIUM)
|
||||||
|
render()
|
||||||
|
|
||||||
|
display.set_update_speed(badger2040.UPDATE_FAST)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if display.pressed(badger2040.BUTTON_A):
|
||||||
|
button(badger2040.BUTTON_A)
|
||||||
|
if display.pressed(badger2040.BUTTON_B):
|
||||||
|
button(badger2040.BUTTON_B)
|
||||||
|
if display.pressed(badger2040.BUTTON_C):
|
||||||
|
button(badger2040.BUTTON_C)
|
||||||
|
|
||||||
|
if display.pressed(badger2040.BUTTON_UP):
|
||||||
|
button(badger2040.BUTTON_UP)
|
||||||
|
if display.pressed(badger2040.BUTTON_DOWN):
|
||||||
|
button(badger2040.BUTTON_DOWN)
|
||||||
|
|
||||||
|
if changed:
|
||||||
|
badger_os.state_save("launcher", state)
|
||||||
|
changed = False
|
||||||
|
|
||||||
|
display.halt()
|
179
src/lib/badger2040w.py
Normal file
179
src/lib/badger2040w.py
Normal file
|
@ -0,0 +1,179 @@
|
||||||
|
import machine
|
||||||
|
import micropython
|
||||||
|
from picographics import PicoGraphics, DISPLAY_INKY_PACK
|
||||||
|
import network
|
||||||
|
from network_manager import NetworkManager
|
||||||
|
import WIFI_CONFIG
|
||||||
|
import uasyncio
|
||||||
|
import time
|
||||||
|
import gc
|
||||||
|
import wakeup
|
||||||
|
|
||||||
|
|
||||||
|
BUTTON_DOWN = 11
|
||||||
|
BUTTON_A = 12
|
||||||
|
BUTTON_B = 13
|
||||||
|
BUTTON_C = 14
|
||||||
|
BUTTON_UP = 15
|
||||||
|
BUTTON_USER = None # User button not available on W
|
||||||
|
|
||||||
|
BUTTON_MASK = 0b11111 << 11
|
||||||
|
|
||||||
|
SYSTEM_VERY_SLOW = 0
|
||||||
|
SYSTEM_SLOW = 1
|
||||||
|
SYSTEM_NORMAL = 2
|
||||||
|
SYSTEM_FAST = 3
|
||||||
|
SYSTEM_TURBO = 4
|
||||||
|
|
||||||
|
UPDATE_NORMAL = 0
|
||||||
|
UPDATE_MEDIUM = 1
|
||||||
|
UPDATE_FAST = 2
|
||||||
|
UPDATE_TURBO = 3
|
||||||
|
|
||||||
|
LED = 22
|
||||||
|
ENABLE_3V3 = 10
|
||||||
|
BUSY = 26
|
||||||
|
|
||||||
|
WIDTH = 296
|
||||||
|
HEIGHT = 128
|
||||||
|
|
||||||
|
SYSTEM_FREQS = [4000000, 12000000, 48000000, 133000000, 250000000]
|
||||||
|
|
||||||
|
BUTTONS = {
|
||||||
|
BUTTON_DOWN: machine.Pin(BUTTON_DOWN, machine.Pin.IN, machine.Pin.PULL_DOWN),
|
||||||
|
BUTTON_A: machine.Pin(BUTTON_A, machine.Pin.IN, machine.Pin.PULL_DOWN),
|
||||||
|
BUTTON_B: machine.Pin(BUTTON_B, machine.Pin.IN, machine.Pin.PULL_DOWN),
|
||||||
|
BUTTON_C: machine.Pin(BUTTON_C, machine.Pin.IN, machine.Pin.PULL_DOWN),
|
||||||
|
BUTTON_UP: machine.Pin(BUTTON_UP, machine.Pin.IN, machine.Pin.PULL_DOWN),
|
||||||
|
}
|
||||||
|
|
||||||
|
WAKEUP_MASK = 0
|
||||||
|
|
||||||
|
|
||||||
|
def woken_by_button():
|
||||||
|
return wakeup.get_gpio_state() & BUTTON_MASK > 0
|
||||||
|
|
||||||
|
|
||||||
|
def pressed_to_wake(button):
|
||||||
|
return wakeup.get_gpio_state() & (1 << button) > 0
|
||||||
|
|
||||||
|
|
||||||
|
def reset_pressed_to_wake():
|
||||||
|
wakeup.reset_gpio_state()
|
||||||
|
|
||||||
|
|
||||||
|
def pressed_to_wake_get_once(button):
|
||||||
|
global WAKEUP_MASK
|
||||||
|
result = (wakeup.get_gpio_state() & ~WAKEUP_MASK & (1 << button)) > 0
|
||||||
|
WAKEUP_MASK |= 1 << button
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def system_speed(speed):
|
||||||
|
try:
|
||||||
|
machine.freq(SYSTEM_FREQS[speed])
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Badger2040W:
|
||||||
|
def __init__(self):
|
||||||
|
self.display = PicoGraphics(DISPLAY_INKY_PACK)
|
||||||
|
self._led = machine.PWM(machine.Pin(LED))
|
||||||
|
self._led.freq(1000)
|
||||||
|
self._led.duty_u16(0)
|
||||||
|
self._update_speed = 0
|
||||||
|
|
||||||
|
def __getattr__(self, item):
|
||||||
|
# Glue to redirect calls to PicoGraphics
|
||||||
|
return getattr(self.display, item)
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
t_start = time.ticks_ms()
|
||||||
|
self.display.update()
|
||||||
|
t_elapsed = time.ticks_ms() - t_start
|
||||||
|
|
||||||
|
delay_ms = [4700, 2600, 900, 250][self._update_speed]
|
||||||
|
|
||||||
|
if t_elapsed < delay_ms:
|
||||||
|
time.sleep((delay_ms - t_elapsed) / 1000)
|
||||||
|
|
||||||
|
def set_update_speed(self, speed):
|
||||||
|
self.display.set_update_speed(speed)
|
||||||
|
self._update_speed = speed
|
||||||
|
|
||||||
|
def led(self, brightness):
|
||||||
|
brightness = max(0, min(255, brightness))
|
||||||
|
self._led.duty_u16(int(brightness * 256))
|
||||||
|
|
||||||
|
def invert(self, invert):
|
||||||
|
raise RuntimeError("Display invert not supported in PicoGraphics.")
|
||||||
|
|
||||||
|
def thickness(self, thickness):
|
||||||
|
raise RuntimeError("Thickness not supported in PicoGraphics.")
|
||||||
|
|
||||||
|
def halt(self):
|
||||||
|
time.sleep(0.05)
|
||||||
|
enable = machine.Pin(ENABLE_3V3, machine.Pin.OUT)
|
||||||
|
enable.off()
|
||||||
|
while not self.pressed_any():
|
||||||
|
pass
|
||||||
|
|
||||||
|
def pressed(self, button):
|
||||||
|
return BUTTONS[button].value() == 1 or pressed_to_wake_get_once(button)
|
||||||
|
|
||||||
|
def pressed_any(self):
|
||||||
|
for button in BUTTONS.values():
|
||||||
|
if button.value():
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
@micropython.native
|
||||||
|
def icon(self, data, index, data_w, icon_size, x, y):
|
||||||
|
s_x = (index * icon_size) % data_w
|
||||||
|
s_y = int((index * icon_size) / data_w)
|
||||||
|
|
||||||
|
for o_y in range(icon_size):
|
||||||
|
for o_x in range(icon_size):
|
||||||
|
o = ((o_y + s_y) * data_w) + (o_x + s_x)
|
||||||
|
bm = 0b10000000 >> (o & 0b111)
|
||||||
|
if data[o >> 3] & bm:
|
||||||
|
self.display.pixel(x + o_x, y + o_y)
|
||||||
|
|
||||||
|
def image(self, data, w, h, x, y):
|
||||||
|
for oy in range(h):
|
||||||
|
row = data[oy]
|
||||||
|
for ox in range(w):
|
||||||
|
if row & 0b1 == 0:
|
||||||
|
self.display.pixel(x + ox, y + oy)
|
||||||
|
row >>= 1
|
||||||
|
|
||||||
|
def status_handler(self, mode, status, ip):
|
||||||
|
print(mode, status, ip)
|
||||||
|
self.display.set_pen(15)
|
||||||
|
self.display.clear()
|
||||||
|
self.display.set_pen(0)
|
||||||
|
if status:
|
||||||
|
self.display.text("Connected!", 10, 10, 300, 0.5)
|
||||||
|
self.display.text(ip, 10, 30, 300, 0.5)
|
||||||
|
else:
|
||||||
|
self.display.text("Connecting...", 10, 10, 300, 0.5)
|
||||||
|
self.update()
|
||||||
|
|
||||||
|
def isconnected(self):
|
||||||
|
return network.WLAN(network.STA_IF).isconnected()
|
||||||
|
|
||||||
|
def ip_address(self):
|
||||||
|
return network.WLAN(network.STA_IF).ifconfig()[0]
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
if WIFI_CONFIG.COUNTRY == "":
|
||||||
|
raise RuntimeError("You must populate WIFI_CONFIG.py for networking.")
|
||||||
|
self.display.set_update_speed(2)
|
||||||
|
network_manager = NetworkManager(
|
||||||
|
WIFI_CONFIG.COUNTRY, status_handler=self.status_handler
|
||||||
|
)
|
||||||
|
uasyncio.get_event_loop().run_until_complete(
|
||||||
|
network_manager.client(WIFI_CONFIG.SSID, WIFI_CONFIG.PSK)
|
||||||
|
)
|
||||||
|
gc.collect()
|
213
src/lib/badger_os.py
Normal file
213
src/lib/badger_os.py
Normal file
|
@ -0,0 +1,213 @@
|
||||||
|
"""Keep track of app state in persistent flash storage."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import gc
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
import machine
|
||||||
|
import badger2040w as badger2040
|
||||||
|
|
||||||
|
|
||||||
|
def get_battery_level():
|
||||||
|
return 0
|
||||||
|
# Battery measurement
|
||||||
|
vbat_adc = machine.ADC(badger2040.PIN_BATTERY)
|
||||||
|
vref_adc = machine.ADC(badger2040.PIN_1V2_REF)
|
||||||
|
vref_en = machine.Pin(badger2040.PIN_VREF_POWER)
|
||||||
|
vref_en.init(machine.Pin.OUT)
|
||||||
|
vref_en.value(0)
|
||||||
|
|
||||||
|
# Enable the onboard voltage reference
|
||||||
|
vref_en.value(1)
|
||||||
|
|
||||||
|
# Calculate the logic supply voltage, as will be lower that the usual 3.3V when running off low batteries
|
||||||
|
vdd = 1.24 * (65535 / vref_adc.read_u16())
|
||||||
|
vbat = (
|
||||||
|
(vbat_adc.read_u16() / 65535) * 3 * vdd
|
||||||
|
) # 3 in this is a gain, not rounding of 3.3V
|
||||||
|
|
||||||
|
# Disable the onboard voltage reference
|
||||||
|
vref_en.value(0)
|
||||||
|
|
||||||
|
# Convert the voltage to a level to display onscreen
|
||||||
|
return vbat
|
||||||
|
|
||||||
|
|
||||||
|
def get_disk_usage():
|
||||||
|
# f_bfree and f_bavail should be the same?
|
||||||
|
# f_files, f_ffree, f_favail and f_flag are unsupported.
|
||||||
|
f_bsize, f_frsize, f_blocks, f_bfree, _, _, _, _, _, f_namemax = os.statvfs("/")
|
||||||
|
|
||||||
|
f_total_size = f_frsize * f_blocks
|
||||||
|
f_total_free = f_bsize * f_bfree
|
||||||
|
f_total_used = f_total_size - f_total_free
|
||||||
|
|
||||||
|
f_used = 100 / f_total_size * f_total_used
|
||||||
|
f_free = 100 / f_total_size * f_total_free
|
||||||
|
|
||||||
|
return f_total_size, f_used, f_free
|
||||||
|
|
||||||
|
|
||||||
|
def state_running():
|
||||||
|
state = {"running": "launcher"}
|
||||||
|
state_load("launcher", state)
|
||||||
|
return state["running"]
|
||||||
|
|
||||||
|
|
||||||
|
def state_clear_running():
|
||||||
|
running = state_running()
|
||||||
|
state_modify("launcher", {"running": "launcher"})
|
||||||
|
return running != "launcher"
|
||||||
|
|
||||||
|
|
||||||
|
def state_set_running(app):
|
||||||
|
state_modify("launcher", {"running": app})
|
||||||
|
|
||||||
|
|
||||||
|
def state_launch():
|
||||||
|
app = state_running()
|
||||||
|
if app is not None and app != "launcher":
|
||||||
|
launch(app)
|
||||||
|
|
||||||
|
|
||||||
|
def state_delete(app):
|
||||||
|
try:
|
||||||
|
os.remove("/state/{}.json".format(app))
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def state_save(app, data):
|
||||||
|
try:
|
||||||
|
with open("/state/{}.json".format(app), "w") as f:
|
||||||
|
f.write(json.dumps(data))
|
||||||
|
f.flush()
|
||||||
|
except OSError:
|
||||||
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.stat("/state")
|
||||||
|
except OSError:
|
||||||
|
os.mkdir("/state")
|
||||||
|
state_save(app, data)
|
||||||
|
|
||||||
|
|
||||||
|
def state_modify(app, data):
|
||||||
|
state = {}
|
||||||
|
state_load(app, state)
|
||||||
|
state.update(data)
|
||||||
|
state_save(app, state)
|
||||||
|
|
||||||
|
|
||||||
|
def state_load(app, defaults):
|
||||||
|
try:
|
||||||
|
data = json.loads(open("/state/{}.json".format(app), "r").read())
|
||||||
|
if type(data) is dict:
|
||||||
|
defaults.update(data)
|
||||||
|
return True
|
||||||
|
except (OSError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
state_save(app, defaults)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def launch(file):
|
||||||
|
state_set_running(file)
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
button_a = machine.Pin(badger2040.BUTTON_A, machine.Pin.IN, machine.Pin.PULL_DOWN)
|
||||||
|
button_c = machine.Pin(badger2040.BUTTON_C, machine.Pin.IN, machine.Pin.PULL_DOWN)
|
||||||
|
|
||||||
|
def quit_to_launcher(pin):
|
||||||
|
if button_a.value() and button_c.value():
|
||||||
|
machine.reset()
|
||||||
|
|
||||||
|
button_a.irq(trigger=machine.Pin.IRQ_RISING, handler=quit_to_launcher)
|
||||||
|
button_c.irq(trigger=machine.Pin.IRQ_RISING, handler=quit_to_launcher)
|
||||||
|
|
||||||
|
try:
|
||||||
|
__import__(file)
|
||||||
|
|
||||||
|
except ImportError as e:
|
||||||
|
# If the app doesn't exist, notify the user
|
||||||
|
warning(None, f"Could not launch: {file}")
|
||||||
|
print(e)
|
||||||
|
time.sleep(4.0)
|
||||||
|
except Exception as e:
|
||||||
|
# If the app throws an error, catch it and display!
|
||||||
|
print(e)
|
||||||
|
warning(None, str(e))
|
||||||
|
time.sleep(4.0)
|
||||||
|
|
||||||
|
# If the app exits or errors, do not relaunch!
|
||||||
|
state_clear_running()
|
||||||
|
machine.reset() # Exit back to launcher
|
||||||
|
|
||||||
|
|
||||||
|
# Draw an overlay box with a given message within it
|
||||||
|
def warning(
|
||||||
|
display,
|
||||||
|
message,
|
||||||
|
width=badger2040.WIDTH - 20,
|
||||||
|
height=badger2040.HEIGHT - 20,
|
||||||
|
line_spacing=20,
|
||||||
|
text_size=0.6,
|
||||||
|
):
|
||||||
|
print(message)
|
||||||
|
|
||||||
|
if display is None:
|
||||||
|
display = badger2040.Badger2040W()
|
||||||
|
display.led(128)
|
||||||
|
|
||||||
|
# Draw a light grey background
|
||||||
|
display.set_pen(12)
|
||||||
|
display.rectangle(
|
||||||
|
(badger2040.WIDTH - width) // 2,
|
||||||
|
(badger2040.HEIGHT - height) // 2,
|
||||||
|
width,
|
||||||
|
height,
|
||||||
|
)
|
||||||
|
|
||||||
|
width -= 20
|
||||||
|
height -= 20
|
||||||
|
|
||||||
|
display.set_pen(15)
|
||||||
|
display.rectangle(
|
||||||
|
(badger2040.WIDTH - width) // 2,
|
||||||
|
(badger2040.HEIGHT - height) // 2,
|
||||||
|
width,
|
||||||
|
height,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Take the provided message and split it up into
|
||||||
|
# lines that fit within the specified width
|
||||||
|
words = message.split(" ")
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
current_line = ""
|
||||||
|
for word in words:
|
||||||
|
if display.measure_text(current_line + word + " ", text_size) < width:
|
||||||
|
current_line += word + " "
|
||||||
|
else:
|
||||||
|
lines.append(current_line.strip())
|
||||||
|
current_line = word + " "
|
||||||
|
lines.append(current_line.strip())
|
||||||
|
|
||||||
|
display.set_pen(0)
|
||||||
|
|
||||||
|
# Display each line of text from the message, centre-aligned
|
||||||
|
num_lines = len(lines)
|
||||||
|
for i in range(num_lines):
|
||||||
|
length = display.measure_text(lines[i], text_size)
|
||||||
|
current_line = (i * line_spacing) - ((num_lines - 1) * line_spacing) // 2
|
||||||
|
display.text(
|
||||||
|
lines[i],
|
||||||
|
(badger2040.WIDTH - length) // 2,
|
||||||
|
(badger2040.HEIGHT // 2) + current_line,
|
||||||
|
badger2040.WIDTH,
|
||||||
|
text_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
display.update()
|
117
src/lib/network_manager.py
Normal file
117
src/lib/network_manager.py
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
import rp2
|
||||||
|
import network
|
||||||
|
import machine
|
||||||
|
import uasyncio
|
||||||
|
|
||||||
|
|
||||||
|
class NetworkManager:
|
||||||
|
_ifname = ("Client", "Access Point")
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
country="GB",
|
||||||
|
client_timeout=30,
|
||||||
|
access_point_timeout=5,
|
||||||
|
status_handler=None,
|
||||||
|
error_handler=None,
|
||||||
|
):
|
||||||
|
rp2.country(country)
|
||||||
|
self._ap_if = network.WLAN(network.AP_IF)
|
||||||
|
self._sta_if = network.WLAN(network.STA_IF)
|
||||||
|
|
||||||
|
self._mode = network.STA_IF
|
||||||
|
self._client_timeout = client_timeout
|
||||||
|
self._access_point_timeout = access_point_timeout
|
||||||
|
self._status_handler = status_handler
|
||||||
|
self._error_handler = error_handler
|
||||||
|
self.UID = ("{:02X}" * 8).format(*machine.unique_id())
|
||||||
|
|
||||||
|
def isconnected(self):
|
||||||
|
return self._sta_if.isconnected() or self._ap_if.isconnected()
|
||||||
|
|
||||||
|
def config(self, var):
|
||||||
|
if self._sta_if.active():
|
||||||
|
return self._sta_if.config(var)
|
||||||
|
else:
|
||||||
|
if var == "password":
|
||||||
|
return self.UID
|
||||||
|
return self._ap_if.config(var)
|
||||||
|
|
||||||
|
def mode(self):
|
||||||
|
if self._sta_if.isconnected():
|
||||||
|
return self._ifname[0]
|
||||||
|
if self._ap_if.isconnected():
|
||||||
|
return self._ifname[1]
|
||||||
|
return None
|
||||||
|
|
||||||
|
def ifaddress(self):
|
||||||
|
if self._sta_if.isconnected():
|
||||||
|
return self._sta_if.ifconfig()[0]
|
||||||
|
if self._ap_if.isconnected():
|
||||||
|
return self._ap_if.ifconfig()[0]
|
||||||
|
return "0.0.0.0"
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
if self._sta_if.isconnected():
|
||||||
|
self._sta_if.disconnect()
|
||||||
|
if self._ap_if.isconnected():
|
||||||
|
self._ap_if.disconnect()
|
||||||
|
|
||||||
|
async def wait(self, mode):
|
||||||
|
while not self.isconnected():
|
||||||
|
self._handle_status(mode, None)
|
||||||
|
await uasyncio.sleep_ms(1000)
|
||||||
|
|
||||||
|
def _handle_status(self, mode, status):
|
||||||
|
if callable(self._status_handler):
|
||||||
|
self._status_handler(self._ifname[mode], status, self.ifaddress())
|
||||||
|
|
||||||
|
def _handle_error(self, mode, msg):
|
||||||
|
if callable(self._error_handler):
|
||||||
|
if self._error_handler(self._ifname[mode], msg):
|
||||||
|
return
|
||||||
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
|
async def client(self, ssid, psk):
|
||||||
|
if self._sta_if.isconnected():
|
||||||
|
self._handle_status(network.STA_IF, True)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._ap_if.disconnect()
|
||||||
|
self._ap_if.active(False)
|
||||||
|
|
||||||
|
self._sta_if.active(True)
|
||||||
|
self._sta_if.connect(ssid, psk)
|
||||||
|
self._sta_if.config(pm=0xA11140)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout)
|
||||||
|
self._handle_status(network.STA_IF, True)
|
||||||
|
|
||||||
|
except uasyncio.TimeoutError:
|
||||||
|
self._sta_if.active(False)
|
||||||
|
self._handle_status(network.STA_IF, False)
|
||||||
|
self._handle_error(network.STA_IF, "WIFI Client Failed")
|
||||||
|
|
||||||
|
async def access_point(self):
|
||||||
|
if self._ap_if.isconnected():
|
||||||
|
self._handle_status(network.AP_IF, True)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._sta_if.disconnect()
|
||||||
|
self._sta_if.active(False)
|
||||||
|
|
||||||
|
self._ap_if.ifconfig(("10.10.1.1", "255.255.255.0", "10.10.1.1", "10.10.1.1"))
|
||||||
|
self._ap_if.config(password=self.UID)
|
||||||
|
self._ap_if.active(True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await uasyncio.wait_for(
|
||||||
|
self.wait(network.AP_IF), self._access_point_timeout
|
||||||
|
)
|
||||||
|
self._handle_status(network.AP_IF, True)
|
||||||
|
|
||||||
|
except uasyncio.TimeoutError:
|
||||||
|
self._sta_if.active(False)
|
||||||
|
self._handle_status(network.AP_IF, False)
|
||||||
|
self._handle_error(network.AP_IF, "WIFI Client Failed")
|
23
src/main.py
23
src/main.py
|
@ -1,22 +1 @@
|
||||||
from picographics import PicoGraphics, DISPLAY_INKY_PACK
|
import launcher # noqa: F401
|
||||||
|
|
||||||
import wifi
|
|
||||||
|
|
||||||
|
|
||||||
PEN_BLACK = 0
|
|
||||||
PEN_WHITE = 15
|
|
||||||
|
|
||||||
display = PicoGraphics(DISPLAY_INKY_PACK)
|
|
||||||
display.set_pen(PEN_WHITE)
|
|
||||||
display.clear()
|
|
||||||
display.set_pen(PEN_BLACK)
|
|
||||||
display.text("Connecting...", 10, 10)
|
|
||||||
display.update()
|
|
||||||
|
|
||||||
wifi.setup()
|
|
||||||
display.set_pen(PEN_WHITE)
|
|
||||||
display.clear()
|
|
||||||
display.set_pen(PEN_BLACK)
|
|
||||||
display.text("Connected!", 10, 10)
|
|
||||||
display.update()
|
|
||||||
|
|
||||||
|
|
74
tasks.py
Normal file
74
tasks.py
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from invoke import task, Context
|
||||||
|
|
||||||
|
BASE_DIR = Path(__file__).parent.resolve(strict=True)
|
||||||
|
SRC_DIR = BASE_DIR / "src"
|
||||||
|
|
||||||
|
MICROPYTHON_DEPENDENCIES = [
|
||||||
|
# "github:miguelgrinberg/microdot/src/microdot.py",
|
||||||
|
# "github:miguelgrinberg/microdot/src/microdot_asyncio.py",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@task
|
||||||
|
def wipe(c: Context, board_id: str):
|
||||||
|
"""Wipe the board with mpremote."""
|
||||||
|
c.run(
|
||||||
|
f'mpremote connect id:{board_id} exec --no-follow "'
|
||||||
|
"import os, machine, rp2;"
|
||||||
|
"os.umount('/');"
|
||||||
|
"bdev = rp2.Flash();"
|
||||||
|
"os.VfsLfs2.mkfs(bdev, progsize=256);"
|
||||||
|
"vfs = os.VfsLfs2(bdev, progsize=256);"
|
||||||
|
"os.mount(vfs, '/');"
|
||||||
|
'machine.reset()"',
|
||||||
|
pty=True,
|
||||||
|
echo=True,
|
||||||
|
)
|
||||||
|
print("Board wiped, waiting for it to reboot...")
|
||||||
|
time.sleep(3)
|
||||||
|
print("Done!")
|
||||||
|
|
||||||
|
|
||||||
|
@task
|
||||||
|
def list(c: Context):
|
||||||
|
"""List connected boards with mpremote."""
|
||||||
|
c.run("mpremote devs", pty=True, echo=True)
|
||||||
|
|
||||||
|
|
||||||
|
@task
|
||||||
|
def initial_setup(c: Context, board_id: str):
|
||||||
|
"""Install dependencies and copy project files to the board."""
|
||||||
|
with c.cd(SRC_DIR):
|
||||||
|
if MICROPYTHON_DEPENDENCIES:
|
||||||
|
deps = " ".join(MICROPYTHON_DEPENDENCIES)
|
||||||
|
c.run(
|
||||||
|
f"mpremote connect id:{board_id} "
|
||||||
|
f"mip install {deps} + "
|
||||||
|
"cp -r . : + "
|
||||||
|
"reset",
|
||||||
|
pty=True,
|
||||||
|
echo=True,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
c.run(
|
||||||
|
f"mpremote connect id:{board_id} " "cp -r . : + " "reset",
|
||||||
|
pty=True,
|
||||||
|
echo=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@task
|
||||||
|
def update_code(c: Context, board_id: str):
|
||||||
|
"""Update code on the board."""
|
||||||
|
# mpremote connect id:e6614864d3269c34 \
|
||||||
|
# cp -r . : + \
|
||||||
|
# reset
|
||||||
|
with c.cd(SRC_DIR):
|
||||||
|
c.run(
|
||||||
|
f"mpremote connect id:{board_id} " "cp -r . : + " "reset",
|
||||||
|
pty=True,
|
||||||
|
echo=True,
|
||||||
|
)
|
Loading…
Reference in a new issue