Implement sensor auto-discovery feature

New Features:
- Automatic discovery of unknown Bluetooth sensors
- Discovery manager tracks pending/approved/ignored sensors
- ntfy notifications when new sensors found (optional)
- Interactive CLI tool: sensorpajen-approve-sensors
- Automatic config reload every 15 minutes (no restart needed)

Files Added:
- src/sensorpajen/discovery_manager.py: Sensor discovery management
- src/sensorpajen/approve_sensors.py: Interactive approval CLI
- config/discovered_sensors.json.example: Example discovery file

Files Modified:
- src/sensorpajen/config.py: Added ntfy and discovery config
- src/sensorpajen/main.py: Added discovery manager and config reload
- src/sensorpajen/sensor_reader.py: Added discovery on unknown sensors
- config/sensorpajen.env.example: Added ntfy and reload settings
- pyproject.toml: Added approve-sensors CLI command

Configuration:
- NTFY_ENABLED, NTFY_URL, NTFY_TOPIC, NTFY_TOKEN
- DISCOVERED_SENSORS_FILE, CONFIG_RELOAD_INTERVAL
- Pre-filled comments with sensor metadata

See TASKS.md for complete feature specification.
This commit is contained in:
2025-12-27 15:03:19 +01:00
parent 9de5f82924
commit 9b1229a2ee
9 changed files with 738 additions and 74 deletions

View File

@@ -86,7 +86,7 @@ Implement **automatic sensor discovery** with a **user approval workflow** that:
* If `NTFY_ENABLED=false`, skip notifications * If `NTFY_ENABLED=false`, skip notifications
* If ntfy is unreachable, log error and continue * If ntfy is unreachable, log error and continue
* Discovery and approval must work even if ntfy fails * Discovery and approval must work even if ntfy fails
* The user must only be notified once per discovered sensor
--- ---
### 4. User Approval Script ### 4. User Approval Script

View File

@@ -0,0 +1,32 @@
[
{
"mac": "A4:C1:38:12:34:56",
"name": "ATC_123456",
"rssi": -65,
"first_seen": "2025-12-27T10:30:15",
"last_seen": "2025-12-27T10:35:42",
"sample_reading": {
"temperature": 21.5,
"humidity": 45,
"battery_percent": 87,
"battery_voltage": 2950
},
"status": "pending"
},
{
"mac": "A4:C1:38:AB:CD:EF",
"name": "ATC_ABCDEF",
"rssi": -72,
"first_seen": "2025-12-27T11:00:00",
"last_seen": "2025-12-27T11:10:00",
"sample_reading": {
"temperature": 19.8,
"humidity": 52,
"battery_percent": 65,
"battery_voltage": 2800
},
"status": "ignored",
"ignored_at": "2025-12-27T11:15:00",
"ignore_reason": "Test sensor, not needed"
}
]

View File

@@ -7,8 +7,16 @@ MQTT_CLIENT_ID=mibridge
# Sensor Configuration (relative to project root) # Sensor Configuration (relative to project root)
SENSOR_CONFIG_FILE=config/sensors.json SENSOR_CONFIG_FILE=config/sensors.json
DISCOVERED_SENSORS_FILE=config/discovered_sensors.json
# Application Settings # Application Settings
WATCHDOG_TIMEOUT=5 WATCHDOG_TIMEOUT=5
ENABLE_BATTERY=true ENABLE_BATTERY=true
LOG_LEVEL=INFO LOG_LEVEL=INFO
CONFIG_RELOAD_INTERVAL=900 # 15 minutes in seconds
# ntfy Notifications (optional)
NTFY_ENABLED=false
NTFY_URL=https://ntfy.sh
NTFY_TOPIC=sensorpajen
NTFY_TOKEN=

View File

@@ -44,6 +44,7 @@ Repository = "https://github.com/yourusername/sensorpajen"
[project.scripts] [project.scripts]
sensorpajen = "sensorpajen.main:main" sensorpajen = "sensorpajen.main:main"
sensorpajen-approve-sensors = "sensorpajen.approve_sensors:main"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]

View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
CLI tool for approving or ignoring discovered sensors.
Interactive tool to manage pending and ignored sensors.
"""
import sys
import json
import logging
from pathlib import Path
from typing import List
from . import config
from .discovery_manager import DiscoveryManager, DiscoveredSensor
logger = logging.getLogger(__name__)
def format_metadata_comment(sensor: DiscoveredSensor) -> str:
"""
Format sensor metadata as a comment string.
Args:
sensor: Discovered sensor
Returns:
Formatted comment string
"""
return (
f"MAC: {sensor.mac}, "
f"Name: {sensor.name}, "
f"Last seen: {sensor.last_seen}, "
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C, "
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%, "
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%"
)
def display_sensor(sensor: DiscoveredSensor, index: int, total: int):
"""
Display sensor information to the user.
Args:
sensor: Discovered sensor to display
index: Current sensor number (1-based)
total: Total number of sensors
"""
print(f"\n{'='*70}")
print(f"Sensor {index}/{total}")
print(f"{'='*70}")
print(f"MAC Address: {sensor.mac}")
print(f"Device Name: {sensor.name}")
print(f"Last Seen: {sensor.last_seen}")
print(f"Status: {sensor.status}")
if sensor.status == "ignored" and sensor.ignored_at:
print(f"Ignored At: {sensor.ignored_at}")
if sensor.ignore_reason:
print(f"Reason: {sensor.ignore_reason}")
# Display sample reading
reading = sensor.sample_reading
print(f"\nSample Reading:")
print(f" Temperature: {reading.get('temperature', 'N/A')}°C")
print(f" Humidity: {reading.get('humidity', 'N/A')}%")
print(f" Battery: {reading.get('battery_percent', 'N/A')}%")
print(f" Voltage: {reading.get('battery_voltage', 'N/A')}mV")
print(f"{'='*70}")
def get_user_choice() -> str:
"""
Get user's choice for what to do with the sensor.
Returns:
User choice: 'a' (approve), 'i' (ignore), 's' (skip)
"""
while True:
choice = input("\n[A]pprove, [I]gnore, [S]kip, [Q]uit? ").strip().lower()
if choice in ['a', 'i', 's', 'q']:
return choice
print("Invalid choice. Please enter A, I, S, or Q.")
def approve_sensor(sensor: DiscoveredSensor, manager: DiscoveryManager):
"""
Approve a sensor and add it to sensors.json.
Args:
sensor: Sensor to approve
manager: Discovery manager
"""
# Check if sensor already exists in sensors.json
sensor_config_path = Path(config.SENSOR_CONFIG_FILE)
try:
with open(sensor_config_path, 'r') as f:
data = json.load(f)
# Check for duplicates
for existing_sensor in data.get('sensors', []):
if existing_sensor.get('mac', '').upper() == sensor.mac:
print(f"\n⚠️ Sensor {sensor.mac} already exists in sensors.json")
print(" Renaming must be done manually in the file.")
return
except FileNotFoundError:
# File doesn't exist yet, create with empty sensors list
data = {'sensors': []}
except json.JSONDecodeError as e:
print(f"\n❌ Error: Invalid JSON in {sensor_config_path}: {e}")
return
# Get sensor name from user
while True:
name = input("\nEnter sensor name (required): ").strip()
if name:
break
print("Sensor name cannot be empty.")
# Pre-fill comment with metadata
default_comment = format_metadata_comment(sensor)
print(f"\nDefault comment:")
print(f" {default_comment}")
edit = input("\nEdit comment? [y/N]: ").strip().lower()
if edit == 'y':
print("\nEnter comment (or press Enter to keep default):")
comment = input("> ").strip()
if not comment:
comment = default_comment
else:
comment = default_comment
# Add to sensors.json
new_sensor = {
"mac": sensor.mac,
"name": name
}
if comment:
new_sensor["comment"] = comment
data.setdefault('sensors', []).append(new_sensor)
try:
with open(sensor_config_path, 'w') as f:
json.dump(data, f, indent=2)
print(f"\n✅ Sensor approved and added to sensors.json")
print(f" Name: {name}")
print(f" Configuration will be reloaded automatically within 15 minutes")
# Mark as approved in discovery manager
manager.approve(sensor.mac)
except Exception as e:
print(f"\n❌ Error saving to sensors.json: {e}")
def ignore_sensor(sensor: DiscoveredSensor, manager: DiscoveryManager):
"""
Ignore a sensor.
Args:
sensor: Sensor to ignore
manager: Discovery manager
"""
reason = input("\nReason for ignoring (optional): ").strip()
manager.ignore(sensor.mac, reason if reason else None)
print(f"\n✅ Sensor ignored")
if reason:
print(f" Reason: {reason}")
def process_sensors(sensors: List[DiscoveredSensor], manager: DiscoveryManager):
"""
Process list of sensors interactively.
Args:
sensors: List of sensors to process
manager: Discovery manager
"""
if not sensors:
print("\n✅ No sensors to process")
return
print(f"\nFound {len(sensors)} sensor(s) to review")
for i, sensor in enumerate(sensors, 1):
display_sensor(sensor, i, len(sensors))
choice = get_user_choice()
if choice == 'q':
print("\n👋 Exiting...")
break
elif choice == 'a':
approve_sensor(sensor, manager)
elif choice == 'i':
ignore_sensor(sensor, manager)
elif choice == 's':
print("\n⏭️ Skipped")
continue
def main():
"""Main entry point for approve-sensors CLI."""
# Setup logging
logging.basicConfig(
level=logging.WARNING,
format='%(levelname)s: %(message)s'
)
print("=" * 70)
print("Sensorpajen - Approve Sensors")
print("=" * 70)
try:
# Load discovery manager
manager = DiscoveryManager()
# Get pending and ignored sensors
pending = manager.get_pending()
ignored = manager.get_ignored()
if not pending and not ignored:
print("\n✅ No pending or ignored sensors found")
print("\nDiscovered sensors will appear here when detected.")
return 0
# Process pending sensors first
if pending:
print(f"\n📋 Processing {len(pending)} pending sensor(s)...")
process_sensors(pending, manager)
# Ask about ignored sensors
if ignored:
print(f"\n\nThere are {len(ignored)} ignored sensor(s).")
review = input("Review ignored sensors? [y/N]: ").strip().lower()
if review == 'y':
process_sensors(ignored, manager)
print("\n" + "=" * 70)
print("Done!")
print("=" * 70)
return 0
except KeyboardInterrupt:
print("\n\n👋 Interrupted by user")
return 1
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -46,6 +46,19 @@ LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
SKIP_IDENTICAL = int(os.environ.get("SKIP_IDENTICAL", "50")) SKIP_IDENTICAL = int(os.environ.get("SKIP_IDENTICAL", "50"))
DEBOUNCE = os.environ.get("DEBOUNCE", "true").lower() == "true" DEBOUNCE = os.environ.get("DEBOUNCE", "true").lower() == "true"
# ntfy notification settings (optional)
NTFY_ENABLED = os.environ.get("NTFY_ENABLED", "false").lower() == "true"
NTFY_URL = os.environ.get("NTFY_URL", "https://ntfy.sh")
NTFY_TOPIC = os.environ.get("NTFY_TOPIC", "sensorpajen")
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "")
# Discovery settings
DISCOVERED_SENSORS_FILE = os.environ.get(
"DISCOVERED_SENSORS_FILE",
str(PROJECT_ROOT / "config/discovered_sensors.json")
)
CONFIG_RELOAD_INTERVAL = int(os.environ.get("CONFIG_RELOAD_INTERVAL", "900")) # 15 minutes
class SensorConfig: class SensorConfig:
"""Manages sensor configuration from JSON file.""" """Manages sensor configuration from JSON file."""
@@ -118,7 +131,12 @@ def validate_config():
logger.info(f"MQTT Client ID: {MQTT_CLIENT_ID}") logger.info(f"MQTT Client ID: {MQTT_CLIENT_ID}")
logger.info(f"MQTT Topic Prefix: {MQTT_TOPIC_PREFIX}") logger.info(f"MQTT Topic Prefix: {MQTT_TOPIC_PREFIX}")
logger.info(f"Sensor Config: {SENSOR_CONFIG_FILE}") logger.info(f"Sensor Config: {SENSOR_CONFIG_FILE}")
logger.info(f"Discovered Sensors: {DISCOVERED_SENSORS_FILE}")
logger.info(f"Watchdog Timeout: {WATCHDOG_TIMEOUT}s") logger.info(f"Watchdog Timeout: {WATCHDOG_TIMEOUT}s")
logger.info(f"Battery Monitoring: {ENABLE_BATTERY}") logger.info(f"Battery Monitoring: {ENABLE_BATTERY}")
logger.info(f"Config Reload Interval: {CONFIG_RELOAD_INTERVAL}s")
logger.info(f"ntfy Enabled: {NTFY_ENABLED}")
if NTFY_ENABLED:
logger.info(f"ntfy URL: {NTFY_URL}/{NTFY_TOPIC}")
logger.info(f"Log Level: {LOG_LEVEL}") logger.info(f"Log Level: {LOG_LEVEL}")
logger.info("================================") logger.info("================================")

View File

@@ -0,0 +1,246 @@
"""
Discovery manager for tracking and managing discovered sensors.
Maintains a database of discovered sensors with their metadata and status.
"""
import json
import logging
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from . import config
logger = logging.getLogger(__name__)
@dataclass
class DiscoveredSensor:
"""Represents a discovered sensor with metadata."""
mac: str
name: str
rssi: int
first_seen: str
last_seen: str
sample_reading: Dict[str, float]
status: str = "pending" # pending, approved, ignored
ignored_at: Optional[str] = None
ignore_reason: Optional[str] = None
class DiscoveryManager:
"""Manages discovered sensors and their approval status."""
def __init__(self, discovery_file: str = config.DISCOVERED_SENSORS_FILE):
"""
Initialize discovery manager.
Args:
discovery_file: Path to discovered sensors JSON file
"""
self.discovery_file = Path(discovery_file)
self.sensors: Dict[str, DiscoveredSensor] = {}
self.load()
def load(self):
"""Load discovered sensors from JSON file."""
if not self.discovery_file.exists():
logger.info(f"Creating new discovered sensors file: {self.discovery_file}")
self.discovery_file.parent.mkdir(parents=True, exist_ok=True)
self.save()
return
try:
with open(self.discovery_file, 'r') as f:
data = json.load(f)
for sensor_data in data:
sensor = DiscoveredSensor(**sensor_data)
self.sensors[sensor.mac.upper()] = sensor
logger.info(f"Loaded {len(self.sensors)} discovered sensors")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in {self.discovery_file}: {e}")
except Exception as e:
logger.error(f"Error loading discovered sensors: {e}")
def save(self):
"""Save discovered sensors to JSON file."""
try:
# Ensure directory exists
self.discovery_file.parent.mkdir(parents=True, exist_ok=True)
# Convert sensors to list of dicts
data = [asdict(sensor) for sensor in self.sensors.values()]
with open(self.discovery_file, 'w') as f:
json.dump(data, f, indent=2)
logger.debug(f"Saved {len(self.sensors)} discovered sensors")
except Exception as e:
logger.error(f"Error saving discovered sensors: {e}")
def add_or_update(self, mac: str, name: str, rssi: int,
temperature: float, humidity: float,
battery_percent: int, battery_voltage: int) -> bool:
"""
Add or update a discovered sensor.
Args:
mac: MAC address
name: Advertised device name
rssi: Signal strength
temperature: Temperature reading
humidity: Humidity reading
battery_percent: Battery percentage
battery_voltage: Battery voltage in mV
Returns:
True if this is a newly discovered sensor, False if updated existing
"""
mac = mac.upper()
now = datetime.now().isoformat()
sample_reading = {
"temperature": temperature,
"humidity": humidity,
"battery_percent": battery_percent,
"battery_voltage": battery_voltage
}
if mac in self.sensors:
# Update existing sensor
sensor = self.sensors[mac]
sensor.last_seen = now
sensor.rssi = rssi
sensor.sample_reading = sample_reading
self.save()
return False
else:
# New sensor discovered
sensor = DiscoveredSensor(
mac=mac,
name=name,
rssi=rssi,
first_seen=now,
last_seen=now,
sample_reading=sample_reading,
status="pending"
)
self.sensors[mac] = sensor
self.save()
logger.info(f"New sensor discovered: {mac} ({name})")
return True
def is_known(self, mac: str) -> bool:
"""
Check if a sensor has been discovered before.
Args:
mac: MAC address
Returns:
True if sensor is in discovered list
"""
return mac.upper() in self.sensors
def get_status(self, mac: str) -> Optional[str]:
"""
Get status of a discovered sensor.
Args:
mac: MAC address
Returns:
Status string or None if not found
"""
sensor = self.sensors.get(mac.upper())
return sensor.status if sensor else None
def approve(self, mac: str):
"""
Mark a sensor as approved.
Args:
mac: MAC address
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].status = "approved"
self.save()
logger.info(f"Sensor approved: {mac}")
def ignore(self, mac: str, reason: Optional[str] = None):
"""
Mark a sensor as ignored.
Args:
mac: MAC address
reason: Optional reason for ignoring
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].status = "ignored"
self.sensors[mac].ignored_at = datetime.now().isoformat()
self.sensors[mac].ignore_reason = reason
self.save()
logger.info(f"Sensor ignored: {mac}")
def get_pending(self) -> List[DiscoveredSensor]:
"""Get list of sensors with status 'pending'."""
return [s for s in self.sensors.values() if s.status == "pending"]
def get_ignored(self) -> List[DiscoveredSensor]:
"""Get list of sensors with status 'ignored'."""
return [s for s in self.sensors.values() if s.status == "ignored"]
def send_ntfy_notification(self, sensor: DiscoveredSensor):
"""
Send ntfy notification for a newly discovered sensor.
Args:
sensor: Discovered sensor to notify about
"""
if not config.NTFY_ENABLED:
logger.debug("ntfy notifications disabled")
return
if not config.NTFY_TOKEN:
logger.warning("ntfy enabled but NTFY_TOKEN not set")
return
try:
message = (
f"🆕 New sensor discovered!\n\n"
f"MAC: {sensor.mac}\n"
f"Name: {sensor.name}\n"
f"Last seen: {sensor.last_seen}\n"
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C\n"
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%\n"
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%\n\n"
f"Run 'sensorpajen approve-sensors' to approve or ignore."
)
url = f"{config.NTFY_URL}/{config.NTFY_TOPIC}"
result = subprocess.run(
["curl", "-H", f"Authorization: Bearer {config.NTFY_TOKEN}",
"-d", message, url],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Sent ntfy notification for {sensor.mac}")
else:
logger.warning(f"ntfy notification failed: {result.stderr.decode()}")
except subprocess.TimeoutExpired:
logger.warning("ntfy notification timed out")
except Exception as e:
logger.error(f"Error sending ntfy notification: {e}")

View File

@@ -10,12 +10,14 @@ import sys
import signal import signal
import logging import logging
import time import time
import threading
from pathlib import Path from pathlib import Path
from . import __version__ from . import __version__
from . import config from . import config
from .mqtt_publisher import MQTTPublisher from .mqtt_publisher import MQTTPublisher
from .sensor_reader import SensorReader, Measurement from .sensor_reader import SensorReader, Measurement
from .discovery_manager import DiscoveryManager
class Sensorpajen: class Sensorpajen:
@@ -26,7 +28,9 @@ class Sensorpajen:
self.mqtt_publisher: MQTTPublisher = None self.mqtt_publisher: MQTTPublisher = None
self.sensor_reader: SensorReader = None self.sensor_reader: SensorReader = None
self.sensor_config: config.SensorConfig = None self.sensor_config: config.SensorConfig = None
self.discovery_manager: DiscoveryManager = None
self.running = False self.running = False
self.config_reload_timer: threading.Timer = None
# Setup logging # Setup logging
self._setup_logging() self._setup_logging()
@@ -74,6 +78,39 @@ class Sensorpajen:
except Exception as e: except Exception as e:
self.logger.error(f"Error handling measurement: {e}") self.logger.error(f"Error handling measurement: {e}")
def _reload_config(self):
"""Reload sensor configuration periodically."""
if not self.running:
return
try:
self.logger.info("Reloading sensor configuration...")
old_sensors = set(self.sensor_config.sensors.keys())
self.sensor_config.load()
new_sensors = set(self.sensor_config.sensors.keys())
added = new_sensors - old_sensors
removed = old_sensors - new_sensors
if added:
self.logger.info(f"Added sensors: {', '.join(added)}")
if removed:
self.logger.info(f"Removed sensors: {', '.join(removed)}")
if not added and not removed:
self.logger.debug("No sensor configuration changes")
except Exception as e:
self.logger.error(f"Error reloading configuration: {e}")
finally:
# Schedule next reload
if self.running:
self.config_reload_timer = threading.Timer(
config.CONFIG_RELOAD_INTERVAL,
self._reload_config
)
self.config_reload_timer.daemon = True
self.config_reload_timer.start()
def start(self): def start(self):
"""Start the application.""" """Start the application."""
try: try:
@@ -92,6 +129,10 @@ class Sensorpajen:
self.logger.error("Please configure sensors in config/sensors.json") self.logger.error("Please configure sensors in config/sensors.json")
sys.exit(1) sys.exit(1)
# Initialize discovery manager
self.logger.info("Initializing discovery manager...")
self.discovery_manager = DiscoveryManager()
# Initialize MQTT publisher # Initialize MQTT publisher
self.logger.info("Initializing MQTT publisher...") self.logger.info("Initializing MQTT publisher...")
self.mqtt_publisher = MQTTPublisher() self.mqtt_publisher = MQTTPublisher()
@@ -107,10 +148,20 @@ class Sensorpajen:
self.logger.info("Initializing Bluetooth sensor reader...") self.logger.info("Initializing Bluetooth sensor reader...")
self.sensor_reader = SensorReader( self.sensor_reader = SensorReader(
sensor_config=self.sensor_config, sensor_config=self.sensor_config,
discovery_manager=self.discovery_manager,
on_measurement=self._on_measurement, on_measurement=self._on_measurement,
interface=0 # hci0 interface=0 # hci0
) )
# Start config reload timer
self.config_reload_timer = threading.Timer(
config.CONFIG_RELOAD_INTERVAL,
self._reload_config
)
self.config_reload_timer.daemon = True
self.config_reload_timer.start()
self.logger.info(f"Config reload scheduled every {config.CONFIG_RELOAD_INTERVAL}s")
# Start reading sensors (blocking call) # Start reading sensors (blocking call)
self.logger.info("=" * 50) self.logger.info("=" * 50)
self.logger.info("Sensorpajen is now running") self.logger.info("Sensorpajen is now running")
@@ -141,6 +192,13 @@ class Sensorpajen:
self.running = False self.running = False
self.logger.info("Shutting down...") self.logger.info("Shutting down...")
# Cancel config reload timer
if self.config_reload_timer:
try:
self.config_reload_timer.cancel()
except Exception as e:
self.logger.error(f"Error canceling reload timer: {e}")
# Stop sensor reader # Stop sensor reader
if self.sensor_reader: if self.sensor_reader:
try: try:

View File

@@ -33,7 +33,8 @@ class Measurement:
class SensorReader: class SensorReader:
"""Reads Xiaomi LYWSD03MMC sensors with ATC firmware via BLE.""" """Reads Xiaomi LYWSD03MMC sensors with ATC firmware via BLE."""
def __init__(self, sensor_config: config.SensorConfig, def __init__(self, sensor_config: config.SensorConfig,
discovery_manager,
on_measurement: Callable[[Measurement], None], on_measurement: Callable[[Measurement], None],
interface: int = 0): interface: int = 0):
""" """
@@ -41,10 +42,12 @@ class SensorReader:
Args: Args:
sensor_config: Sensor configuration mapping sensor_config: Sensor configuration mapping
discovery_manager: Discovery manager for tracking new sensors
on_measurement: Callback function for new measurements on_measurement: Callback function for new measurements
interface: Bluetooth interface number (default 0 for hci0) interface: Bluetooth interface number (default 0 for hci0)
""" """
self.sensor_config = sensor_config self.sensor_config = sensor_config
self.discovery_manager = discovery_manager
self.on_measurement = on_measurement self.on_measurement = on_measurement
self.interface = interface self.interface = interface
self.sock: Optional[int] = None self.sock: Optional[int] = None
@@ -162,93 +165,128 @@ class SensorReader:
if packet_mac != mac_str: if packet_mac != mac_str:
return # MAC mismatch return # MAC mismatch
# Check if this is a known sensor or if we accept all
mac_with_colons = mac.upper() mac_with_colons = mac.upper()
if mac_with_colons not in self.sensor_config.sensors:
logger.debug(f"Ignoring unknown sensor: {mac}")
return
# Check advertisement number to avoid duplicates # Parse ATC data packet first to get sensor data
adv_number = data_str[-2:]
if mac_str in self.adv_counter:
if self.adv_counter[mac_str] == adv_number:
return # Duplicate packet
self.adv_counter[mac_str] = adv_number
# Parse ATC data packet
try: try:
measurement = self._parse_atc_packet(data_str, mac_with_colons, rssi) parsed_data = self._parse_atc_data(data_str)
if not parsed_data:
if measurement: return
# Log the measurement
logger.info(
f"{measurement.sensor_name}: {measurement.temperature}°C, "
f"{measurement.humidity}%, {measurement.voltage}V, "
f"battery {measurement.battery}%, RSSI {rssi}dBm"
)
# Call measurement callback temperature, humidity, battery_percent, battery_voltage, adv_number = parsed_data
if self.on_measurement:
self.on_measurement(measurement)
except Exception as e:
logger.error(f"Error parsing ATC packet from {mac}: {e}")
def _parse_atc_packet(self, data_str: str, mac: str, rssi: int) -> Optional[Measurement]:
"""
Parse ATC advertisement data packet.
ATC packet format (after service UUID):
- Bytes 10-22: MAC address
- Bytes 22-26: Temperature (signed int16, big endian, /10 for °C)
- Bytes 26-28: Humidity (uint8, %)
- Bytes 28-30: Battery (uint8, %)
- Bytes 30-34: Battery voltage (uint16, big endian, /1000 for V)
- Bytes 34-36: Frame counter
Args:
data_str: Hex string of advertisement data
mac: MAC address with colons
rssi: Signal strength
Returns: # Check if this is a known sensor
Measurement object or None if parsing failed if mac_with_colons not in self.sensor_config.sensors:
""" # Unknown sensor - check if we should discover it
try: self._handle_unknown_sensor(
# Extract temperature (signed, big endian) mac_with_colons,
temp_hex = data_str[22:26] rssi,
temperature = int.from_bytes( temperature,
bytearray.fromhex(temp_hex), humidity,
byteorder='big', battery_percent,
signed=True battery_voltage
) / 10.0 )
return
# Extract humidity # Check advertisement number to avoid duplicates
humidity = int(data_str[26:28], 16) if mac_str in self.adv_counter:
if self.adv_counter[mac_str] == adv_number:
return # Duplicate packet
self.adv_counter[mac_str] = adv_number
# Extract battery level # Create measurement for known sensor
battery = int(data_str[28:30], 16) sensor_name = self.sensor_config.get_name(mac_with_colons)
# Extract battery voltage
voltage_hex = data_str[30:34]
voltage = int(voltage_hex, 16) / 1000.0
# Get sensor name from config
sensor_name = self.sensor_config.get_name(mac)
# Create measurement
measurement = Measurement( measurement = Measurement(
temperature=temperature, temperature=temperature,
humidity=humidity, humidity=humidity,
voltage=voltage, voltage=battery_voltage / 1000.0,
battery=battery, battery=battery_percent,
rssi=rssi, rssi=rssi,
sensor_name=sensor_name, sensor_name=sensor_name,
timestamp=int(time.time()) timestamp=int(time.time())
) )
return measurement # Log the measurement
logger.info(
f"{measurement.sensor_name}: {measurement.temperature}°C, "
f"{measurement.humidity}%, {measurement.voltage}V, "
f"battery {measurement.battery}%, RSSI {rssi}dBm"
)
# Call measurement callback
if self.on_measurement:
self.on_measurement(measurement)
except Exception as e: except Exception as e:
logger.error(f"Error parsing ATC data: {e}") logger.error(f"Error parsing ATC packet from {mac}: {e}")
def _handle_unknown_sensor(self, mac: str, rssi: int, temperature: float,
humidity: int, battery_percent: int, battery_voltage: int):
"""
Handle discovery of unknown sensor.
Args:
mac: MAC address with colons
rssi: Signal strength
temperature: Temperature reading
humidity: Humidity reading
battery_percent: Battery percentage
battery_voltage: Battery voltage in mV
"""
# Get or construct device name from MAC
# ATC sensors advertise as ATC_XXXXXX where XXXXXX is last 3 bytes
mac_suffix = mac.replace(":", "")[-6:]
device_name = f"ATC_{mac_suffix}"
# Check if already discovered
if self.discovery_manager.is_known(mac):
# Just update the discovery record
self.discovery_manager.add_or_update(
mac, device_name, rssi, temperature, humidity,
battery_percent, battery_voltage
)
return
# New sensor - discover and notify
is_new = self.discovery_manager.add_or_update(
mac, device_name, rssi, temperature, humidity,
battery_percent, battery_voltage
)
if is_new:
logger.info(f"New sensor discovered: {mac} ({device_name})")
sensor = self.discovery_manager.sensors[mac]
self.discovery_manager.send_ntfy_notification(sensor)
def _parse_atc_data(self, data_str: str) -> Optional[tuple]:
"""
Parse ATC advertisement data.
Returns:
Tuple of (temperature, humidity, battery_percent, battery_voltage, adv_number) or None
"""
try:
# Temperature: bytes 22-26, signed int16, big endian, /10
temp_hex = data_str[22:26]
temp_raw = int(temp_hex, 16)
if temp_raw & 0x8000: # Check sign bit
temp_raw = temp_raw - 0x10000
temperature = temp_raw / 10.0
# Humidity: bytes 26-28, uint8
humidity = int(data_str[26:28], 16)
# Battery: bytes 28-30, uint8
battery_percent = int(data_str[28:30], 16)
# Battery voltage: bytes 30-34, uint16, big endian, mV
battery_voltage = int(data_str[30:34], 16)
# Advertisement number: last 2 bytes
adv_number = data_str[-2:]
return (temperature, humidity, battery_percent, battery_voltage, adv_number)
except (ValueError, IndexError) as e:
logger.debug(f"Error parsing ATC data: {e}")
return None return None