Release v3.0.0

- Bump version to 3.0.0 and update docs

- Fix Debian payload to include TUI and install /usr/bin/sensorpajen-tui wrapper

- Make systemd unit upgrades safer and ignore deb build artifacts
This commit is contained in:
2025-12-29 15:34:03 +01:00
parent 54d55cf0f6
commit 4f14af39bf
50 changed files with 963 additions and 2421 deletions

21
.gitignore vendored
View File

@@ -1,8 +1,25 @@
.*
__pycache__
temp
__pycache__/
temp/
*.db
*.egg-info/
.venv/
build/
dist/
# Local configuration (do not commit secrets or device-specific state)
config/sensorpajen.env
config/sensors.json
config/discovered_sensors.json
# Packaging build artifacts
debian/.debhelper/
debian/*.debhelper.log
debian/*.log
debian/*.substvars
debian/debhelper-build-stamp
debian/files
debian/sensorpajen/
# Local experiments
test-local-tui/

View File

@@ -41,3 +41,4 @@ This roadmap defines the evolution of Sensorpajen from a CLI-based tool to a ful
-**Phase 0: Preparation & Cleanup** (2025-12-27)
-**Phase 0.1: Testing Infrastructure** (2025-12-29)
-**Phase 1: Modern TUI Management & Data Persistence** (2025-12-29)
-**Release: v3 Debian package** (2025-12-29)

View File

@@ -1,5 +1,14 @@
# Tasks
## Release: v3.0.0 ✅ DONE (2025-12-29)
**Goal**: Publish a v3 release and Debian package suitable for upgrades.
### Completed:
- ✅ Bump versions to 3.0.0 (Python + Debian changelog)
- ✅ Ensure Debian package includes the TUI sources
- ✅ Build `sensorpajen_3.0.0_all.deb`
## Task: TUI Enhancements (Phase 2)
**Goal**: Add live data, global config, and dashboard.

View File

@@ -1 +1 @@
2.0.0
3.0.0

19
debian/README.md vendored
View File

@@ -63,10 +63,10 @@ ls -lh ../sensorpajen_*.deb
## Build Output
```
../sensorpajen_2.0.0-dev_all.deb # Installable package
../sensorpajen_2.0.0-dev_armhf.build # Build log
../sensorpajen_2.0.0-dev_armhf.buildinfo # Build metadata
../sensorpajen_2.0.0-dev_armhf.changes # Changes file
../sensorpajen_3.0.0_all.deb # Installable package
../sensorpajen_3.0.0_armhf.build # Build log
../sensorpajen_3.0.0_armhf.buildinfo # Build metadata
../sensorpajen_3.0.0_armhf.changes # Changes file
```
## Package Verification
@@ -135,6 +135,16 @@ sudo systemctl status sensorpajen
sudo journalctl -u sensorpajen -f
```
## Running the TUI
The package installs a `sensorpajen-tui` command in `/usr/bin/`.
```bash
sudo sensorpajen-tui
```
Internally this runs the application from `/opt/sensorpajen/venv/`.
## Package Structure
### Installed Files
@@ -142,6 +152,7 @@ sudo journalctl -u sensorpajen -f
| Source | Destination |
|--------|-------------|
| `src/sensorpajen/*.py` | `/opt/sensorpajen/src/sensorpajen/` |
| `src/sensorpajen/tui/*.py` | `/opt/sensorpajen/src/sensorpajen/tui/` |
| `scripts/approve-sensors.sh` | `/opt/sensorpajen/scripts/` |
| `pyproject.toml` | `/opt/sensorpajen/` |
| `README.md`, `INSTALL.md`, `ROADMAP.md` | `/usr/share/doc/sensorpajen/` |

9
debian/changelog vendored
View File

@@ -1,3 +1,12 @@
sensorpajen (3.0.0) stable; urgency=medium
* Production release v3.0.0
* Textual TUI for sensor approval/management
* Per-sensor comments persisted in sensors.json
* Improved safety UX (delete confirmation, details view)
-- Fredrik <fredrik@wahlberg.se> Mon, 29 Dec 2025 12:00:00 +0100
sensorpajen (2.0.0) stable; urgency=medium
* Production release v2.0.0

View File

@@ -1 +0,0 @@
sensorpajen

1
debian/files vendored
View File

@@ -1 +0,0 @@
sensorpajen_2.0.0-dev_all.deb misc optional

2
debian/install vendored
View File

@@ -1,5 +1,7 @@
src/sensorpajen/*.py opt/sensorpajen/src/sensorpajen/
src/sensorpajen/tui/*.py opt/sensorpajen/src/sensorpajen/tui/
scripts/approve-sensors.sh opt/sensorpajen/scripts/
debian/sensorpajen-tui usr/bin/
pyproject.toml opt/sensorpajen/
requirements.txt opt/sensorpajen/
readme.md usr/share/doc/sensorpajen/

20
debian/postinst vendored
View File

@@ -87,11 +87,21 @@ case "$1" in
echo "Warning: setcap not found (install libcap2-bin package)"
fi
# Install systemd service file
if [ -f /opt/sensorpajen/debian/sensorpajen.service ]; then
cp /opt/sensorpajen/debian/sensorpajen.service /etc/systemd/system/
elif [ -f /usr/share/doc/sensorpajen/sensorpajen.service ]; then
cp /usr/share/doc/sensorpajen/sensorpajen.service /etc/systemd/system/
# v2 installed a unit into /etc/systemd/system/, which overrides packaged units
# and prevents upgrades from taking effect. If that file exists and is identical
# to the packaged unit, remove the override.
if [ -f /etc/systemd/system/sensorpajen.service ]; then
PACKAGED_UNIT=""
if [ -f /lib/systemd/system/sensorpajen.service ]; then
PACKAGED_UNIT="/lib/systemd/system/sensorpajen.service"
elif [ -f /usr/lib/systemd/system/sensorpajen.service ]; then
PACKAGED_UNIT="/usr/lib/systemd/system/sensorpajen.service"
fi
if [ -n "$PACKAGED_UNIT" ] && diff -q /etc/systemd/system/sensorpajen.service "$PACKAGED_UNIT" >/dev/null 2>&1; then
rm -f /etc/systemd/system/sensorpajen.service
echo "Removed redundant /etc override unit (upgrade-safe)"
fi
fi
# Reload systemd

12
debian/sensorpajen-tui vendored Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/sh
set -eu
# Wrapper to run the installed TUI using the app's virtualenv.
# The venv is created/updated by the package postinst.
if [ -x /opt/sensorpajen/venv/bin/sensorpajen-tui ]; then
exec /opt/sensorpajen/venv/bin/sensorpajen-tui "$@"
fi
# Fallback (should normally not be needed)
exec /opt/sensorpajen/venv/bin/python -m sensorpajen.tui.app "$@"

View File

@@ -1,12 +0,0 @@
# Automatically added by dh_installsystemd/13.14.1ubuntu5
if [ "$1" = remove ] && [ -d /run/systemd/system ] ; then
systemctl --system daemon-reload >/dev/null || true
fi
# End automatically added section
# Automatically added by dh_installsystemd/13.14.1ubuntu5
if [ "$1" = "purge" ]; then
if [ -x "/usr/bin/deb-systemd-helper" ]; then
deb-systemd-helper purge 'sensorpajen.service' >/dev/null || true
fi
fi
# End automatically added section

View File

@@ -1,2 +0,0 @@
misc:Depends=
misc:Pre-Depends=

View File

@@ -1,20 +0,0 @@
Package: sensorpajen
Version: 2.0.0-dev
Architecture: all
Maintainer: Fredrik <fredrik@wahlberg.se>
Installed-Size: 112
Depends: python3 (>= 3.9), python3-venv, python3-pip, bluetooth, bluez, libcap2-bin
Recommends: mosquitto-clients
Section: misc
Priority: optional
Homepage: https://github.com/yourusername/sensorpajen
Description: Raspberry Pi Bluetooth temperature sensor monitor
Monitors Xiaomi Mijia LYWSD03MMC temperature sensors via Bluetooth Low Energy
and publishes readings to MQTT broker. Supports ATC firmware with automatic
sensor discovery and approval workflow.
.
Features:
- Automatic sensor discovery
- MQTT publishing
- Systemd service integration
- User approval workflow for new sensors

View File

@@ -1,19 +0,0 @@
3b3c15c00bf48fc519b8fbe507a93a7e opt/sensorpajen/pyproject.toml
0894789523a53bb372980c0906a7d0b5 opt/sensorpajen/requirements.txt
940d73f24eb9f971ce27f9355e3072f3 opt/sensorpajen/scripts/approve-sensors.sh
20eb4f3839b990a530410768897402c0 opt/sensorpajen/src/sensorpajen/__init__.py
3c6c65213de874065f81b7b3d8948c8b opt/sensorpajen/src/sensorpajen/approve_sensors.py
f69225e19918cca05351fa2da8fd7618 opt/sensorpajen/src/sensorpajen/config.py
65c63383dde4f0b249b708f854ec75a3 opt/sensorpajen/src/sensorpajen/discovery_manager.py
7604c2bc0a854d6d43ff0f0646386fc5 opt/sensorpajen/src/sensorpajen/main.py
331bf9b314492acc6ce03896367f3cf6 opt/sensorpajen/src/sensorpajen/mqtt_publisher.py
5f4ea191e35ce092f39ec0a4f663cb38 opt/sensorpajen/src/sensorpajen/sensor_reader.py
c8dd8fe8fc174a9cd35251fdf80e7b5f opt/sensorpajen/src/sensorpajen/utils.py
b9ad3ea8307d8ed8e938da37ad00f229 usr/lib/systemd/system/sensorpajen.service
4ddb9618c940286f91df901ec818959a usr/share/doc/sensorpajen/INSTALL.md.gz
bd2f1371c60af415bc9d0dbc1111184d usr/share/doc/sensorpajen/ROADMAP.md.gz
380e8e6b01b757ceac05bc5805844ae4 usr/share/doc/sensorpajen/changelog.Debian.gz
14152a98d7cd7fe8daf280aacc4cbf3f usr/share/doc/sensorpajen/examples/discovered_sensors.json.example
74c99b732363f93f0a1c134e1a8c3d35 usr/share/doc/sensorpajen/examples/sensorpajen.env.example
292efbddd951c39cb2c9546d5fac5e05 usr/share/doc/sensorpajen/examples/sensors.json.example
5f647c63bfc3b174611694779fd215e0 usr/share/doc/sensorpajen/readme.md.gz

View File

@@ -1,151 +0,0 @@
#!/bin/bash
set -e
case "$1" in
configure)
# Create sensorpajen system user if it doesn't exist
if ! getent passwd sensorpajen > /dev/null; then
useradd --system --no-create-home --shell /usr/sbin/nologin sensorpajen
echo "Created system user: sensorpajen"
fi
# Create config directory with proper permissions
mkdir -p /etc/sensorpajen
chown sensorpajen:sensorpajen /etc/sensorpajen
chmod 750 /etc/sensorpajen
# Create state directory with proper permissions (writable at runtime)
mkdir -p /var/lib/sensorpajen
chown sensorpajen:sensorpajen /var/lib/sensorpajen
chmod 750 /var/lib/sensorpajen
# Copy example configs to /etc/sensorpajen if they don't exist
for sample in sensorpajen.env.example sensors.json.example; do
source_file="/usr/share/doc/sensorpajen/examples/$sample"
target_file="/etc/sensorpajen/${sample%.example}"
if [ -f "$source_file" ] && [ ! -f "$target_file" ]; then
cp "$source_file" "$target_file"
chown sensorpajen:sensorpajen "$target_file"
# Set restrictive permissions on env file (contains credentials)
if [ "$sample" = "sensorpajen.env.example" ]; then
chmod 600 "$target_file"
echo "Created $target_file (edit this file with your MQTT credentials)"
else
chmod 640 "$target_file"
echo "Created $target_file"
fi
fi
done
# Create virtual environment in /opt/sensorpajen
cd /opt/sensorpajen
if [ ! -d "venv" ]; then
echo "Creating Python virtual environment..."
python3 -m venv venv
venv/bin/pip install --upgrade pip setuptools wheel
fi
# Install Python dependencies from requirements.txt
echo "Installing Python dependencies..."
if [ -f "/opt/sensorpajen/requirements.txt" ]; then
venv/bin/pip install -r /opt/sensorpajen/requirements.txt
else
echo "Warning: requirements.txt not found, installing bluepy and paho-mqtt directly"
venv/bin/pip install bluepy paho-mqtt pybluez
fi
if [ $? -ne 0 ]; then
echo "Error: Failed to install dependencies"
exit 1
fi
# Install sensorpajen package itself
echo "Installing sensorpajen application..."
cd /opt/sensorpajen
# Clean up any stale bytecode before building wheel
find . -name "*.pyc" -delete
find . -name "__pycache__" -type d -delete
venv/bin/pip install --no-deps . || {
echo "Error: Failed to install sensorpajen package"
exit 1
}
cd /
# Set ownership of application directory BEFORE setting capabilities
chown -R sensorpajen:sensorpajen /opt/sensorpajen
# Set Bluetooth capabilities on Python executable (after ownership change)
PYTHON_PATH=$(readlink -f /opt/sensorpajen/venv/bin/python3)
if command -v setcap >/dev/null 2>&1; then
setcap cap_net_raw,cap_net_admin+eip "$PYTHON_PATH" || {
echo "Warning: setcap failed. You may need to run Bluetooth operations as root."
echo "Try: sudo setcap cap_net_raw,cap_net_admin+eip $PYTHON_PATH"
}
else
echo "Warning: setcap not found (install libcap2-bin package)"
fi
# Install systemd service file
if [ -f /opt/sensorpajen/debian/sensorpajen.service ]; then
cp /opt/sensorpajen/debian/sensorpajen.service /etc/systemd/system/
elif [ -f /usr/share/doc/sensorpajen/sensorpajen.service ]; then
cp /usr/share/doc/sensorpajen/sensorpajen.service /etc/systemd/system/
fi
# Reload systemd
systemctl daemon-reload
# Enable service (but don't start - needs configuration first)
systemctl enable sensorpajen.service || {
echo "Warning: Could not enable sensorpajen service"
}
# Check if configuration is ready
if [ -f /etc/sensorpajen/sensorpajen.env ] && [ -f /etc/sensorpajen/sensors.json ]; then
# Check if env file has been configured (not default values)
if grep -q "MQTT_HOST=192.168.0.114" /etc/sensorpajen/sensorpajen.env; then
echo ""
echo "======================================================================"
echo " Configuration needed!"
echo "======================================================================"
echo " Edit /etc/sensorpajen/sensorpajen.env with your MQTT settings"
echo " Edit /etc/sensorpajen/sensors.json with your sensor list"
echo " Then run: sudo systemctl start sensorpajen"
echo "======================================================================"
echo ""
else
# Configuration appears to be customized, restart service
systemctl restart sensorpajen.service && {
echo "Sensorpajen service started"
echo "View logs: sudo journalctl -u sensorpajen -f"
} || {
echo "Failed to start service. Check: sudo systemctl status sensorpajen"
}
fi
else
echo ""
echo "======================================================================"
echo " Sensorpajen installed successfully!"
echo "======================================================================"
echo " Next steps:"
echo " 1. Edit /etc/sensorpajen/sensorpajen.env"
echo " 2. Edit /etc/sensorpajen/sensors.json"
echo " 3. sudo systemctl start sensorpajen"
echo " 4. sudo journalctl -u sensorpajen -f"
echo "======================================================================"
echo ""
fi
;;
abort-upgrade|abort-remove|abort-deconfigure)
;;
*)
echo "postinst called with unknown argument \`$1'" >&2
exit 1
;;
esac
exit 0

View File

@@ -1,41 +0,0 @@
#!/bin/bash
set -e
case "$1" in
remove)
# Service removed but config and user preserved
echo "Sensorpajen removed. Configuration preserved in /etc/sensorpajen/"
echo "To remove config: sudo rm -rf /etc/sensorpajen/"
# Remove systemd service file
rm -f /etc/systemd/system/sensorpajen.service
systemctl daemon-reload || true
;;
purge)
# Even on purge, we keep config by default (user can manually delete)
# This is safer as it prevents accidental data loss
echo "Configuration preserved in /etc/sensorpajen/"
echo "To remove config: sudo rm -rf /etc/sensorpajen/"
echo "To remove user: sudo userdel sensorpajen"
# Remove systemd service file
rm -f /etc/systemd/system/sensorpajen.service
systemctl daemon-reload || true
# Note: We intentionally do NOT remove:
# - /etc/sensorpajen (contains user data)
# - sensorpajen user (may own other files/processes)
# User must remove these manually if desired
;;
upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
;;
*)
echo "postrm called with unknown argument \`$1'" >&2
exit 1
;;
esac
exit 0

View File

@@ -1,27 +0,0 @@
#!/bin/bash
set -e
case "$1" in
remove|upgrade|deconfigure)
# Stop service before removal or upgrade
if systemctl is-active --quiet sensorpajen.service 2>/dev/null; then
echo "Stopping sensorpajen service..."
systemctl stop sensorpajen.service || true
fi
# Disable service on removal (not upgrade)
if [ "$1" = "remove" ]; then
systemctl disable sensorpajen.service || true
fi
;;
failed-upgrade)
;;
*)
echo "prerm called with unknown argument \`$1'" >&2
exit 1
;;
esac
exit 0

View File

@@ -1,65 +0,0 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "sensorpajen"
version = "2.0.0-dev"
description = "Bluetooth temperature sensor monitor for Xiaomi Mijia LYWSD03MMC"
readme = "README.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [
{name = "Fredrik", email = "your@email.com"}
]
keywords = ["bluetooth", "temperature", "sensor", "mqtt", "raspberry-pi"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Operating System :: POSIX :: Linux",
"Topic :: Home Automation",
]
dependencies = [
"pybluez>=0.31",
"bluepy>=1.3.0",
"paho-mqtt>=1.6.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-cov>=4.0",
"black>=23.0",
"ruff>=0.1.0",
]
[project.urls]
Homepage = "https://github.com/yourusername/sensorpajen"
Repository = "https://github.com/yourusername/sensorpajen"
[project.scripts]
sensorpajen = "sensorpajen.main:main"
sensorpajen-approve-sensors = "sensorpajen.approve_sensors:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.black]
line-length = 100
target-version = ["py39", "py310", "py311"]
[tool.ruff]
line-length = 100
target-version = "py39"
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]

View File

@@ -1,3 +0,0 @@
pybluez
bluepy
paho-mqtt

View File

@@ -1,48 +0,0 @@
#!/bin/bash
# Wrapper script for approve-sensors that works in both dev and system mode
# Detect installation type
if [ -d "/opt/sensorpajen" ]; then
# System installation
PROJECT_ROOT="/opt/sensorpajen"
VENV_PATH="/opt/sensorpajen/venv"
# Load config from system location
if [ -f "/etc/sensorpajen/sensorpajen.env" ]; then
set -a
source /etc/sensorpajen/sensorpajen.env
set +a
else
echo "Warning: /etc/sensorpajen/sensorpajen.env not found"
# Set minimal defaults
export MQTT_HOST="${MQTT_HOST:-localhost}"
export MQTT_PORT="${MQTT_PORT:-1883}"
fi
else
# Development installation
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
PROJECT_ROOT="$( cd "$SCRIPT_DIR/.." && pwd )"
VENV_PATH="$PROJECT_ROOT/.venv"
# Set minimal required environment variables
export MQTT_HOST="${MQTT_HOST:-localhost}"
export MQTT_PORT="${MQTT_PORT:-1883}"
# Load actual config if it exists (will override defaults)
if [ -f "$PROJECT_ROOT/config/sensorpajen.env" ]; then
set -a
source "$PROJECT_ROOT/config/sensorpajen.env"
set +a
fi
fi
# Activate virtual environment
if [ -f "$VENV_PATH/bin/activate" ]; then
source "$VENV_PATH/bin/activate"
else
echo "Error: Virtual environment not found at $VENV_PATH"
exit 1
fi
# Run the approve-sensors command
python -m sensorpajen.approve_sensors "$@"

View File

@@ -1,10 +0,0 @@
"""
Sensorpajen - Bluetooth Temperature Sensor Monitor
Monitors Xiaomi Mijia LYWSD03MMC Bluetooth temperature sensors
and publishes data to MQTT broker.
"""
__version__ = "2.0.0-dev"
__author__ = "Fredrik"
__license__ = "MIT"

View File

@@ -1,305 +0,0 @@
#!/usr/bin/env python3
"""
CLI tool for approving or ignoring discovered sensors.
Interactive tool to manage pending and ignored sensors.
"""
import sys
import json
import logging
import argparse
from pathlib import Path
from typing import List
from . import config
from .discovery_manager import DiscoveryManager, DiscoveredSensor
logger = logging.getLogger(__name__)
def format_metadata_comment(sensor: DiscoveredSensor) -> str:
"""
Format sensor metadata as a comment string.
Args:
sensor: Discovered sensor
Returns:
Formatted comment string
"""
return (
f"MAC: {sensor.mac}, "
f"Name: {sensor.name}, "
f"Last seen: {sensor.last_seen}, "
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C, "
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%, "
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%"
)
def display_sensor(sensor: DiscoveredSensor, index: int, total: int):
"""
Display sensor information to the user.
Args:
sensor: Discovered sensor to display
index: Current sensor number (1-based)
total: Total number of sensors
"""
print(f"\n{'='*70}")
print(f"Sensor {index}/{total}")
print(f"{'='*70}")
print(f"MAC Address: {sensor.mac}")
print(f"Device Name: {sensor.name}")
print(f"Last Seen: {sensor.last_seen}")
print(f"Status: {sensor.status}")
if sensor.status == "ignored" and sensor.ignored_at:
print(f"Ignored At: {sensor.ignored_at}")
if sensor.ignore_reason:
print(f"Reason: {sensor.ignore_reason}")
# Display sample reading
reading = sensor.sample_reading
print(f"\nSample Reading:")
print(f" Temperature: {reading.get('temperature', 'N/A')}°C")
print(f" Humidity: {reading.get('humidity', 'N/A')}%")
print(f" Battery: {reading.get('battery_percent', 'N/A')}%")
print(f" Voltage: {reading.get('battery_voltage', 'N/A')}mV")
print(f"{'='*70}")
def get_user_choice() -> str:
"""
Get user's choice for what to do with the sensor.
Returns:
User choice: 'a' (approve), 'i' (ignore), 's' (skip)
"""
while True:
choice = input("\n[A]pprove, [I]gnore, [S]kip, [Q]uit? ").strip().lower()
if choice in ['a', 'i', 's', 'q']:
return choice
print("Invalid choice. Please enter A, I, S, or Q.")
def approve_sensor(sensor: DiscoveredSensor, manager: DiscoveryManager):
"""
Approve a sensor and add it to sensors.json.
Args:
sensor: Sensor to approve
manager: Discovery manager
"""
# Check if sensor already exists in sensors.json
sensor_config_path = Path(config.SENSOR_CONFIG_FILE)
try:
with open(sensor_config_path, 'r') as f:
data = json.load(f)
# Check for duplicates
for existing_sensor in data.get('sensors', []):
if existing_sensor.get('mac', '').upper() == sensor.mac:
print(f"\n⚠️ Sensor {sensor.mac} already exists in sensors.json")
print(" Renaming must be done manually in the file.")
return
except FileNotFoundError:
# File doesn't exist yet, create with empty sensors list
data = {'sensors': []}
except json.JSONDecodeError as e:
print(f"\n❌ Error: Invalid JSON in {sensor_config_path}: {e}")
return
# Get sensor name from user
while True:
name = input("\nEnter sensor name (required): ").strip()
if name:
break
print("Sensor name cannot be empty.")
# Pre-fill comment with metadata
default_comment = format_metadata_comment(sensor)
print(f"\nDefault comment:")
print(f" {default_comment}")
edit = input("\nEdit comment? [y/N]: ").strip().lower()
if edit == 'y':
print("\nEnter comment (or press Enter to keep default):")
comment = input("> ").strip()
if not comment:
comment = default_comment
else:
comment = default_comment
# Add to sensors.json
new_sensor = {
"mac": sensor.mac,
"name": name
}
if comment:
new_sensor["comment"] = comment
data.setdefault('sensors', []).append(new_sensor)
try:
with open(sensor_config_path, 'w') as f:
json.dump(data, f, indent=2)
print(f"\n✅ Sensor approved and added to sensors.json")
print(f" Name: {name}")
print(f" Configuration will be reloaded automatically within 15 minutes")
# Mark as approved in discovery manager and save
print(f"\nUpdating discovery status...")
manager.approve(sensor.mac)
print(f"✅ Marked as approved in discovered_sensors.json")
except Exception as e:
print(f"\n❌ Error saving to sensors.json: {e}")
def ignore_sensor(sensor: DiscoveredSensor, manager: DiscoveryManager):
"""
Ignore a sensor.
Args:
sensor: Sensor to ignore
manager: Discovery manager
"""
reason = input("\nReason for ignoring (optional): ").strip()
manager.ignore(sensor.mac, reason if reason else None)
print(f"\n✅ Sensor ignored and marked in discovered_sensors.json")
if reason:
print(f" Reason: {reason}")
def process_sensors(sensors: List[DiscoveredSensor], manager: DiscoveryManager):
"""
Process list of sensors interactively.
Args:
sensors: List of sensors to process
manager: Discovery manager
"""
if not sensors:
print("\n✅ No sensors to process")
return
print(f"\nFound {len(sensors)} sensor(s) to review")
for i, sensor in enumerate(sensors, 1):
# Mark as reviewed when shown
manager.mark_reviewed(sensor.mac)
display_sensor(sensor, i, len(sensors))
choice = get_user_choice()
if choice == 'q':
print("\n👋 Exiting...")
break
elif choice == 'a':
approve_sensor(sensor, manager)
elif choice == 'i':
ignore_sensor(sensor, manager)
elif choice == 's':
print("\n⏭️ Skipped")
continue
def main():
"""Main entry point for approve-sensors CLI."""
# Parse command line arguments
parser = argparse.ArgumentParser(
description="Approve or ignore discovered Bluetooth sensors",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s # Show only new pending sensors
%(prog)s --all # Show all pending sensors (including reviewed)
%(prog)s --ignored # Show only ignored sensors
%(prog)s --all --ignored # Show all sensors
"""
)
parser.add_argument(
'--all', '-a',
action='store_true',
help='Show all pending sensors, including previously reviewed ones'
)
parser.add_argument(
'--ignored', '-i',
action='store_true',
help='Show ignored sensors'
)
args = parser.parse_args()
# Setup logging
logging.basicConfig(
level=logging.WARNING,
format='%(levelname)s: %(message)s'
)
print("=" * 70)
print("Sensorpajen - Approve Sensors")
print("=" * 70)
try:
# Load discovery manager
manager = DiscoveryManager()
# Get sensors based on flags
if args.all:
pending = manager.get_pending()
pending_label = "all pending"
else:
pending = manager.get_new_pending()
pending_label = "new pending"
ignored = manager.get_ignored() if args.ignored else []
if not pending and not ignored:
if args.all or args.ignored:
print(f"\n✅ No {pending_label if pending else 'ignored'} sensors found")
else:
print("\n✅ No new sensors to review")
all_pending = manager.get_pending()
if all_pending:
print(f"\nThere are {len(all_pending)} previously reviewed pending sensor(s).")
print("Run with --all to review them again.")
return 0
# Process pending sensors
if pending:
print(f"\n📋 Processing {len(pending)} {pending_label} sensor(s)...")
process_sensors(pending, manager)
# Process ignored sensors if requested
if ignored:
if pending:
print("\n" + "=" * 70)
print(f"\n📋 Processing {len(ignored)} ignored sensor(s)...")
process_sensors(ignored, manager)
print("\n" + "=" * 70)
print("Done!")
print("=" * 70)
return 0
except KeyboardInterrupt:
print("\n\n👋 Interrupted by user")
return 1
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,160 +0,0 @@
"""
Configuration management for Sensorpajen.
Loads configuration from environment variables with sensible defaults.
Configuration files are loaded relative to the project root.
"""
import os
import json
import logging
from pathlib import Path
from typing import Dict, List
logger = logging.getLogger(__name__)
# Determine project root and config directory
# Check if running from system installation (/opt/sensorpajen) or development
_opt_sensorpajen_exists = Path('/opt/sensorpajen').exists()
_var_lib_exists = Path('/var/lib/sensorpajen').exists()
if _opt_sensorpajen_exists:
# System installation
PROJECT_ROOT = Path('/opt/sensorpajen')
CONFIG_DIR = Path('/etc/sensorpajen')
STATE_DIR = Path('/var/lib/sensorpajen')
else:
# Development installation (3 levels up from this file: src/sensorpajen/config.py)
PROJECT_ROOT = Path(__file__).parent.parent.parent
CONFIG_DIR = PROJECT_ROOT / "config"
STATE_DIR = CONFIG_DIR
# MQTT Configuration from environment
MQTT_HOST = os.environ.get("MQTT_HOST")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD")
MQTT_CLIENT_ID = os.environ.get("MQTT_CLIENT_ID", "sensorpajen")
MQTT_TOPIC_PREFIX = os.environ.get("MQTT_TOPIC_PREFIX", "MiTemperature2")
# Validate required MQTT configuration
if not MQTT_HOST:
raise RuntimeError(
"MQTT_HOST environment variable must be set. "
"Please configure config/sensorpajen.env"
)
# Sensor configuration file
SENSOR_CONFIG_FILE = os.environ.get(
"SENSOR_CONFIG_FILE",
str(CONFIG_DIR / "sensors.json")
)
# Application settings
WATCHDOG_TIMEOUT = int(os.environ.get("WATCHDOG_TIMEOUT", "5"))
ENABLE_BATTERY = os.environ.get("ENABLE_BATTERY", "true").lower() == "true"
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
# Bluetooth settings
SKIP_IDENTICAL = int(os.environ.get("SKIP_IDENTICAL", "50"))
DEBOUNCE = os.environ.get("DEBOUNCE", "true").lower() == "true"
# ntfy notification settings (optional)
NTFY_ENABLED = os.environ.get("NTFY_ENABLED", "false").lower() == "true"
NTFY_URL = os.environ.get("NTFY_URL", "https://ntfy.sh")
NTFY_TOPIC = os.environ.get("NTFY_TOPIC", "sensorpajen")
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "")
# Discovery settings
DISCOVERED_SENSORS_FILE = os.environ.get(
"DISCOVERED_SENSORS_FILE",
str(STATE_DIR / "discovered_sensors.json")
)
CONFIG_RELOAD_INTERVAL = int(os.environ.get("CONFIG_RELOAD_INTERVAL", "900")) # 15 minutes
class SensorConfig:
"""Manages sensor configuration from JSON file."""
def __init__(self, config_file: str = SENSOR_CONFIG_FILE):
"""
Initialize sensor configuration.
Args:
config_file: Path to sensors JSON configuration file
"""
self.config_file = Path(config_file)
self.sensors: Dict[str, str] = {}
self.load()
def load(self):
"""Load sensor configuration from JSON file."""
if not self.config_file.exists():
logger.warning(
f"Sensor configuration file not found: {self.config_file}\n"
f"Starting with no sensors - use discovery to add sensors"
)
return
try:
with open(self.config_file, 'r') as f:
data = json.load(f)
# Convert sensors list to MAC -> name mapping
for sensor in data.get('sensors', []):
mac = sensor.get('mac', '').upper()
name = sensor.get('name')
if mac and name:
self.sensors[mac] = name
logger.debug(f"Loaded sensor: {mac} -> {name}")
logger.info(f"Loaded {len(self.sensors)} sensors from {self.config_file}")
except json.JSONDecodeError as e:
raise RuntimeError(f"Invalid JSON in {self.config_file}: {e}")
except Exception as e:
raise RuntimeError(f"Error loading sensor config: {e}")
def get_name(self, mac: str) -> str:
"""
Get sensor name by MAC address.
Args:
mac: MAC address (any case)
Returns:
Sensor name or the MAC address if not found
"""
return self.sensors.get(mac.upper(), mac)
def get_all_macs(self) -> List[str]:
"""Get list of all configured MAC addresses."""
return list(self.sensors.keys())
def validate_config():
"""
Validate configuration and log settings.
Should be called at application startup.
"""
install_type = "System" if Path('/opt/sensorpajen').exists() else "Development"
logger.info("=== Sensorpajen Configuration ===")
logger.info(f"Installation Type: {install_type}")
logger.info(f"Project Root: {PROJECT_ROOT}")
logger.info(f"Config Directory: {CONFIG_DIR}")
logger.info(f"State Directory: {STATE_DIR}")
logger.info(f"MQTT Host: {MQTT_HOST}:{MQTT_PORT}")
logger.info(f"MQTT User: {MQTT_USER}")
logger.info(f"MQTT Client ID: {MQTT_CLIENT_ID}")
logger.info(f"MQTT Topic Prefix: {MQTT_TOPIC_PREFIX}")
logger.info(f"Sensor Config: {SENSOR_CONFIG_FILE}")
logger.info(f"Discovered Sensors: {DISCOVERED_SENSORS_FILE}")
logger.info(f"Watchdog Timeout: {WATCHDOG_TIMEOUT}s")
logger.info(f"Battery Monitoring: {ENABLE_BATTERY}")
logger.info(f"Config Reload Interval: {CONFIG_RELOAD_INTERVAL}s")
logger.info(f"ntfy Enabled: {NTFY_ENABLED}")
if NTFY_ENABLED:
logger.info(f"ntfy URL: {NTFY_URL}/{NTFY_TOPIC}")
logger.info(f"Log Level: {LOG_LEVEL}")
logger.info("================================")

View File

@@ -1,263 +0,0 @@
"""
Discovery manager for tracking and managing discovered sensors.
Maintains a database of discovered sensors with their metadata and status.
"""
import json
import logging
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from . import config
logger = logging.getLogger(__name__)
@dataclass
class DiscoveredSensor:
"""Represents a discovered sensor with metadata."""
mac: str
name: str
rssi: int
first_seen: str
last_seen: str
sample_reading: Dict[str, float]
status: str = "pending" # pending, approved, ignored
reviewed: bool = False # Has been shown in approval CLI
ignored_at: Optional[str] = None
ignore_reason: Optional[str] = None
class DiscoveryManager:
"""Manages discovered sensors and their approval status."""
def __init__(self, discovery_file: str = config.DISCOVERED_SENSORS_FILE):
"""
Initialize discovery manager.
Args:
discovery_file: Path to discovered sensors JSON file
"""
self.discovery_file = Path(discovery_file)
self.sensors: Dict[str, DiscoveredSensor] = {}
self.load()
def load(self):
"""Load discovered sensors from JSON file."""
if not self.discovery_file.exists():
logger.info(f"Creating new discovered sensors file: {self.discovery_file}")
self.discovery_file.parent.mkdir(parents=True, exist_ok=True)
self.save()
return
try:
with open(self.discovery_file, 'r') as f:
data = json.load(f)
for sensor_data in data:
sensor = DiscoveredSensor(**sensor_data)
self.sensors[sensor.mac.upper()] = sensor
logger.info(f"Loaded {len(self.sensors)} discovered sensors")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in {self.discovery_file}: {e}")
except Exception as e:
logger.error(f"Error loading discovered sensors: {e}")
def save(self):
"""Save discovered sensors to JSON file."""
try:
# Ensure directory exists
self.discovery_file.parent.mkdir(parents=True, exist_ok=True)
# Convert sensors to list of dicts
data = [asdict(sensor) for sensor in self.sensors.values()]
with open(self.discovery_file, 'w') as f:
json.dump(data, f, indent=2)
logger.debug(f"Saved {len(self.sensors)} discovered sensors")
except Exception as e:
logger.error(f"Error saving discovered sensors: {e}")
def add_or_update(self, mac: str, name: str, rssi: int,
temperature: float, humidity: float,
battery_percent: int, battery_voltage: int) -> bool:
"""
Add or update a discovered sensor.
Args:
mac: MAC address
name: Advertised device name
rssi: Signal strength
temperature: Temperature reading
humidity: Humidity reading
battery_percent: Battery percentage
battery_voltage: Battery voltage in mV
Returns:
True if this is a newly discovered sensor, False if updated existing
"""
mac = mac.upper()
now = datetime.now().isoformat()
sample_reading = {
"temperature": temperature,
"humidity": humidity,
"battery_percent": battery_percent,
"battery_voltage": battery_voltage
}
if mac in self.sensors:
# Update existing sensor
sensor = self.sensors[mac]
sensor.last_seen = now
sensor.rssi = rssi
sensor.sample_reading = sample_reading
self.save()
return False
else:
# New sensor discovered
sensor = DiscoveredSensor(
mac=mac,
name=name,
rssi=rssi,
first_seen=now,
last_seen=now,
sample_reading=sample_reading,
status="pending"
)
self.sensors[mac] = sensor
self.save()
logger.info(f"New sensor discovered: {mac} ({name})")
return True
def is_known(self, mac: str) -> bool:
"""
Check if a sensor has been discovered before.
Args:
mac: MAC address
Returns:
True if sensor is in discovered list
"""
return mac.upper() in self.sensors
def get_status(self, mac: str) -> Optional[str]:
"""
Get status of a discovered sensor.
Args:
mac: MAC address
Returns:
Status string or None if not found
"""
sensor = self.sensors.get(mac.upper())
return sensor.status if sensor else None
def approve(self, mac: str):
"""
Mark a sensor as approved.
Args:
mac: MAC address
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].status = "approved"
self.save()
logger.info(f"Sensor approved: {mac}")
def ignore(self, mac: str, reason: Optional[str] = None):
"""
Mark a sensor as ignored.
Args:
mac: MAC address
reason: Optional reason for ignoring
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].status = "ignored"
self.sensors[mac].ignored_at = datetime.now().isoformat()
self.sensors[mac].ignore_reason = reason
self.save()
logger.info(f"Sensor ignored: {mac}")
def get_pending(self) -> List[DiscoveredSensor]:
"""Get list of sensors with status 'pending'."""
return [s for s in self.sensors.values() if s.status == "pending"]
def get_new_pending(self) -> List[DiscoveredSensor]:
"""Get list of pending sensors that haven't been reviewed yet."""
return [s for s in self.sensors.values() if s.status == "pending" and not s.reviewed]
def get_ignored(self) -> List[DiscoveredSensor]:
"""Get list of sensors with status 'ignored'."""
return [s for s in self.sensors.values() if s.status == "ignored"]
def mark_reviewed(self, mac: str):
"""
Mark a sensor as reviewed (shown in approval CLI).
Args:
mac: MAC address
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].reviewed = True
self.save()
def send_ntfy_notification(self, sensor: DiscoveredSensor):
"""
Send ntfy notification for a newly discovered sensor.
Args:
sensor: Discovered sensor to notify about
"""
if not config.NTFY_ENABLED:
logger.debug("ntfy notifications disabled")
return
if not config.NTFY_TOKEN:
logger.warning("ntfy enabled but NTFY_TOKEN not set")
return
try:
message = (
f"🆕 New sensor discovered!\n\n"
f"MAC: {sensor.mac}\n"
f"Name: {sensor.name}\n"
f"Last seen: {sensor.last_seen}\n"
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C\n"
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%\n"
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%\n\n"
f"Run 'sensorpajen approve-sensors' to approve or ignore."
)
url = f"{config.NTFY_URL}/{config.NTFY_TOPIC}"
result = subprocess.run(
["curl", "-H", f"Authorization: Bearer {config.NTFY_TOKEN}",
"-d", message, url],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Sent ntfy notification for {sensor.mac}")
else:
logger.warning(f"ntfy notification failed: {result.stderr.decode()}")
except subprocess.TimeoutExpired:
logger.warning("ntfy notification timed out")
except Exception as e:
logger.error(f"Error sending ntfy notification: {e}")

View File

@@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
Sensorpajen - Main entry point
Bluetooth temperature sensor monitor for Xiaomi Mijia LYWSD03MMC sensors.
Publishes sensor data to MQTT broker.
"""
import sys
import signal
import logging
import time
import threading
from pathlib import Path
from . import __version__
from . import config
from .mqtt_publisher import MQTTPublisher
from .sensor_reader import SensorReader, Measurement
from .discovery_manager import DiscoveryManager
class Sensorpajen:
"""Main application class."""
def __init__(self):
"""Initialize the application."""
self.mqtt_publisher: MQTTPublisher = None
self.sensor_reader: SensorReader = None
self.sensor_config: config.SensorConfig = None
self.discovery_manager: DiscoveryManager = None
self.running = False
self.config_reload_timer: threading.Timer = None
# Setup logging
self._setup_logging()
# Setup signal handlers
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
def _setup_logging(self):
"""Configure logging to stdout for journald."""
log_level = getattr(logging, config.LOG_LEVEL, logging.INFO)
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
# Set our logger
self.logger = logging.getLogger(__name__)
def _signal_handler(self, sig, frame):
"""Handle shutdown signals."""
signal_name = "SIGTERM" if sig == signal.SIGTERM else "SIGINT"
self.logger.info(f"Received {signal_name}, shutting down gracefully...")
self.shutdown()
sys.exit(0)
def _on_measurement(self, measurement: Measurement):
"""
Callback for new sensor measurements.
Args:
measurement: Sensor measurement data
"""
try:
# Publish to MQTT
self.mqtt_publisher.publish_measurement(
sensor_name=measurement.sensor_name,
temperature=measurement.temperature,
humidity=measurement.humidity,
battery_voltage=measurement.voltage,
battery_level=measurement.battery
)
except Exception as e:
self.logger.error(f"Error handling measurement: {e}")
def _reload_config(self):
"""Reload sensor configuration periodically."""
if not self.running:
return
try:
self.logger.info("Reloading sensor configuration...")
old_sensors = set(self.sensor_config.sensors.keys())
self.sensor_config.load()
new_sensors = set(self.sensor_config.sensors.keys())
added = new_sensors - old_sensors
removed = old_sensors - new_sensors
if added:
self.logger.info(f"Added sensors: {', '.join(added)}")
if removed:
self.logger.info(f"Removed sensors: {', '.join(removed)}")
if not added and not removed:
self.logger.debug("No sensor configuration changes")
except Exception as e:
self.logger.error(f"Error reloading configuration: {e}")
finally:
# Schedule next reload
if self.running:
self.config_reload_timer = threading.Timer(
config.CONFIG_RELOAD_INTERVAL,
self._reload_config
)
self.config_reload_timer.daemon = True
self.config_reload_timer.start()
def start(self):
"""Start the application."""
try:
self.logger.info("=" * 50)
self.logger.info(f"Starting Sensorpajen v{__version__}")
self.logger.info("=" * 50)
# Validate and log configuration
config.validate_config()
# Load sensor configuration
self.sensor_config = config.SensorConfig()
if len(self.sensor_config.sensors) == 0:
self.logger.warning("No sensors configured")
self.logger.warning("Starting in discovery-only mode")
self.logger.warning("Use 'sensorpajen approve-sensors' to add sensors")
# Initialize discovery manager
self.logger.info("Initializing discovery manager...")
self.discovery_manager = DiscoveryManager()
# Initialize MQTT publisher
self.logger.info("Initializing MQTT publisher...")
self.mqtt_publisher = MQTTPublisher()
self.mqtt_publisher.connect()
# Wait a moment for MQTT connection
time.sleep(1)
if not self.mqtt_publisher.is_connected():
self.logger.warning("MQTT connection not established yet, continuing anyway...")
# Initialize sensor reader
self.logger.info("Initializing Bluetooth sensor reader...")
self.sensor_reader = SensorReader(
sensor_config=self.sensor_config,
discovery_manager=self.discovery_manager,
on_measurement=self._on_measurement,
interface=0 # hci0
)
# Start config reload timer
self.config_reload_timer = threading.Timer(
config.CONFIG_RELOAD_INTERVAL,
self._reload_config
)
self.config_reload_timer.daemon = True
self.config_reload_timer.start()
self.logger.info(f"Config reload scheduled every {config.CONFIG_RELOAD_INTERVAL}s")
# Start reading sensors (blocking call)
self.logger.info("=" * 50)
self.logger.info("Sensorpajen is now running")
self.logger.info("Monitoring sensors via Bluetooth...")
self.logger.info("Publishing to MQTT...")
self.logger.info("Press Ctrl+C to stop")
self.logger.info("=" * 50)
self.running = True
self.sensor_reader.start()
except FileNotFoundError as e:
self.logger.error(f"Configuration error: {e}")
sys.exit(1)
except RuntimeError as e:
self.logger.error(f"Configuration error: {e}")
sys.exit(1)
except Exception as e:
self.logger.error(f"Failed to start application: {e}", exc_info=True)
self.shutdown()
sys.exit(1)
def shutdown(self):
"""Shutdown the application gracefully."""
if not self.running:
return
self.running = False
self.logger.info("Shutting down...")
# Cancel config reload timer
if self.config_reload_timer:
try:
self.config_reload_timer.cancel()
except Exception as e:
self.logger.error(f"Error canceling reload timer: {e}")
# Stop sensor reader
if self.sensor_reader:
try:
self.sensor_reader.stop()
except Exception as e:
self.logger.error(f"Error stopping sensor reader: {e}")
# Disconnect MQTT
if self.mqtt_publisher:
try:
self.mqtt_publisher.disconnect()
except Exception as e:
self.logger.error(f"Error disconnecting MQTT: {e}")
self.logger.info("Shutdown complete")
def main():
"""Main entry point."""
app = Sensorpajen()
app.start()
if __name__ == "__main__":
main()

View File

@@ -1,131 +0,0 @@
"""
MQTT Publisher for sensor data.
Handles connection to MQTT broker and publishing of sensor measurements.
"""
import logging
import paho.mqtt.client as mqtt
from typing import Optional
from . import config
logger = logging.getLogger(__name__)
class MQTTPublisher:
"""Manages MQTT connection and publishing of sensor data."""
def __init__(self):
"""Initialize MQTT publisher with configuration."""
self.client: Optional[mqtt.Client] = None
self.connected = False
self._setup_client()
def _setup_client(self):
"""Setup MQTT client with callbacks."""
# Handle both paho-mqtt v1.x and v2.x
try:
# Try v2.x format (with callback_api_version)
self.client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION1,
client_id=config.MQTT_CLIENT_ID
)
except (TypeError, AttributeError):
# Fall back to v1.x format
self.client = mqtt.Client(config.MQTT_CLIENT_ID)
# Set credentials if provided
if config.MQTT_USER and config.MQTT_PASSWORD:
self.client.username_pw_set(config.MQTT_USER, config.MQTT_PASSWORD)
# Setup callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_publish = self._on_publish
logger.info(f"MQTT client configured for {config.MQTT_HOST}:{config.MQTT_PORT}")
def _on_connect(self, client, userdata, flags, rc):
"""Callback for when client connects to broker."""
if rc == 0:
self.connected = True
logger.info(f"Connected to MQTT broker at {config.MQTT_HOST}:{config.MQTT_PORT}")
else:
self.connected = False
logger.error(f"Failed to connect to MQTT broker. Return code: {rc}")
def _on_disconnect(self, client, userdata, rc):
"""Callback for when client disconnects from broker."""
self.connected = False
if rc != 0:
logger.warning(f"Unexpected disconnection from MQTT broker. Return code: {rc}")
else:
logger.info("Disconnected from MQTT broker")
def _on_publish(self, client, userdata, mid):
"""Callback for when message is published."""
logger.debug(f"Message published: {mid}")
def connect(self):
"""Connect to MQTT broker."""
try:
logger.info(f"Connecting to MQTT broker at {config.MQTT_HOST}:{config.MQTT_PORT}")
self.client.connect(config.MQTT_HOST, config.MQTT_PORT, keepalive=60)
self.client.loop_start() # Start network loop in background thread
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
raise
def disconnect(self):
"""Disconnect from MQTT broker."""
if self.client:
self.client.loop_stop()
self.client.disconnect()
logger.info("Disconnected from MQTT broker")
def publish_measurement(self, sensor_name: str, temperature: float,
humidity: int, battery_voltage: float = None,
battery_level: int = None):
"""
Publish sensor measurement to MQTT.
Args:
sensor_name: Name of the sensor
temperature: Temperature in Celsius
humidity: Humidity percentage
battery_voltage: Battery voltage (optional)
battery_level: Battery level percentage (optional)
"""
if not self.connected:
logger.warning("Not connected to MQTT broker, skipping publish")
return
topic_prefix = f"{config.MQTT_TOPIC_PREFIX}/{sensor_name}"
try:
# Publish temperature
self.client.publish(f"{topic_prefix}/temp", f"{temperature:.1f}")
logger.debug(f"{sensor_name}: temp={temperature:.1f}°C")
# Publish humidity
self.client.publish(f"{topic_prefix}/humidity", f"{humidity}")
logger.debug(f"{sensor_name}: humidity={humidity}%")
# Publish battery info if enabled and available
if config.ENABLE_BATTERY:
if battery_voltage is not None:
self.client.publish(f"{topic_prefix}/batteryvoltage", f"{battery_voltage:.3f}")
logger.debug(f"{sensor_name}: battery_voltage={battery_voltage:.3f}V")
if battery_level is not None:
self.client.publish(f"{topic_prefix}/batterylevel", f"{battery_level}")
logger.debug(f"{sensor_name}: battery_level={battery_level}%")
logger.info(f"Published: {sensor_name} - {temperature:.1f}°C, {humidity}%")
except Exception as e:
logger.error(f"Error publishing to MQTT: {e}")
def is_connected(self) -> bool:
"""Check if connected to MQTT broker."""
return self.connected

View File

@@ -1,292 +0,0 @@
"""
Bluetooth sensor reader for Xiaomi Mijia LYWSD03MMC sensors with ATC firmware.
Reads temperature, humidity, and battery data from BLE advertisements.
"""
import logging
import time
import threading
import bluetooth._bluetooth as bluez
from dataclasses import dataclass
from typing import Optional, Callable, Dict
from . import config
from .utils import (enable_le_scan, disable_le_scan,
parse_le_advertising_events, raw_packet_to_str, toggle_device)
logger = logging.getLogger(__name__)
@dataclass
class Measurement:
"""Sensor measurement data."""
temperature: float
humidity: int
voltage: float
battery: int = 0
rssi: int = 0
sensor_name: str = ""
timestamp: int = 0
class SensorReader:
"""Reads Xiaomi LYWSD03MMC sensors with ATC firmware via BLE."""
def __init__(self, sensor_config: config.SensorConfig,
discovery_manager,
on_measurement: Callable[[Measurement], None],
interface: int = 0):
"""
Initialize sensor reader.
Args:
sensor_config: Sensor configuration mapping
discovery_manager: Discovery manager for tracking new sensors
on_measurement: Callback function for new measurements
interface: Bluetooth interface number (default 0 for hci0)
"""
self.sensor_config = sensor_config
self.discovery_manager = discovery_manager
self.on_measurement = on_measurement
self.interface = interface
self.sock: Optional[int] = None
self.running = False
self.last_ble_packet = time.time()
self.adv_counter: Dict[str, str] = {} # Track advertisement numbers to avoid duplicates
self.watchdog_thread: Optional[threading.Thread] = None
def start(self):
"""Start BLE scanning for sensors."""
try:
logger.info(f"Starting BLE scan on hci{self.interface}")
# Enable bluetooth device
toggle_device(self.interface, True)
# Open bluetooth socket
try:
self.sock = bluez.hci_open_dev(self.interface)
except Exception as e:
logger.error(f"Cannot open bluetooth device hci{self.interface}: {e}")
raise
# Enable LE scanning without filtering duplicates
enable_le_scan(self.sock, filter_duplicates=False)
# Start watchdog if configured
if config.WATCHDOG_TIMEOUT > 0:
self.running = True
self.watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True)
self.watchdog_thread.start()
logger.info(f"Watchdog started with {config.WATCHDOG_TIMEOUT}s timeout")
logger.info("BLE scanning enabled")
logger.info(f"Monitoring {len(self.sensor_config.sensors)} sensors")
# Start parsing advertisements (blocking call)
parse_le_advertising_events(
self.sock,
handler=self._handle_ble_packet,
debug=False
)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
self.stop()
except Exception as e:
logger.error(f"Error in sensor reader: {e}")
self.stop()
raise
def stop(self):
"""Stop BLE scanning."""
self.running = False
if self.sock:
try:
disable_le_scan(self.sock)
logger.info("BLE scanning disabled")
except Exception as e:
logger.error(f"Error disabling BLE scan: {e}")
if self.watchdog_thread and self.watchdog_thread.is_alive():
self.watchdog_thread.join(timeout=2)
def _watchdog_loop(self):
"""Watchdog thread to restart BLE scanning if no packets received."""
restart_counter = 1
while self.running:
time.sleep(1)
now = time.time()
elapsed = now - self.last_ble_packet
if elapsed > config.WATCHDOG_TIMEOUT:
logger.warning(
f"Watchdog: No BLE packet within {int(elapsed)}s. "
f"Restarting BLE scan (count: {restart_counter})"
)
try:
disable_le_scan(self.sock)
time.sleep(1)
enable_le_scan(self.sock, filter_duplicates=False)
restart_counter += 1
self.last_ble_packet = now # Reset timer
except Exception as e:
logger.error(f"Error restarting BLE scan: {e}")
def _handle_ble_packet(self, mac: str, adv_type: int, data: bytes, rssi: int):
"""
Handle incoming BLE advertisement packet.
Args:
mac: MAC address of the device
adv_type: Advertisement type
data: Advertisement data
rssi: Signal strength
"""
# Update last packet time for watchdog
self.last_ble_packet = time.time()
# Convert data to hex string
data_str = raw_packet_to_str(data)
# Check if this is an ATC packet
# ATC format: [... service UUID 0x181A ... MAC ... data ...]
atc_identifier = data_str[6:10].upper()
if atc_identifier != "1A18":
return # Not an ATC packet
# Extract MAC from packet and verify it matches
packet_mac = data_str[10:22].upper()
mac_str = mac.replace(":", "").upper()
if packet_mac != mac_str:
return # MAC mismatch
mac_with_colons = mac.upper()
# Parse ATC data packet first to get sensor data
try:
parsed_data = self._parse_atc_data(data_str)
if not parsed_data:
return
temperature, humidity, battery_percent, battery_voltage, adv_number = parsed_data
# Check if this is a known sensor
if mac_with_colons not in self.sensor_config.sensors:
# Unknown sensor - check if we should discover it
self._handle_unknown_sensor(
mac_with_colons,
rssi,
temperature,
humidity,
battery_percent,
battery_voltage
)
return
# Check advertisement number to avoid duplicates
if mac_str in self.adv_counter:
if self.adv_counter[mac_str] == adv_number:
return # Duplicate packet
self.adv_counter[mac_str] = adv_number
# Create measurement for known sensor
sensor_name = self.sensor_config.get_name(mac_with_colons)
measurement = Measurement(
temperature=temperature,
humidity=humidity,
voltage=battery_voltage / 1000.0,
battery=battery_percent,
rssi=rssi,
sensor_name=sensor_name,
timestamp=int(time.time())
)
# Log the measurement
logger.info(
f"{measurement.sensor_name}: {measurement.temperature}°C, "
f"{measurement.humidity}%, {measurement.voltage}V, "
f"battery {measurement.battery}%, RSSI {rssi}dBm"
)
# Call measurement callback
if self.on_measurement:
self.on_measurement(measurement)
except Exception as e:
logger.error(f"Error parsing ATC packet from {mac}: {e}")
def _handle_unknown_sensor(self, mac: str, rssi: int, temperature: float,
humidity: int, battery_percent: int, battery_voltage: int):
"""
Handle discovery of unknown sensor.
Args:
mac: MAC address with colons
rssi: Signal strength
temperature: Temperature reading
humidity: Humidity reading
battery_percent: Battery percentage
battery_voltage: Battery voltage in mV
"""
# Get or construct device name from MAC
# ATC sensors advertise as ATC_XXXXXX where XXXXXX is last 3 bytes
mac_suffix = mac.replace(":", "")[-6:]
device_name = f"ATC_{mac_suffix}"
# Check if already discovered
if self.discovery_manager.is_known(mac):
# Just update the discovery record
self.discovery_manager.add_or_update(
mac, device_name, rssi, temperature, humidity,
battery_percent, battery_voltage
)
return
# New sensor - discover and notify
is_new = self.discovery_manager.add_or_update(
mac, device_name, rssi, temperature, humidity,
battery_percent, battery_voltage
)
if is_new:
logger.info(f"New sensor discovered: {mac} ({device_name})")
sensor = self.discovery_manager.sensors[mac]
self.discovery_manager.send_ntfy_notification(sensor)
def _parse_atc_data(self, data_str: str) -> Optional[tuple]:
"""
Parse ATC advertisement data.
Returns:
Tuple of (temperature, humidity, battery_percent, battery_voltage, adv_number) or None
"""
try:
# Temperature: bytes 22-26, signed int16, big endian, /10
temp_hex = data_str[22:26]
temp_raw = int(temp_hex, 16)
if temp_raw & 0x8000: # Check sign bit
temp_raw = temp_raw - 0x10000
temperature = temp_raw / 10.0
# Humidity: bytes 26-28, uint8
humidity = int(data_str[26:28], 16)
# Battery: bytes 28-30, uint8
battery_percent = int(data_str[28:30], 16)
# Battery voltage: bytes 30-34, uint16, big endian, mV
battery_voltage = int(data_str[30:34], 16)
# Advertisement number: last 2 bytes
adv_number = data_str[-2:]
return (temperature, humidity, battery_percent, battery_voltage, adv_number)
except (ValueError, IndexError) as e:
logger.debug(f"Error parsing ATC data: {e}")
return None

View File

@@ -1,421 +0,0 @@
# -*- coding: utf-8 -*-
# This file is from https://github.com/colin-guyon/py-bluetooth-utils
# published under MIT License
# MIT License
# Copyright (c) 2020 Colin GUYON
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Module containing some bluetooth utility functions (linux only).
It either uses HCI commands using PyBluez, or does ioctl calls like it's
done in Bluez tools such as hciconfig.
Main functions:
- toggle_device : enable or disable a bluetooth device
- set_scan : set scan type on a device ("noscan", "iscan", "pscan", "piscan")
- enable/disable_le_scan : enable BLE scanning
- parse_le_advertising_events : parse and read BLE advertisements packets
- start/stop_le_advertising : advertise custom data using BLE
Bluez : http://www.bluez.org/
PyBluez : http://karulis.github.io/pybluez/
The module was in particular inspired from 'iBeacon-Scanner-'
https://github.com/switchdoclabs/iBeacon-Scanner-/blob/master/blescan.py
and sometimes directly from the Bluez sources.
"""
from __future__ import absolute_import
import sys
import struct
import fcntl
import array
import socket
from errno import EALREADY
# import PyBluez
import bluetooth._bluetooth as bluez
__all__ = ('toggle_device', 'set_scan',
'enable_le_scan', 'disable_le_scan', 'parse_le_advertising_events',
'start_le_advertising', 'stop_le_advertising',
'raw_packet_to_str')
LE_META_EVENT = 0x3E
LE_PUBLIC_ADDRESS = 0x00
LE_RANDOM_ADDRESS = 0x01
OGF_LE_CTL = 0x08
OCF_LE_SET_SCAN_PARAMETERS = 0x000B
OCF_LE_SET_SCAN_ENABLE = 0x000C
OCF_LE_CREATE_CONN = 0x000D
OCF_LE_SET_ADVERTISING_PARAMETERS = 0x0006
OCF_LE_SET_ADVERTISE_ENABLE = 0x000A
OCF_LE_SET_ADVERTISING_DATA = 0x0008
SCAN_TYPE_PASSIVE = 0x00
SCAN_FILTER_DUPLICATES = 0x01
SCAN_DISABLE = 0x00
SCAN_ENABLE = 0x01
# sub-events of LE_META_EVENT
EVT_LE_CONN_COMPLETE = 0x01
EVT_LE_ADVERTISING_REPORT = 0x02
EVT_LE_CONN_UPDATE_COMPLETE = 0x03
EVT_LE_READ_REMOTE_USED_FEATURES_COMPLETE = 0x04
# Advertisement event types
ADV_IND = 0x00
ADV_DIRECT_IND = 0x01
ADV_SCAN_IND = 0x02
ADV_NONCONN_IND = 0x03
ADV_SCAN_RSP = 0x04
# Allow Scan Request from Any, Connect Request from Any
FILTER_POLICY_NO_WHITELIST = 0x00
# Allow Scan Request from White List Only, Connect Request from Any
FILTER_POLICY_SCAN_WHITELIST = 0x01
# Allow Scan Request from Any, Connect Request from White List Only
FILTER_POLICY_CONN_WHITELIST = 0x02
# Allow Scan Request from White List Only, Connect Request from White List Only
FILTER_POLICY_SCAN_AND_CONN_WHITELIST = 0x03
def toggle_device(dev_id, enable):
"""
Power ON or OFF a bluetooth device.
:param dev_id: Device id.
:type dev_id: ``int``
:param enable: Whether to enable of disable the device.
:type enable: ``bool``
"""
hci_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW,
socket.BTPROTO_HCI)
print("Power %s bluetooth device %d" % ('ON' if enable else 'OFF', dev_id))
# di = struct.pack("HbBIBBIIIHHHH10I", dev_id, *((0,) * 22))
# fcntl.ioctl(hci_sock.fileno(), bluez.HCIGETDEVINFO, di)
req_str = struct.pack("H", dev_id)
request = array.array("b", req_str)
try:
fcntl.ioctl(hci_sock.fileno(),
bluez.HCIDEVUP if enable else bluez.HCIDEVDOWN,
request[0])
except IOError as e:
if e.errno == EALREADY:
print("Bluetooth device %d is already %s" % (
dev_id, 'enabled' if enable else 'disabled'))
else:
raise
finally:
hci_sock.close()
# Types of bluetooth scan
SCAN_DISABLED = 0x00
SCAN_INQUIRY = 0x01
SCAN_PAGE = 0x02
def set_scan(dev_id, scan_type):
"""
Set scan type on a given bluetooth device.
:param dev_id: Device id.
:type dev_id: ``int``
:param scan_type: One of
``'noscan'``
``'iscan'``
``'pscan'``
``'piscan'``
:type scan_type: ``str``
"""
hci_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW,
socket.BTPROTO_HCI)
if scan_type == "noscan":
dev_opt = SCAN_DISABLED
elif scan_type == "iscan":
dev_opt = SCAN_INQUIRY
elif scan_type == "pscan":
dev_opt = SCAN_PAGE
elif scan_type == "piscan":
dev_opt = SCAN_PAGE | SCAN_INQUIRY
else:
raise ValueError("Unknown scan type %r" % scan_type)
req_str = struct.pack("HI", dev_id, dev_opt)
print("Set scan type %r to bluetooth device %d" % (scan_type, dev_id))
try:
fcntl.ioctl(hci_sock.fileno(), bluez.HCISETSCAN, req_str)
finally:
hci_sock.close()
def raw_packet_to_str(pkt):
"""
Returns the string representation of a raw HCI packet.
"""
if sys.version_info > (3, 0):
return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in pkt)
else:
return ''.join('%02x' % struct.unpack("B", x)[0] for x in pkt)
def enable_le_scan(sock, interval=0x0800, window=0x0800,
filter_policy=FILTER_POLICY_NO_WHITELIST,
filter_duplicates=True):
"""
Enable LE passive scan (with filtering of duplicate packets enabled).
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param interval: Scan interval.
:param window: Scan window (must be less or equal than given interval).
:param filter_policy: One of
``FILTER_POLICY_NO_WHITELIST`` (default value)
``FILTER_POLICY_SCAN_WHITELIST``
``FILTER_POLICY_CONN_WHITELIST``
``FILTER_POLICY_SCAN_AND_CONN_WHITELIST``
.. note:: Scan interval and window are to multiply by 0.625 ms to
get the real time duration.
"""
print("Enable LE scan")
own_bdaddr_type = LE_PUBLIC_ADDRESS # does not work with LE_RANDOM_ADDRESS
cmd_pkt = struct.pack("<BHHBB", SCAN_TYPE_PASSIVE, interval, window,
own_bdaddr_type, filter_policy)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_PARAMETERS, cmd_pkt)
print("scan params: interval=%.3fms window=%.3fms own_bdaddr=%s "
"whitelist=%s" %
(interval * 0.625, window * 0.625,
'public' if own_bdaddr_type == LE_PUBLIC_ADDRESS else 'random',
'yes' if filter_policy in (FILTER_POLICY_SCAN_WHITELIST,
FILTER_POLICY_SCAN_AND_CONN_WHITELIST)
else 'no'))
cmd_pkt = struct.pack("<BB", SCAN_ENABLE, SCAN_FILTER_DUPLICATES if filter_duplicates else 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def disable_le_scan(sock):
"""
Disable LE scan.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
"""
print("Disable LE scan")
cmd_pkt = struct.pack("<BB", SCAN_DISABLE, 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def start_le_advertising(sock, min_interval=1000, max_interval=1000,
adv_type=ADV_NONCONN_IND, data=()):
"""
Start LE advertising.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param min_interval: Minimum advertising interval.
:param max_interval: Maximum advertising interval.
:param adv_type: Advertisement type (``ADV_NONCONN_IND`` by default).
:param data: The advertisement data (maximum of 31 bytes).
:type data: iterable
"""
own_bdaddr_type = 0
direct_bdaddr_type = 0
direct_bdaddr = (0,) * 6
chan_map = 0x07 # All channels: 37, 38, 39
filter = 0
struct_params = [min_interval, max_interval, adv_type, own_bdaddr_type,
direct_bdaddr_type]
struct_params.extend(direct_bdaddr)
struct_params.extend((chan_map, filter))
cmd_pkt = struct.pack("<HHBBB6BBB", *struct_params)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISING_PARAMETERS,
cmd_pkt)
cmd_pkt = struct.pack("<B", 0x01)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISE_ENABLE, cmd_pkt)
data_length = len(data)
if data_length > 31:
raise ValueError("data is too long (%d but max is 31 bytes)",
data_length)
cmd_pkt = struct.pack("<B%dB" % data_length, data_length, *data)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISING_DATA, cmd_pkt)
print("Advertising started data_length=%d data=%r" % (data_length, data))
def stop_le_advertising(sock):
"""
Stop LE advertising.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
"""
cmd_pkt = struct.pack("<B", 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISE_ENABLE, cmd_pkt)
print("Advertising stopped")
def parse_le_advertising_events(sock, mac_addr=None, packet_length=None,
handler=None, debug=False):
"""
Parse and report LE advertisements.
This is a blocking call, an infinite loop is started and the
given handler will be called each time a new LE advertisement packet
is detected and corresponds to the given filters.
.. note:: The :func:`.start_le_advertising` function must be
called before calling this function.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param mac_addr: list of filtered mac address representations
(uppercase, with ':' separators).
If not specified, the LE advertisement of any device will be reported.
Example: mac_addr=('00:2A:5F:FF:25:11', 'DA:FF:12:33:66:12')
:type mac_addr: ``list`` of ``string``
:param packet_length: Filter a specific length of LE advertisement packet.
:type packet_length: ``int``
:param handler: Handler that will be called each time a LE advertisement
packet is available (in accordance with the ``mac_addr``
and ``packet_length`` filters).
:type handler: ``callable`` taking 4 parameters:
mac (``str``), adv_type (``int``), data (``bytes``) and rssi (``int``)
:param debug: Enable debug prints.
:type debug: ``bool``
"""
if not debug and handler is None:
raise ValueError("You must either enable debug or give a handler !")
old_filter = sock.getsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, 14)
flt = bluez.hci_filter_new()
bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
# bluez.hci_filter_all_events(flt)
bluez.hci_filter_set_event(flt, LE_META_EVENT)
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, flt)
print("socket filter set to ptype=HCI_EVENT_PKT event=LE_META_EVENT")
print("Listening ...")
try:
while True:
pkt = full_pkt = sock.recv(255)
ptype, event, plen = struct.unpack("BBB", pkt[:3])
if event != LE_META_EVENT:
# Should never occur because we filtered with this type of event
print("Not a LE_META_EVENT !")
continue
sub_event, = struct.unpack("B", pkt[3:4])
if sub_event != EVT_LE_ADVERTISING_REPORT:
if debug:
print("Not a EVT_LE_ADVERTISING_REPORT !")
continue
pkt = pkt[4:]
adv_type = struct.unpack("b", pkt[1:2])[0]
mac_addr_str = bluez.ba2str(pkt[3:9])
if packet_length and plen != packet_length:
# ignore this packet
if debug:
print("packet with non-matching length: mac=%s adv_type=%02x plen=%s" %
(mac_addr_str, adv_type, plen))
print(raw_packet_to_str(pkt))
continue
data = pkt[9:-1]
rssi = struct.unpack("b", full_pkt[len(full_pkt)-1:len(full_pkt)])[0]
if mac_addr and mac_addr_str not in mac_addr:
if debug:
print("packet with non-matching mac %s adv_type=%02x data=%s RSSI=%s" %
(mac_addr_str, adv_type, raw_packet_to_str(data), rssi))
continue
if debug:
print("LE advertisement: mac=%s adv_type=%02x data=%s RSSI=%d" %
(mac_addr_str, adv_type, raw_packet_to_str(data), rssi))
if handler is not None:
try:
handler(mac_addr_str, adv_type, data, rssi)
except Exception as e:
print('Exception when calling handler with a BLE advertising event: %r' % (e,))
import traceback
traceback.print_exc()
except KeyboardInterrupt:
print("\nRestore previous socket filter")
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, old_filter)
raise
"""
def hci_le_add_white_list(int dd, const bdaddr_t *bdaddr, uint8_t type, int to)
{
struct hci_request {
uint16_t ogf;
uint16_t ocf;
int event;
void *cparam;
int clen;
void *rparam;
int rlen;
};
struct hci_request rq;
le_add_device_to_white_list_cp cp;
uint8_t status;
memset(&cp, 0, sizeof(cp));
cp.bdaddr_type = type;
bacpy(&cp.bdaddr, bdaddr);
memset(&rq, 0, sizeof(rq));
rq.ogf = OGF_LE_CTL;
rq.ocf = OCF_LE_ADD_DEVICE_TO_WHITE_LIST;
rq.cparam = &cp;
rq.clen = LE_ADD_DEVICE_TO_WHITE_LIST_CP_SIZE;
rq.rparam = &status;
rq.rlen = 1;
if (hci_send_req(dd, &rq, to) < 0)
return -1;
if (status) {
errno = EIO;
return -1;
}
return 0;
}"""

View File

@@ -1,32 +0,0 @@
[Unit]
Description=Sensorpajen - Bluetooth Temperature Sensor Monitor
Documentation=https://github.com/yourusername/sensorpajen
After=network.target bluetooth.target
Wants=bluetooth.target
[Service]
Type=simple
User=sensorpajen
Group=sensorpajen
WorkingDirectory=/opt/sensorpajen
EnvironmentFile=/etc/sensorpajen/sensorpajen.env
ExecStart=/opt/sensorpajen/venv/bin/python -m sensorpajen.main
Restart=always
RestartSec=10
# Bluetooth capabilities require this to be false
NoNewPrivileges=false
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=sensorpajen
# Security hardening (where possible with Bluetooth requirements)
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/etc/sensorpajen /var/lib/sensorpajen
[Install]
WantedBy=multi-user.target

View File

@@ -1,32 +0,0 @@
[
{
"mac": "A4:C1:38:12:34:56",
"name": "ATC_123456",
"rssi": -65,
"first_seen": "2025-12-27T10:30:15",
"last_seen": "2025-12-27T10:35:42",
"sample_reading": {
"temperature": 21.5,
"humidity": 45,
"battery_percent": 87,
"battery_voltage": 2950
},
"status": "pending"
},
{
"mac": "A4:C1:38:AB:CD:EF",
"name": "ATC_ABCDEF",
"rssi": -72,
"first_seen": "2025-12-27T11:00:00",
"last_seen": "2025-12-27T11:10:00",
"sample_reading": {
"temperature": 19.8,
"humidity": 52,
"battery_percent": 65,
"battery_voltage": 2800
},
"status": "ignored",
"ignored_at": "2025-12-27T11:15:00",
"ignore_reason": "Test sensor, not needed"
}
]

View File

@@ -1,29 +0,0 @@
# MQTT Configuration
MQTT_HOST=192.168.0.114
MQTT_PORT=1883
MQTT_USER=hasse
MQTT_PASSWORD=casablanca
MQTT_CLIENT_ID=mibridge
# Sensor Configuration
# For system installation (/opt/sensorpajen): Use absolute paths
# SENSOR_CONFIG_FILE=/etc/sensorpajen/sensors.json
# DISCOVERED_SENSORS_FILE=/etc/sensorpajen/discovered_sensors.json
#
# For development installation: Use relative paths (from project root)
# SENSOR_CONFIG_FILE=config/sensors.json
# DISCOVERED_SENSORS_FILE=config/discovered_sensors.json
#
# If not set, defaults will be used based on installation type
# Application Settings
WATCHDOG_TIMEOUT=5
ENABLE_BATTERY=true
LOG_LEVEL=INFO
CONFIG_RELOAD_INTERVAL=900
# ntfy Notifications (optional)
NTFY_ENABLED=false
NTFY_URL=https://ntfy.sh
NTFY_TOPIC=sensorpajen
NTFY_TOKEN=

View File

@@ -1,37 +0,0 @@
{
"sensors": [
{
"mac": "A4:C1:38:98:7B:B6",
"name": "mi_temp_1",
"comment": "Example sensor - replace with your sensors"
},
{
"mac": "A4:C1:38:29:03:0D",
"name": "mi_temp_2"
},
{
"mac": "A4:C1:38:62:CA:83",
"name": "mi_temp_3"
},
{
"mac": "A4:C1:38:D5:EA:63",
"name": "mi_temp_4"
},
{
"mac": "A4:C1:38:7C:9C:63",
"name": "mi_temp_5"
},
{
"mac": "A4:C1:38:68:2C:DA",
"name": "mi_temp_6"
},
{
"mac": "A4:C1:38:AD:74:2B",
"name": "mi_temp_7"
},
{
"mac": "A4:C1:38:46:9F:D1",
"name": "mi_temp_8"
}
]
}

View File

@@ -4,9 +4,9 @@ build-backend = "setuptools.build_meta"
[project]
name = "sensorpajen"
version = "2.0.0"
version = "3.0.0"
description = "Bluetooth temperature sensor monitor for Xiaomi Mijia LYWSD03MMC"
readme = "README.md"
readme = "readme.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [

View File

@@ -28,10 +28,10 @@ The easiest way to install on Raspberry Pi OS is using the pre-built Debian pack
```bash
# Download the latest release
wget https://gitea.wahlberg.se/api/v1/repos/fredrik/sensorpajen/releases/download/v2.0.0/sensorpajen_2.0.0_all.deb
wget https://gitea.wahlberg.se/api/v1/repos/fredrik/sensorpajen/releases/download/v3.0.0/sensorpajen_3.0.0_all.deb
# Install
sudo dpkg -i sensorpajen_2.0.0_all.deb
sudo dpkg -i sensorpajen_3.0.0_all.deb
# Configure
sudo nano /etc/sensorpajen/sensorpajen.env # Edit MQTT settings
@@ -82,7 +82,8 @@ The TUI allows you to:
**Keybindings:**
- `a`: Approve selected sensor
- `i`: Ignore selected sensor
- `e`: Edit sensor name
- `e`: Edit sensor name and comment
- `v`: View details (MAC/name/comment)
- `u`: Unignore sensor
- `Delete`: Remove sensor from monitoring
- `r`: Refresh data
@@ -90,17 +91,17 @@ The TUI allows you to:
When you approve a sensor, it's added to your configuration and the service automatically starts monitoring it.
### Legacy CLI Approval
### Legacy CLI Approval (Deprecated)
If you prefer the command line, you can still use:
The recommended workflow is the TUI (`sensorpajen-tui`). A legacy CLI tool still exists:
```bash
sudo sensorpajen approve-sensors
sudo sensorpajen-approve-sensors
```
### MQTT Settings
Edit `config/sensorpajen.env`:
Edit `/etc/sensorpajen/sensorpajen.env`:
```bash
MQTT_HOST=192.168.1.10
@@ -128,6 +129,7 @@ Sensors are automatically managed via the approval workflow. You can also manual
}
]
}
```
## Service Management
@@ -227,11 +229,11 @@ mosquitto_sub -h <MQTT_HOST> -u <USER> -P <PASSWORD> -t "MiTemperature2/#" -v
**Sensor not found:**
```bash
# Run sensor discovery
sudo sensorpajen approve-sensors
# Run the TUI to view/approve newly discovered sensors
sudo sensorpajen-tui
# Check discovered sensors
sudo cat /var/lib/sensorpajen/discovered_sensors.json | jq '.'
# Check recent logs
sudo journalctl -u sensorpajen -n 100
```
### Development Installation

View File

@@ -49,6 +49,7 @@ PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
# 1. Sync Code
log "Syncing code from $PROJECT_ROOT to $REMOTE_USER@$REMOTE_HOST:$REMOTE_DIR..."
rsync -avz --exclude '.venv' --exclude '__pycache__' --exclude '*.egg-info' \
--exclude '*.db' --exclude '*.db-*' --exclude '*.sqlite' --exclude '*.sqlite-*' \
"$PROJECT_ROOT/src" "$PROJECT_ROOT/scripts" "$PROJECT_ROOT/pyproject.toml" "$PROJECT_ROOT/config" \
"$REMOTE_USER@$REMOTE_HOST:$REMOTE_DIR/"
@@ -127,11 +128,17 @@ ssh -t $REMOTE_USER@$REMOTE_HOST "
fi
# ALWAYS sanitize sensorpajen.env to ensure we don't use system paths
# and set explicit dev paths
if [ -f config/sensorpajen.env ]; then
echo 'Sanitizing config/sensorpajen.env...'
echo 'Sanitizing and setting dev paths in config/sensorpajen.env...'
sudo sed -i '/^SENSOR_CONFIG_FILE/d' config/sensorpajen.env
sudo sed -i '/^DATABASE_FILE/d' config/sensorpajen.env
sudo sed -i '/^DISCOVERED_SENSORS_FILE/d' config/sensorpajen.env
# Add dev paths explicitly (use absolute paths since we're in ssh context)
echo "SENSOR_CONFIG_FILE=/home/$REMOTE_USER/sensorpajen-dev/config/sensors.json" | sudo tee -a config/sensorpajen.env > /dev/null
echo "DATABASE_FILE=/home/$REMOTE_USER/sensorpajen-dev/config/sensorpajen.db" | sudo tee -a config/sensorpajen.env > /dev/null
echo "DISCOVERED_SENSORS_FILE=/home/$REMOTE_USER/sensorpajen-dev/config/discovered_sensors.json" | sudo tee -a config/sensorpajen.env > /dev/null
fi
# Examples (if real config missing)
@@ -191,6 +198,7 @@ ssh -t $REMOTE_USER@$REMOTE_HOST "
source config/sensorpajen.env
set +a
fi
export TUI_LOG_FILE=dev_backend.log
# Run TUI
python3 -m sensorpajen.tui.app
"

View File

@@ -136,7 +136,7 @@ echo ""
# Show package contents
echo "Package contents:"
echo "======================================================================"
dpkg-deb -c "$DEB_FILE" | head -20
dpkg-deb -c "$DEB_FILE" | sed -n '1,20p'
TOTAL_FILES=$(dpkg-deb -c "$DEB_FILE" | wc -l)
if [ $TOTAL_FILES -gt 20 ]; then
echo "... and $(($TOTAL_FILES - 20)) more files"

View File

@@ -5,6 +5,6 @@ Monitors Xiaomi Mijia LYWSD03MMC Bluetooth temperature sensors
and publishes data to MQTT broker.
"""
__version__ = "2.0.0-dev"
__version__ = "3.0.0"
__author__ = "Fredrik"
__license__ = "MIT"

View File

@@ -100,6 +100,7 @@ class SensorConfig:
"""
self.config_file = Path(config_file)
self.sensors: Dict[str, str] = {}
self.comments: Dict[str, str] = {}
self.load()
def load(self):
@@ -119,9 +120,12 @@ class SensorConfig:
for sensor in data.get('sensors', []):
mac = sensor.get('mac', '').upper()
name = sensor.get('name')
comment = sensor.get('comment')
if mac and name:
self.sensors[mac] = name
if isinstance(comment, str) and comment != "":
self.comments[mac] = comment
logger.debug(f"Loaded sensor: {mac} -> {name}")
logger.info(f"Loaded {len(self.sensors)} sensors from {self.config_file}")
@@ -147,6 +151,10 @@ class SensorConfig:
"""Get list of all configured MAC addresses."""
return list(self.sensors.keys())
def get_comment(self, mac: str) -> Optional[str]:
"""Get sensor comment by MAC address, if present."""
return self.comments.get(mac.upper())
def add_sensor(self, mac: str, name: str, comment: Optional[str] = None):
"""
Add or update a sensor in the configuration.
@@ -157,8 +165,25 @@ class SensorConfig:
comment: Optional comment
"""
mac = mac.upper()
logger.debug(f"add_sensor called: MAC={mac}, name={name}")
self.sensors[mac] = name
if comment is not None:
# Allow explicit clearing by passing empty string
if comment == "":
self.comments.pop(mac, None)
else:
self.comments[mac] = comment
logger.debug(f"Updated in-memory dict: {mac} -> {name}")
logger.debug(f"Current sensors dict: {self.sensors}")
try:
self.save(mac, name, comment)
logger.info(f"Successfully saved sensor {mac}={name}")
except Exception as e:
# If save fails, remove from memory too
logger.error(f"Failed to save sensor {mac}: {e}")
if mac in self.sensors:
del self.sensors[mac]
raise e
def remove_sensor(self, mac: str):
"""
@@ -170,6 +195,7 @@ class SensorConfig:
mac = mac.upper()
if mac in self.sensors:
del self.sensors[mac]
self.comments.pop(mac, None)
# Load current file, remove entry, and save
try:
@@ -196,27 +222,37 @@ class SensorConfig:
comment: Optional comment
"""
mac = mac.upper()
logger.debug(f"save() called for MAC={mac}, name={name}")
data = {"sensors": []}
try:
if self.config_file.exists():
logger.debug(f"Reading existing config from {self.config_file}")
with open(self.config_file, 'r') as f:
data = json.load(f)
logger.debug(f"Loaded config with {len(data.get('sensors', []))} sensors")
else:
logger.debug(f"Config file does not exist: {self.config_file}")
sensors = data.get('sensors', [])
# Update existing or add new
found = False
for s in sensors:
if s.get('mac', '').upper() == mac:
logger.debug(f"Found existing sensor entry for {mac}, updating name")
s['name'] = name
if comment:
if comment is not None:
if comment == "":
s.pop('comment', None)
else:
s['comment'] = comment
found = True
break
if not found:
logger.debug(f"Sensor {mac} not found in config, adding new entry")
new_sensor = {"mac": mac, "name": name}
if comment:
if comment is not None and comment != "":
new_sensor["comment"] = comment
sensors.append(new_sensor)
@@ -225,12 +261,22 @@ class SensorConfig:
# Ensure directory exists
self.config_file.parent.mkdir(parents=True, exist_ok=True)
logger.debug(f"Writing {len(sensors)} sensors to {self.config_file}")
with open(self.config_file, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved sensor {mac} to {self.config_file}")
# Verify the write
with open(self.config_file, 'r') as f:
saved_data = json.load(f)
saved_sensors = saved_data.get('sensors', [])
logger.debug(f"Verification: File now contains {len(saved_sensors)} sensors")
for s in saved_sensors:
if s.get('mac', '').upper() == mac:
logger.debug(f"Verification: Found {mac} in file with name={s.get('name')}")
except Exception as e:
logger.error(f"Error saving sensor config: {e}")
logger.error(f"Error saving sensor config: {e}", exc_info=True)
raise e

View File

@@ -178,7 +178,7 @@ class DiscoveryManager:
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C\n"
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%\n"
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%\n\n"
f"Run 'sensorpajen approve-sensors' to approve or ignore."
f"Run 'sensorpajen-tui' to approve or ignore."
)
url = f"{config.NTFY_URL}/{config.NTFY_TOPIC}"

View File

@@ -127,7 +127,7 @@ class Sensorpajen:
if len(self.sensor_config.sensors) == 0:
self.logger.warning("No sensors configured")
self.logger.warning("Starting in discovery-only mode")
self.logger.warning("Use 'sensorpajen approve-sensors' to add sensors")
self.logger.warning("Use 'sensorpajen-tui' to add sensors")
# Initialize discovery manager
self.logger.info("Initializing discovery manager...")

View File

@@ -1,11 +1,62 @@
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.widgets import Header, Footer, TabbedContent, TabPane, DataTable, Static, Button
from textual.containers import Container, Horizontal
from textual import on
import asyncio
import logging
import os
from pathlib import Path
from typing import Callable, Optional
from ..discovery_manager import DiscoveryManager
from ..config import SensorConfig, save_env_var
from .modals import InputModal
from .modals import InputModal, ConfirmModal, DetailsModal, EditSensorModal
def _format_metadata_comment(mac: str, name: str, last_seen: str, sample_reading: dict) -> str:
return (
f"MAC: {mac}, "
f"Name: {name}, "
f"Last seen: {last_seen}, "
f"Temp: {sample_reading.get('temperature', 'N/A')}°C, "
f"Humidity: {sample_reading.get('humidity', 'N/A')}%, "
f"Battery: {sample_reading.get('battery_percent', 'N/A')}%"
)
def _setup_tui_file_logging() -> logging.Logger:
"""Log to a file (not stdout/stderr) to avoid breaking Textual fullscreen UI."""
logger = logging.getLogger("sensorpajen.tui")
if logger.handlers:
return logger
log_file = Path(os.environ.get("TUI_LOG_FILE", "dev_backend.log"))
if not log_file.is_absolute():
log_file = Path.cwd() / log_file
try:
handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.info("TUI logging enabled -> %s", log_file)
except Exception as exc:
# If we can't write logs, keep UI working.
try:
logger.setLevel(logging.CRITICAL)
logger.propagate = False
except Exception:
pass
return logger
tui_logger = _setup_tui_file_logging()
class SensorpajenApp(App):
"""A Textual app to manage Bluetooth sensors."""
@@ -65,15 +116,17 @@ class SensorpajenApp(App):
}
"""
# Use priority bindings so keys still reach the App even when a DataTable has focus.
BINDINGS = [
("q", "quit", "Quit"),
("d", "toggle_dark", "Toggle dark mode"),
("r", "refresh", "Refresh data"),
("a", "approve", "Approve"),
("i", "ignore", "Ignore"),
("e", "edit", "Edit"),
("u", "unignore", "Unignore"),
("delete", "remove", "Remove"),
Binding("q", "quit", "Quit", priority=True),
Binding("d", "toggle_dark", "Toggle dark mode", priority=True),
Binding("r", "refresh", "Refresh data", priority=True),
Binding("a", "approve", "Approve", priority=True),
Binding("i", "ignore", "Ignore", priority=True),
Binding("e", "edit", "Edit", priority=True),
Binding("v", "view_details", "Details", priority=True),
Binding("u", "unignore", "Unignore", priority=True),
Binding("delete", "remove", "Remove", priority=True),
]
def __init__(self, **kwargs):
@@ -81,6 +134,14 @@ class SensorpajenApp(App):
self.sensor_config = SensorConfig()
# Pass sensor_config to discovery manager for filtering
self.discovery_manager = DiscoveryManager(sensor_config=self.sensor_config)
try:
tui_logger.info(
"TUI init: sensors_file=%s, configured=%d",
getattr(self.sensor_config, "config_file", None),
len(getattr(self.sensor_config, "sensors", {})),
)
except Exception:
pass
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
@@ -136,6 +197,50 @@ class SensorpajenApp(App):
"""Handle app mount event."""
self.refresh_data()
def _open_input_modal(
self,
title: str,
*,
initial_value: str = "",
placeholder: str = "",
on_result: Callable[[Optional[str]], None],
) -> None:
"""Open an input modal and handle the result via callback.
This avoids awaiting modal results inside action handlers, which can
freeze / deadlock depending on Textual version and context.
"""
modal = InputModal(title, placeholder=placeholder, initial_value=initial_value)
self.push_screen(modal, on_result)
def _open_confirm_modal(
self,
title: str,
message: str,
*,
on_result: Callable[[bool], None],
confirm_label: str = "Yes",
cancel_label: str = "No",
) -> None:
modal = ConfirmModal(
title,
message,
confirm_label=confirm_label,
cancel_label=cancel_label,
)
self.push_screen(modal, on_result)
def _open_details_modal(self, title: str, details_text: str) -> None:
modal = DetailsModal(title, details_text)
# No callback needed
self.push_screen(modal)
async def _save_sensor(self, mac: str, name: str, comment: Optional[str] = None) -> None:
await asyncio.to_thread(self.sensor_config.add_sensor, mac, name, comment)
async def _remove_sensor(self, mac: str) -> None:
await asyncio.to_thread(self.sensor_config.remove_sensor, mac)
def action_refresh(self) -> None:
"""Refresh all tables."""
self.refresh_data()
@@ -153,16 +258,58 @@ class SensorpajenApp(App):
mac = row[0]
default_name = row[1]
name = await self.push_screen(InputModal("Enter sensor name", initial_value=default_name))
if name:
# Get a richer sensor object for metadata (best-effort)
sensor_obj = None
try:
self.sensor_config.add_sensor(mac, name)
self.discovery_manager.approve(mac)
self.notify(f"Approved {mac} as {name}")
for s in self.discovery_manager.get_pending():
if getattr(s, "mac", "").upper() == str(mac).upper():
sensor_obj = s
break
except Exception:
sensor_obj = None
default_comment = _format_metadata_comment(
str(mac),
getattr(sensor_obj, "name", default_name),
getattr(sensor_obj, "last_seen", "N/A"),
getattr(sensor_obj, "sample_reading", {}) or {},
)
def _on_result(result: object) -> None:
if result is None:
return
try:
name, comment = result # type: ignore[misc]
except Exception:
self.notify("Invalid approve result", severity="error")
return
name_stripped = str(name).strip()
if not name_stripped:
self.notify("Sensor name cannot be empty", severity="error")
return
# Match legacy behavior: empty comment falls back to default metadata.
comment_stripped = str(comment).strip()
comment_to_use = comment_stripped if comment_stripped else default_comment
async def _do() -> None:
try:
await self._save_sensor(str(mac), name_stripped, comment_to_use)
self.discovery_manager.approve(str(mac))
self.notify(f"Approved {mac} as {name_stripped}")
self.refresh_data()
except Exception as e:
self.notify(f"Error approving sensor: {e}", severity="error")
asyncio.create_task(_do())
self.push_screen(
EditSensorModal(title="Approve sensor", name=str(default_name), comment=default_comment),
_on_result,
)
async def action_ignore(self) -> None:
"""Ignore the selected discovered sensor."""
if self.query_one(TabbedContent).active != "discovery":
@@ -175,8 +322,10 @@ class SensorpajenApp(App):
row = table.get_row_at(table.cursor_row)
mac = row[0]
reason = await self.push_screen(InputModal("Enter ignore reason (optional)"))
if reason is not None: # Allow empty string but not None (Cancel)
def _on_reason(reason: Optional[str]) -> None:
# Allow empty string but not None (Cancel)
if reason is None:
return
try:
self.discovery_manager.ignore(mac, reason if reason else None)
self.notify(f"Ignored {mac}")
@@ -184,6 +333,8 @@ class SensorpajenApp(App):
except Exception as e:
self.notify(f"Error ignoring sensor: {e}", severity="error")
self._open_input_modal("Enter ignore reason (optional)", on_result=_on_reason)
async def action_edit(self) -> None:
"""Edit the selected item (sensor or setting)."""
active_tab = self.query_one(TabbedContent).active
@@ -194,18 +345,91 @@ class SensorpajenApp(App):
return
row = table.get_row_at(table.cursor_row)
mac = row[0]
current_name = row[1]
mac = str(row[0]).upper() # Ensure MAC is uppercase
current_name = str(row[1])
current_comment = self.sensor_config.get_comment(mac) or ""
name = await self.push_screen(InputModal("Edit sensor name", initial_value=current_name))
if name:
try:
self.sensor_config.add_sensor(mac, name)
self.notify(f"Updated {mac} to {name}")
tui_logger.info(
"Edit configured: mac=%s current_name=%r file=%s",
mac,
current_name,
getattr(self.sensor_config, "config_file", None),
)
except Exception:
pass
current_name_stripped = current_name.strip()
current_comment_stripped = current_comment.strip()
def _on_result(result: object) -> None:
if result is None:
try:
tui_logger.info("Edit cancelled: mac=%s", mac)
except Exception:
pass
return
try:
new_name, new_comment = result # type: ignore[misc]
except Exception:
self.notify("Invalid edit result", severity="error")
return
new_name = str(new_name).strip()
new_comment = str(new_comment).strip()
if not new_name:
self.notify("Sensor name cannot be empty", severity="error")
return
name_changed = new_name != current_name_stripped
comment_changed = new_comment != current_comment_stripped
if not name_changed and not comment_changed:
try:
tui_logger.info("Edit no-op: mac=%s name/comment unchanged", mac)
except Exception:
pass
return
# Only touch comment if user changed it; empty string means clear.
comment_to_pass: Optional[str]
if comment_changed:
comment_to_pass = new_comment
else:
comment_to_pass = None
async def _do() -> None:
try:
await self._save_sensor(mac, new_name, comment_to_pass)
stored_name = self.sensor_config.sensors.get(mac)
try:
tui_logger.info(
"Edit result: mac=%s new_name=%r stored_name=%r new_comment=%r",
mac,
new_name,
stored_name,
comment_to_pass,
)
except Exception:
pass
self.notify(f"Updated {mac}")
self.refresh_data()
except Exception as e:
try:
tui_logger.exception("Error updating sensor: mac=%s", mac)
except Exception:
pass
self.notify(f"Error updating sensor: {e}", severity="error")
asyncio.create_task(_do())
self.push_screen(
EditSensorModal(name=current_name, comment=current_comment),
_on_result,
)
elif active_tab == "settings":
table = self.query_one("#settings-table", DataTable)
if table.cursor_row is None:
@@ -215,18 +439,63 @@ class SensorpajenApp(App):
key = row[0]
current_value = row[1]
new_value = await self.push_screen(InputModal(f"Edit {key}", initial_value=str(current_value)))
if new_value is not None:
def _on_value(new_value: Optional[str]) -> None:
if new_value is None:
return
try:
save_env_var(key, new_value)
self.notify(f"Updated {key}. Restart required!", severity="warning")
# Temporarily update the view although it won't take effect until restart
# Update current runtime env for display (won't take effect in backend until restart)
import os
os.environ[key] = new_value # Update current runtime env for display
os.environ[key] = new_value
self.notify(
f"Updated {key}. Restart service for changes to take effect!",
severity="warning",
)
self.refresh_data()
except Exception as e:
self.notify(f"Error saving setting: {e}", severity="error")
self._open_input_modal(f"Edit {key}", initial_value=str(current_value), on_result=_on_value)
def action_view_details(self) -> None:
"""View details for the selected sensor (shows long comment in popup)."""
active_tab = self.query_one(TabbedContent).active
table_id = None
if active_tab == "configured":
table_id = "#configured-table"
elif active_tab == "discovery":
table_id = "#discovery-table"
elif active_tab == "ignored":
table_id = "#ignored-table"
else:
return
table = self.query_one(table_id, DataTable)
if table.cursor_row is None:
self.notify("Select a sensor first", severity="warning")
return
row = table.get_row_at(table.cursor_row)
mac = str(row[0]).upper()
name = str(row[1]) if len(row) > 1 else ""
comment = None
if active_tab == "configured":
comment = self.sensor_config.get_comment(mac)
details_lines = [
f"MAC: {mac}",
f"Name: {name}",
]
if comment:
details_lines.extend(["", "Comment:", comment])
else:
details_lines.extend(["", "Comment:", "(none)"])
self._open_details_modal("Sensor details", "\n".join(details_lines))
def action_remove(self) -> None:
"""Remove the selected configured sensor."""
if self.query_one(TabbedContent).active != "configured":
@@ -239,17 +508,32 @@ class SensorpajenApp(App):
row = table.get_row_at(table.cursor_row)
mac = row[0]
def _on_confirm(confirmed: bool) -> None:
if not confirmed:
return
async def _do() -> None:
try:
self.sensor_config.remove_sensor(mac)
await self._remove_sensor(str(mac))
# Also need to reset its status in DiscoveryManager to make it show up in Discovery again
self.discovery_manager.unignore(mac) # unignore sets status to 'pending'
self.discovery_manager.unignore(str(mac)) # unignore sets status to 'pending'
self.notify(f"Removed {mac}")
self.refresh_data()
except Exception as e:
self.notify(f"Error removing sensor: {e}", severity="error")
asyncio.create_task(_do())
self._open_confirm_modal(
"Remove sensor",
f"Remove {mac} from configured sensors?",
confirm_label="Remove",
cancel_label="Cancel",
on_result=_on_confirm,
)
def action_unignore(self) -> None:
"""Unignore the selected sensor."""
if self.query_one(TabbedContent).active != "ignored":
@@ -268,11 +552,16 @@ class SensorpajenApp(App):
def refresh_data(self) -> None:
"""Load data from managers and update tables."""
try:
self._update_discovery_table()
self._update_configured_table()
self._update_ignored_table()
self._update_settings_table()
self._update_dashboard()
except Exception as e:
self.notify(f"Error refreshing data: {e}", severity="error")
import traceback
traceback.print_exc()
def _update_discovery_table(self) -> None:
table = self.query_one("#discovery-table", DataTable)

View File

@@ -1,7 +1,9 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.binding import Binding
from textual.widgets import Input, Label, Button
from textual.containers import Vertical, Horizontal
from textual.containers import Vertical, Horizontal, VerticalScroll
from textual.widgets import Static
class InputModal(ModalScreen[str]):
"""A modal screen for text input."""
@@ -31,3 +33,110 @@ class InputModal(ModalScreen[str]):
def on_input_submitted(self, event: Input.Submitted) -> None:
self.dismiss(event.value)
class ConfirmModal(ModalScreen[bool]):
"""A modal screen for confirming an action."""
def __init__(
self,
title: str,
message: str,
*,
confirm_label: str = "Yes",
cancel_label: str = "No",
):
super().__init__()
self.title_text = title
self.message_text = message
self.confirm_label = confirm_label
self.cancel_label = cancel_label
def compose(self) -> ComposeResult:
with Vertical(id="modal-container"):
yield Label(self.title_text)
yield Label(self.message_text)
with Horizontal(id="modal-buttons"):
yield Button(self.confirm_label, variant="warning", id="confirm-btn")
yield Button(self.cancel_label, variant="primary", id="cancel-btn")
def on_mount(self) -> None:
self.query_one("#cancel-btn", Button).focus()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "confirm-btn":
self.dismiss(True)
else:
self.dismiss(False)
class DetailsModal(ModalScreen[None]):
"""A modal screen for showing potentially long details text."""
BINDINGS = [
Binding("escape", "close", show=False),
]
def __init__(self, title: str, details_text: str):
super().__init__()
self.title_text = title
self.details_text = details_text
def compose(self) -> ComposeResult:
with Vertical(id="modal-container"):
yield Label(self.title_text)
with VerticalScroll():
yield Static(self.details_text)
with Horizontal(id="modal-buttons"):
yield Button("Close", variant="primary", id="close-btn")
def action_close(self) -> None:
self.dismiss(None)
def on_mount(self) -> None:
self.query_one("#close-btn", Button).focus()
def on_button_pressed(self, event: Button.Pressed) -> None:
self.dismiss(None)
class EditSensorModal(ModalScreen):
"""A modal screen for editing a sensor's name and comment."""
def __init__(self, *, title: str = "Edit sensor", name: str, comment: str):
super().__init__()
self.title_text = title
self.initial_name = name
self.initial_comment = comment
def compose(self) -> ComposeResult:
with Vertical(id="modal-container"):
yield Label(self.title_text)
yield Label("Name")
yield Input(value=self.initial_name, id="sensor-name-input")
yield Label("Comment")
yield Input(value=self.initial_comment, placeholder="Optional comment", id="sensor-comment-input")
with Horizontal(id="modal-buttons"):
yield Button("OK", variant="primary", id="ok-btn")
yield Button("Cancel", variant="error", id="cancel-btn")
def on_mount(self) -> None:
self.query_one("#sensor-name-input", Input).focus()
def _dismiss_with_values(self) -> None:
name = self.query_one("#sensor-name-input", Input).value
comment = self.query_one("#sensor-comment-input", Input).value
self.dismiss((name, comment))
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "ok-btn":
self._dismiss_with_values()
else:
self.dismiss(None)
def on_input_submitted(self, event: Input.Submitted) -> None:
# Enter on name moves to comment; Enter on comment submits.
if event.input.id == "sensor-name-input":
self.query_one("#sensor-comment-input", Input).focus()
else:
self._dismiss_with_values()

View File

@@ -34,6 +34,35 @@ def test_sensor_config_load(tmp_path):
assert sensor_cfg.get_name("AA:BB:CC:DD:EE:FF") == "Living Room"
assert sensor_cfg.get_name("UNKNOWN") == "UNKNOWN"
def test_sensor_config_comment_load_and_clear(tmp_path):
import json
import sensorpajen.config as config
config_file = tmp_path / "sensors.json"
config_file.write_text(
json.dumps(
{
"sensors": [
{"mac": "AA:BB:CC:DD:EE:FF", "name": "Living Room", "comment": "hello"},
]
},
indent=2,
)
)
sensor_cfg = config.SensorConfig(config_file=str(config_file))
assert sensor_cfg.get_comment("AA:BB:CC:DD:EE:FF") == "hello"
# Clear comment explicitly (empty string means remove comment key)
sensor_cfg.add_sensor("AA:BB:CC:DD:EE:FF", "Living Room", "")
assert sensor_cfg.get_comment("AA:BB:CC:DD:EE:FF") is None
saved = json.loads(config_file.read_text())
assert saved["sensors"][0]["mac"] == "AA:BB:CC:DD:EE:FF"
assert saved["sensors"][0]["name"] == "Living Room"
assert "comment" not in saved["sensors"][0]
def test_sensor_config_add_remove(tmp_path):
import sensorpajen.config as config
config_file = tmp_path / "sensors.json"
@@ -47,10 +76,12 @@ def test_sensor_config_add_remove(tmp_path):
# Add
sensor_cfg.add_sensor("AA:BB:CC:DD:EE:FF", "Living Room", "Test comment")
assert sensor_cfg.sensors["AA:BB:CC:DD:EE:FF"] == "Living Room"
assert sensor_cfg.get_comment("AA:BB:CC:DD:EE:FF") == "Test comment"
# Verify persistence
sensor_cfg2 = config.SensorConfig(config_file=str(config_file))
assert sensor_cfg2.sensors["AA:BB:CC:DD:EE:FF"] == "Living Room"
assert sensor_cfg2.get_comment("AA:BB:CC:DD:EE:FF") == "Test comment"
# Remove
sensor_cfg.remove_sensor("AA:BB:CC:DD:EE:FF")

View File

@@ -3,6 +3,12 @@ import os
from pathlib import Path
from sensorpajen.discovery_manager import DiscoveryManager, DiscoveredSensor
class _DummyCompletedProcess:
def __init__(self, returncode: int = 0, stderr: bytes = b""):
self.returncode = returncode
self.stderr = stderr
def test_discovery_manager_init(tmp_path):
db_file = tmp_path / "sensors.db"
manager = DiscoveryManager(str(db_file))
@@ -55,3 +61,94 @@ def test_discovery_manager_persistence(tmp_path):
assert len(pending) == 1
assert pending[0].mac == mac
assert pending[0].name == "ATC_123456"
def test_send_ntfy_notification_disabled(monkeypatch, tmp_path):
from sensorpajen import discovery_manager as dm_mod
monkeypatch.setattr(dm_mod.config, "NTFY_ENABLED", False)
monkeypatch.setattr(dm_mod.config, "NTFY_TOKEN", "token")
called = {"run": False}
def _fake_run(*args, **kwargs):
called["run"] = True
return _DummyCompletedProcess(0)
monkeypatch.setattr(dm_mod.subprocess, "run", _fake_run)
manager = dm_mod.DiscoveryManager(str(tmp_path / "dummy.db"))
sensor = dm_mod.DiscoveredSensor(
mac="AA",
name="N",
rssi=-1,
first_seen="now",
last_seen="now",
sample_reading={"temperature": 1, "humidity": 2, "battery_percent": 3},
)
manager.send_ntfy_notification(sensor)
assert called["run"] is False
def test_send_ntfy_notification_missing_token(monkeypatch, tmp_path):
from sensorpajen import discovery_manager as dm_mod
monkeypatch.setattr(dm_mod.config, "NTFY_ENABLED", True)
monkeypatch.setattr(dm_mod.config, "NTFY_TOKEN", "")
called = {"run": False}
def _fake_run(*args, **kwargs):
called["run"] = True
return _DummyCompletedProcess(0)
monkeypatch.setattr(dm_mod.subprocess, "run", _fake_run)
manager = dm_mod.DiscoveryManager(str(tmp_path / "dummy2.db"))
sensor = dm_mod.DiscoveredSensor(
mac="AA",
name="N",
rssi=-1,
first_seen="now",
last_seen="now",
sample_reading={"temperature": 1, "humidity": 2, "battery_percent": 3},
)
manager.send_ntfy_notification(sensor)
assert called["run"] is False
def test_send_ntfy_notification_message_mentions_tui(monkeypatch, tmp_path):
from sensorpajen import discovery_manager as dm_mod
monkeypatch.setattr(dm_mod.config, "NTFY_ENABLED", True)
monkeypatch.setattr(dm_mod.config, "NTFY_TOKEN", "token")
monkeypatch.setattr(dm_mod.config, "NTFY_URL", "https://ntfy.sh")
monkeypatch.setattr(dm_mod.config, "NTFY_TOPIC", "sensorpajen")
captured = {"args": None}
def _fake_run(args, capture_output=True, timeout=10):
captured["args"] = args
return _DummyCompletedProcess(0)
monkeypatch.setattr(dm_mod.subprocess, "run", _fake_run)
manager = dm_mod.DiscoveryManager(str(tmp_path / "sensors.db"))
sensor = dm_mod.DiscoveredSensor(
mac="AA:BB",
name="ATC_123",
rssi=-1,
first_seen="2025-01-01T00:00:00",
last_seen="2025-01-01T00:00:00",
sample_reading={"temperature": 10, "humidity": 20, "battery_percent": 30},
)
manager.send_ntfy_notification(sensor)
assert captured["args"] is not None
# curl args: [..., "-d", message, url]
assert "-d" in captured["args"]
message = captured["args"][captured["args"].index("-d") + 1]
assert "sensorpajen-tui" in message

View File

@@ -1,8 +1,225 @@
import pytest
from sensorpajen.tui.app import SensorpajenApp
import tempfile
import json
import sqlite3
from pathlib import Path
from datetime import datetime
from sensorpajen.config import SensorConfig
from sensorpajen.discovery_manager import DiscoveryManager
def test_tui_app_init():
# Just test that we can instantiate it
app = SensorpajenApp()
assert app.discovery_manager is not None
assert app.sensor_config is not None
def test_tui_sensor_config_edit():
"""Integration test: Test that editing a sensor works end-to-end"""
with tempfile.TemporaryDirectory() as tmpdir:
config_file = Path(tmpdir) / "sensors.json"
db_file = Path(tmpdir) / "test.db"
# Create initial config
initial_data = {
"sensors": [
{"mac": "AA:BB:CC:DD:EE:FF", "name": "Living Room Sensor"}
]
}
config_file.write_text(json.dumps(initial_data, indent=2))
# Initialize database
conn = sqlite3.connect(str(db_file))
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS discovered_sensors (
mac TEXT PRIMARY KEY,
name TEXT,
rssi INTEGER,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
count INTEGER DEFAULT 0,
last_temp REAL,
last_humidity REAL,
last_battery_percent INTEGER,
last_battery_voltage INTEGER,
status TEXT DEFAULT 'pending',
reviewed BOOLEAN DEFAULT 0,
ignored_at TIMESTAMP,
ignore_reason TEXT
)
""")
now = datetime.now().isoformat()
cursor.execute("""
INSERT INTO discovered_sensors
(mac, name, rssi, first_seen, last_seen, count, last_temp, last_humidity,
last_battery_percent, last_battery_voltage, status, reviewed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'approved', 1)
""", ("AA:BB:CC:DD:EE:FF", "Living Room Sensor", -65, now, now, 50, 23.5, 55, 85, 2950))
conn.commit()
conn.close()
# Load config and discovery manager (simulating TUI)
config = SensorConfig(str(config_file))
dm = DiscoveryManager(str(db_file), config)
# Verify initial state
assert config.sensors["AA:BB:CC:DD:EE:FF"] == "Living Room Sensor"
# Edit sensor (simulate user action in TUI)
config.add_sensor("AA:BB:CC:DD:EE:FF", "Bedroom Sensor")
# Verify in-memory update
assert config.sensors["AA:BB:CC:DD:EE:FF"] == "Bedroom Sensor"
# Verify disk update
saved_data = json.loads(config_file.read_text())
assert saved_data["sensors"][0]["name"] == "Bedroom Sensor"
# Simulate refresh_data() - create new config instance and verify
config2 = SensorConfig(str(config_file))
assert config2.sensors["AA:BB:CC:DD:EE:FF"] == "Bedroom Sensor"
def test_sensor_config_edit_updates_memory():
"""Test that editing a sensor updates both disk and memory"""
with tempfile.TemporaryDirectory() as tmpdir:
config_file = Path(tmpdir) / "sensors.json"
# Create initial config
initial_data = {
"sensors": [
{"mac": "AA:BB:CC:DD:EE:FF", "name": "Original Name"}
]
}
config_file.write_text(json.dumps(initial_data, indent=2))
# Load config
config = SensorConfig(str(config_file))
assert config.sensors["AA:BB:CC:DD:EE:FF"] == "Original Name"
# Edit sensor
config.add_sensor("AA:BB:CC:DD:EE:FF", "Updated Name")
# Check in-memory is updated
assert config.sensors["AA:BB:CC:DD:EE:FF"] == "Updated Name"
# Check disk is updated
saved_data = json.loads(config_file.read_text())
assert saved_data["sensors"][0]["name"] == "Updated Name"
# Reload from disk and verify
config2 = SensorConfig(str(config_file))
assert config2.sensors["AA:BB:CC:DD:EE:FF"] == "Updated Name"
def test_sensor_config_remove_sensor():
"""Test that removing a sensor works correctly"""
with tempfile.TemporaryDirectory() as tmpdir:
config_file = Path(tmpdir) / "sensors.json"
# Create config with multiple sensors
initial_data = {
"sensors": [
{"mac": "AA:BB:CC:DD:EE:FF", "name": "Sensor 1"},
{"mac": "AA:BB:CC:DD:EE:11", "name": "Sensor 2"}
]
}
config_file.write_text(json.dumps(initial_data, indent=2))
# Load and verify
config = SensorConfig(str(config_file))
assert len(config.sensors) == 2
# Remove one sensor
config.remove_sensor("AA:BB:CC:DD:EE:FF")
# Check in-memory removal
assert "AA:BB:CC:DD:EE:FF" not in config.sensors
assert "AA:BB:CC:DD:EE:11" in config.sensors
# Check disk update
saved_data = json.loads(config_file.read_text())
assert len(saved_data["sensors"]) == 1
assert saved_data["sensors"][0]["mac"] == "AA:BB:CC:DD:EE:11"
def test_sensor_config_reload():
"""Test that reload() re-reads from disk"""
with tempfile.TemporaryDirectory() as tmpdir:
config_file = Path(tmpdir) / "sensors.json"
# Create initial config
initial_data = {
"sensors": [
{"mac": "AA:BB:CC:DD:EE:FF", "name": "Original Name"}
]
}
config_file.write_text(json.dumps(initial_data, indent=2))
# Load config
config = SensorConfig(str(config_file))
assert config.sensors["AA:BB:CC:DD:EE:FF"] == "Original Name"
# Manually modify file on disk
new_data = {
"sensors": [
{"mac": "AA:BB:CC:DD:EE:FF", "name": "Externally Modified"}
]
}
config_file.write_text(json.dumps(new_data, indent=2))
# Reload should pick up the changes
config.load()
assert config.sensors["AA:BB:CC:DD:EE:FF"] == "Externally Modified"
def test_discovery_manager_approve_sensor():
"""Test that approving a sensor works correctly"""
with tempfile.TemporaryDirectory() as tmpdir:
config_file = Path(tmpdir) / "sensors.json"
db_file = Path(tmpdir) / "test.db"
# Create empty config
config_file.write_text(json.dumps({"sensors": []}, indent=2))
# Initialize database with pending sensor
conn = sqlite3.connect(str(db_file))
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS discovered_sensors (
mac TEXT PRIMARY KEY,
name TEXT,
rssi INTEGER,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
count INTEGER DEFAULT 0,
last_temp REAL,
last_humidity REAL,
last_battery_percent INTEGER,
last_battery_voltage INTEGER,
status TEXT DEFAULT 'pending',
reviewed BOOLEAN DEFAULT 0,
ignored_at TIMESTAMP,
ignore_reason TEXT
)
""")
now = datetime.now().isoformat()
cursor.execute("""
INSERT INTO discovered_sensors
(mac, name, rssi, first_seen, last_seen, count, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", ("AA:BB:CC:DD:EE:33", "Unknown Sensor", -80, now, now, 1, "pending"))
conn.commit()
conn.close()
# Load config and DM
config = SensorConfig(str(config_file))
dm = DiscoveryManager(str(db_file), config)
# Verify sensor is pending
pending = dm.get_pending()
assert len(pending) == 1
assert pending[0].mac == "AA:BB:CC:DD:EE:33"
# Approve and add to config (simulate TUI action)
config.add_sensor("AA:BB:CC:DD:EE:33", "Kitchen Sensor")
dm.approve("AA:BB:CC:DD:EE:33")
# Verify sensor is no longer pending (filtered by config)
pending = dm.get_pending()
assert len(pending) == 0
# Verify it's in config
assert config.sensors["AA:BB:CC:DD:EE:33"] == "Kitchen Sensor"