Phase 2 Complete: Python Package Structure

Core modules created:
- config.py: Environment-based configuration management
  - Loads MQTT settings from environment variables
  - SensorConfig class for JSON sensor mapping
  - Relative path resolution (PROJECT_ROOT)
  - Configuration validation with fail-fast

- mqtt_publisher.py: MQTT client wrapper
  - MQTTPublisher class with connection management
  - Replaces sendToMQTT.sh shell script
  - Direct Python MQTT publishing
  - Automatic reconnection support
  - Optional battery data publishing

- sensor_reader.py: Bluetooth BLE sensor reader
  - SensorReader class for passive BLE scanning
  - ATC firmware packet parsing
  - Duplicate packet filtering via advertisement counter
  - Watchdog thread for BLE recovery
  - Measurement dataclass for type safety

- utils.py: Bluetooth utilities
  - Ported from bluetooth_utils.py (MIT, Colin GUYON)
  - BLE scanning and advertisement parsing functions
  - Linux HCI socket operations

- main.py: Application entry point
  - Sensorpajen main application class
  - Signal handling (SIGTERM/SIGINT) for graceful shutdown
  - Logging to stdout for journald integration
  - Coordinates all components

Architecture:
- Direct Python integration (no shell scripts)
- Clean separation of concerns
- Type hints and dataclasses
- Comprehensive logging
- Graceful shutdown handling

Updated ROADMAP.md to mark Phase 2 as complete.

Next: Phase 3 - Configuration Migration (mostly done in Phase 1)
This commit is contained in:
2025-12-27 13:17:26 +01:00
parent 426f1d3813
commit c9b68dd8e2
6 changed files with 1122 additions and 89 deletions

View File

@@ -122,98 +122,42 @@ Using relative paths for portability across systems:
--- ---
### Phase 2: Python Package Structure ✓ TODO ### Phase 2: Python Package Structure ✅ DONE (2025-12-27)
**Goal**: Create modern Python package with proper entry point **Goal**: Create modern Python package with proper entry point
**Notes**:
- Used src/ layout for better packaging practices
- Direct Python MQTT integration (no shell script callbacks)
- ATC firmware BLE advertisement reading (passive scanning)
- Watchdog thread for BLE connection recovery
- Clean separation of concerns (config, MQTT, sensors, main)
#### Tasks: #### Tasks:
1. Create `src/sensorpajen/__init__.py` - ✅ Created src/sensorpajen/__init__.py with version info
- Package initialization - ✅ Created src/sensorpajen/config.py
- Version information - Environment variable loading with validation
- SensorConfig class for JSON sensor mapping
2. Create `src/sensorpajen/config.py` - Relative path resolution (PROJECT_ROOT)
- Environment variable loading - Configuration validation and logging
- Configuration validation - ✅ Created src/sensorpajen/utils.py
- Default values - Ported bluetooth_utils.py (MIT licensed, Colin GUYON)
- Fail-fast on missing required config - BLE scanning and advertisement parsing
```python - ✅ Created src/sensorpajen/mqtt_publisher.py
import os - MQTTPublisher class with connection management
import json - Direct publishing (replaces sendToMQTT.sh)
from pathlib import Path - Automatic reconnection support
- Battery data publishing (optional)
# MQTT Configuration from environment - ✅ Created src/sensorpajen/sensor_reader.py
MQTT_HOST = os.environ.get("MQTT_HOST") - SensorReader class for BLE scanning
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883")) - ATC packet parsing
MQTT_USER = os.environ.get("MQTT_USER") - Duplicate packet filtering
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD") - Watchdog for BLE recovery
MQTT_CLIENT_ID = os.environ.get("MQTT_CLIENT_ID", "sensorpajen") - Measurement dataclass
- ✅ Created src/sensorpajen/main.py
# Validate required config - Application entry point
if not MQTT_HOST:
raise RuntimeError("MQTT_HOST environment variable must be set")
(relative to project root)
PROJECT_ROOT = Path(__file__).parent.parent.parent
SENSOR_CONFIG_FILE = os.environ.get(
"SENSOR_CONFIG_FILE",
str(PROJECT_ROOT / "config
str(Path.home() / ".config/sensorpajen/sensors.json")
)
# Bluetooth settings
WATCHDOG_TIMEOUT = int(os.environ.get("WATCHDOG_TIMEOUT", "5"))
ENABLE_BATTERY = os.environ.get("ENABLE_BATTERY", "true").lower() == "true"
```
3. Create `src/sensorpajen/utils.py`
- Port bluetooth_utils.py functionality
- Clean up and modernize
4. Create `src/sensorpajen/sensor_reader.py`
- Extract sensor reading logic from LYWSD03MMC.py
- Remove callback/shell script dependency
- Direct Python MQTT integration
5. Create `src/sensorpajen/mqtt_publisher.py`
- MQTT client setup and connection
- Publishing logic (replacing sendToMQTT.sh)
- Error handling and reconnection
6. Create `src/sensorpajen/main.py`
- Entry point for the application
- Signal handling (SIGTERM, SIGINT) - Signal handling (SIGTERM, SIGINT)
- Logging setup (to stdout for journald) - Graceful shutdown
- Main loop - Logging to stdout for journald
```python
#!/usr/bin/env python3
import logging
import signal
import sys
from . import config
from .sensor_reader import SensorReader
from .mqtt_publisher import MQTTPublisher
def main():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
logger = logging.getLogger(__name__)
logger.info("Starting sensorpajen service")
# Setup signal handlers
def signal_handler(sig, frame):
logger.info("Received shutdown signal")
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Main application logic here
# ...
if __name__ == "__main__":
main()
```
--- ---

124
src/sensorpajen/config.py Normal file
View File

@@ -0,0 +1,124 @@
"""
Configuration management for Sensorpajen.
Loads configuration from environment variables with sensible defaults.
Configuration files are loaded relative to the project root.
"""
import os
import json
import logging
from pathlib import Path
from typing import Dict, List
logger = logging.getLogger(__name__)
# Determine project root (3 levels up from this file: src/sensorpajen/config.py)
PROJECT_ROOT = Path(__file__).parent.parent.parent
# MQTT Configuration from environment
MQTT_HOST = os.environ.get("MQTT_HOST")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD")
MQTT_CLIENT_ID = os.environ.get("MQTT_CLIENT_ID", "sensorpajen")
MQTT_TOPIC_PREFIX = os.environ.get("MQTT_TOPIC_PREFIX", "MiTemperature2")
# Validate required MQTT configuration
if not MQTT_HOST:
raise RuntimeError(
"MQTT_HOST environment variable must be set. "
"Please configure config/sensorpajen.env"
)
# Sensor configuration file (relative to project root)
SENSOR_CONFIG_FILE = os.environ.get(
"SENSOR_CONFIG_FILE",
str(PROJECT_ROOT / "config/sensors.json")
)
# Application settings
WATCHDOG_TIMEOUT = int(os.environ.get("WATCHDOG_TIMEOUT", "5"))
ENABLE_BATTERY = os.environ.get("ENABLE_BATTERY", "true").lower() == "true"
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
# Bluetooth settings
SKIP_IDENTICAL = int(os.environ.get("SKIP_IDENTICAL", "50"))
DEBOUNCE = os.environ.get("DEBOUNCE", "true").lower() == "true"
class SensorConfig:
"""Manages sensor configuration from JSON file."""
def __init__(self, config_file: str = SENSOR_CONFIG_FILE):
"""
Initialize sensor configuration.
Args:
config_file: Path to sensors JSON configuration file
"""
self.config_file = Path(config_file)
self.sensors: Dict[str, str] = {}
self.load()
def load(self):
"""Load sensor configuration from JSON file."""
if not self.config_file.exists():
raise FileNotFoundError(
f"Sensor configuration file not found: {self.config_file}\n"
f"Please copy config/sensors.json.example to config/sensors.json "
f"and configure your sensors."
)
try:
with open(self.config_file, 'r') as f:
data = json.load(f)
# Convert sensors list to MAC -> name mapping
for sensor in data.get('sensors', []):
mac = sensor.get('mac', '').upper()
name = sensor.get('name')
if mac and name:
self.sensors[mac] = name
logger.debug(f"Loaded sensor: {mac} -> {name}")
logger.info(f"Loaded {len(self.sensors)} sensors from {self.config_file}")
except json.JSONDecodeError as e:
raise RuntimeError(f"Invalid JSON in {self.config_file}: {e}")
except Exception as e:
raise RuntimeError(f"Error loading sensor config: {e}")
def get_name(self, mac: str) -> str:
"""
Get sensor name by MAC address.
Args:
mac: MAC address (any case)
Returns:
Sensor name or the MAC address if not found
"""
return self.sensors.get(mac.upper(), mac)
def get_all_macs(self) -> List[str]:
"""Get list of all configured MAC addresses."""
return list(self.sensors.keys())
def validate_config():
"""
Validate configuration and log settings.
Should be called at application startup.
"""
logger.info("=== Sensorpajen Configuration ===")
logger.info(f"MQTT Host: {MQTT_HOST}:{MQTT_PORT}")
logger.info(f"MQTT User: {MQTT_USER}")
logger.info(f"MQTT Client ID: {MQTT_CLIENT_ID}")
logger.info(f"MQTT Topic Prefix: {MQTT_TOPIC_PREFIX}")
logger.info(f"Sensor Config: {SENSOR_CONFIG_FILE}")
logger.info(f"Watchdog Timeout: {WATCHDOG_TIMEOUT}s")
logger.info(f"Battery Monitoring: {ENABLE_BATTERY}")
logger.info(f"Log Level: {LOG_LEVEL}")
logger.info("================================")

168
src/sensorpajen/main.py Normal file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""
Sensorpajen - Main entry point
Bluetooth temperature sensor monitor for Xiaomi Mijia LYWSD03MMC sensors.
Publishes sensor data to MQTT broker.
"""
import sys
import signal
import logging
import time
from pathlib import Path
from . import __version__
from . import config
from .mqtt_publisher import MQTTPublisher
from .sensor_reader import SensorReader, Measurement
class Sensorpajen:
"""Main application class."""
def __init__(self):
"""Initialize the application."""
self.mqtt_publisher: MQTTPublisher = None
self.sensor_reader: SensorReader = None
self.sensor_config: config.SensorConfig = None
self.running = False
# Setup logging
self._setup_logging()
# Setup signal handlers
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
def _setup_logging(self):
"""Configure logging to stdout for journald."""
log_level = getattr(logging, config.LOG_LEVEL, logging.INFO)
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
# Set our logger
self.logger = logging.getLogger(__name__)
def _signal_handler(self, sig, frame):
"""Handle shutdown signals."""
signal_name = "SIGTERM" if sig == signal.SIGTERM else "SIGINT"
self.logger.info(f"Received {signal_name}, shutting down gracefully...")
self.shutdown()
sys.exit(0)
def _on_measurement(self, measurement: Measurement):
"""
Callback for new sensor measurements.
Args:
measurement: Sensor measurement data
"""
try:
# Publish to MQTT
self.mqtt_publisher.publish_measurement(
sensor_name=measurement.sensor_name,
temperature=measurement.temperature,
humidity=measurement.humidity,
battery_voltage=measurement.voltage,
battery_level=measurement.battery
)
except Exception as e:
self.logger.error(f"Error handling measurement: {e}")
def start(self):
"""Start the application."""
try:
self.logger.info("=" * 50)
self.logger.info(f"Starting Sensorpajen v{__version__}")
self.logger.info("=" * 50)
# Validate and log configuration
config.validate_config()
# Load sensor configuration
self.sensor_config = config.SensorConfig()
if len(self.sensor_config.sensors) == 0:
self.logger.error("No sensors configured!")
self.logger.error("Please configure sensors in config/sensors.json")
sys.exit(1)
# Initialize MQTT publisher
self.logger.info("Initializing MQTT publisher...")
self.mqtt_publisher = MQTTPublisher()
self.mqtt_publisher.connect()
# Wait a moment for MQTT connection
time.sleep(1)
if not self.mqtt_publisher.is_connected():
self.logger.warning("MQTT connection not established yet, continuing anyway...")
# Initialize sensor reader
self.logger.info("Initializing Bluetooth sensor reader...")
self.sensor_reader = SensorReader(
sensor_config=self.sensor_config,
on_measurement=self._on_measurement,
interface=0 # hci0
)
# Start reading sensors (blocking call)
self.logger.info("=" * 50)
self.logger.info("Sensorpajen is now running")
self.logger.info("Monitoring sensors via Bluetooth...")
self.logger.info("Publishing to MQTT...")
self.logger.info("Press Ctrl+C to stop")
self.logger.info("=" * 50)
self.running = True
self.sensor_reader.start()
except FileNotFoundError as e:
self.logger.error(f"Configuration error: {e}")
sys.exit(1)
except RuntimeError as e:
self.logger.error(f"Configuration error: {e}")
sys.exit(1)
except Exception as e:
self.logger.error(f"Failed to start application: {e}", exc_info=True)
self.shutdown()
sys.exit(1)
def shutdown(self):
"""Shutdown the application gracefully."""
if not self.running:
return
self.running = False
self.logger.info("Shutting down...")
# Stop sensor reader
if self.sensor_reader:
try:
self.sensor_reader.stop()
except Exception as e:
self.logger.error(f"Error stopping sensor reader: {e}")
# Disconnect MQTT
if self.mqtt_publisher:
try:
self.mqtt_publisher.disconnect()
except Exception as e:
self.logger.error(f"Error disconnecting MQTT: {e}")
self.logger.info("Shutdown complete")
def main():
"""Main entry point."""
app = Sensorpajen()
app.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,122 @@
"""
MQTT Publisher for sensor data.
Handles connection to MQTT broker and publishing of sensor measurements.
"""
import logging
import paho.mqtt.client as mqtt
from typing import Optional
from . import config
logger = logging.getLogger(__name__)
class MQTTPublisher:
"""Manages MQTT connection and publishing of sensor data."""
def __init__(self):
"""Initialize MQTT publisher with configuration."""
self.client: Optional[mqtt.Client] = None
self.connected = False
self._setup_client()
def _setup_client(self):
"""Setup MQTT client with callbacks."""
self.client = mqtt.Client(config.MQTT_CLIENT_ID)
# Set credentials if provided
if config.MQTT_USER and config.MQTT_PASSWORD:
self.client.username_pw_set(config.MQTT_USER, config.MQTT_PASSWORD)
# Setup callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_publish = self._on_publish
logger.info(f"MQTT client configured for {config.MQTT_HOST}:{config.MQTT_PORT}")
def _on_connect(self, client, userdata, flags, rc):
"""Callback for when client connects to broker."""
if rc == 0:
self.connected = True
logger.info(f"Connected to MQTT broker at {config.MQTT_HOST}:{config.MQTT_PORT}")
else:
self.connected = False
logger.error(f"Failed to connect to MQTT broker. Return code: {rc}")
def _on_disconnect(self, client, userdata, rc):
"""Callback for when client disconnects from broker."""
self.connected = False
if rc != 0:
logger.warning(f"Unexpected disconnection from MQTT broker. Return code: {rc}")
else:
logger.info("Disconnected from MQTT broker")
def _on_publish(self, client, userdata, mid):
"""Callback for when message is published."""
logger.debug(f"Message published: {mid}")
def connect(self):
"""Connect to MQTT broker."""
try:
logger.info(f"Connecting to MQTT broker at {config.MQTT_HOST}:{config.MQTT_PORT}")
self.client.connect(config.MQTT_HOST, config.MQTT_PORT, keepalive=60)
self.client.loop_start() # Start network loop in background thread
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
raise
def disconnect(self):
"""Disconnect from MQTT broker."""
if self.client:
self.client.loop_stop()
self.client.disconnect()
logger.info("Disconnected from MQTT broker")
def publish_measurement(self, sensor_name: str, temperature: float,
humidity: int, battery_voltage: float = None,
battery_level: int = None):
"""
Publish sensor measurement to MQTT.
Args:
sensor_name: Name of the sensor
temperature: Temperature in Celsius
humidity: Humidity percentage
battery_voltage: Battery voltage (optional)
battery_level: Battery level percentage (optional)
"""
if not self.connected:
logger.warning("Not connected to MQTT broker, skipping publish")
return
topic_prefix = f"{config.MQTT_TOPIC_PREFIX}/{sensor_name}"
try:
# Publish temperature
self.client.publish(f"{topic_prefix}/temp", f"{temperature:.1f}")
logger.debug(f"{sensor_name}: temp={temperature:.1f}°C")
# Publish humidity
self.client.publish(f"{topic_prefix}/humidity", f"{humidity}")
logger.debug(f"{sensor_name}: humidity={humidity}%")
# Publish battery info if enabled and available
if config.ENABLE_BATTERY:
if battery_voltage is not None:
self.client.publish(f"{topic_prefix}/batteryvoltage", f"{battery_voltage:.3f}")
logger.debug(f"{sensor_name}: battery_voltage={battery_voltage:.3f}V")
if battery_level is not None:
self.client.publish(f"{topic_prefix}/batterylevel", f"{battery_level}")
logger.debug(f"{sensor_name}: battery_level={battery_level}%")
logger.info(f"Published: {sensor_name} - {temperature:.1f}°C, {humidity}%")
except Exception as e:
logger.error(f"Error publishing to MQTT: {e}")
def is_connected(self) -> bool:
"""Check if connected to MQTT broker."""
return self.connected

View File

@@ -0,0 +1,254 @@
"""
Bluetooth sensor reader for Xiaomi Mijia LYWSD03MMC sensors with ATC firmware.
Reads temperature, humidity, and battery data from BLE advertisements.
"""
import logging
import time
import threading
import bluetooth._bluetooth as bluez
from dataclasses import dataclass
from typing import Optional, Callable, Dict
from . import config
from .utils import (enable_le_scan, disable_le_scan,
parse_le_advertising_events, raw_packet_to_str, toggle_device)
logger = logging.getLogger(__name__)
@dataclass
class Measurement:
"""Sensor measurement data."""
temperature: float
humidity: int
voltage: float
battery: int = 0
rssi: int = 0
sensor_name: str = ""
timestamp: int = 0
class SensorReader:
"""Reads Xiaomi LYWSD03MMC sensors with ATC firmware via BLE."""
def __init__(self, sensor_config: config.SensorConfig,
on_measurement: Callable[[Measurement], None],
interface: int = 0):
"""
Initialize sensor reader.
Args:
sensor_config: Sensor configuration mapping
on_measurement: Callback function for new measurements
interface: Bluetooth interface number (default 0 for hci0)
"""
self.sensor_config = sensor_config
self.on_measurement = on_measurement
self.interface = interface
self.sock: Optional[int] = None
self.running = False
self.last_ble_packet = time.time()
self.adv_counter: Dict[str, str] = {} # Track advertisement numbers to avoid duplicates
self.watchdog_thread: Optional[threading.Thread] = None
def start(self):
"""Start BLE scanning for sensors."""
try:
logger.info(f"Starting BLE scan on hci{self.interface}")
# Enable bluetooth device
toggle_device(self.interface, True)
# Open bluetooth socket
try:
self.sock = bluez.hci_open_dev(self.interface)
except Exception as e:
logger.error(f"Cannot open bluetooth device hci{self.interface}: {e}")
raise
# Enable LE scanning without filtering duplicates
enable_le_scan(self.sock, filter_duplicates=False)
# Start watchdog if configured
if config.WATCHDOG_TIMEOUT > 0:
self.running = True
self.watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True)
self.watchdog_thread.start()
logger.info(f"Watchdog started with {config.WATCHDOG_TIMEOUT}s timeout")
logger.info("BLE scanning enabled")
logger.info(f"Monitoring {len(self.sensor_config.sensors)} sensors")
# Start parsing advertisements (blocking call)
parse_le_advertising_events(
self.sock,
handler=self._handle_ble_packet,
debug=False
)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
self.stop()
except Exception as e:
logger.error(f"Error in sensor reader: {e}")
self.stop()
raise
def stop(self):
"""Stop BLE scanning."""
self.running = False
if self.sock:
try:
disable_le_scan(self.sock)
logger.info("BLE scanning disabled")
except Exception as e:
logger.error(f"Error disabling BLE scan: {e}")
if self.watchdog_thread and self.watchdog_thread.is_alive():
self.watchdog_thread.join(timeout=2)
def _watchdog_loop(self):
"""Watchdog thread to restart BLE scanning if no packets received."""
restart_counter = 1
while self.running:
time.sleep(1)
now = time.time()
elapsed = now - self.last_ble_packet
if elapsed > config.WATCHDOG_TIMEOUT:
logger.warning(
f"Watchdog: No BLE packet within {int(elapsed)}s. "
f"Restarting BLE scan (count: {restart_counter})"
)
try:
disable_le_scan(self.sock)
time.sleep(1)
enable_le_scan(self.sock, filter_duplicates=False)
restart_counter += 1
self.last_ble_packet = now # Reset timer
except Exception as e:
logger.error(f"Error restarting BLE scan: {e}")
def _handle_ble_packet(self, mac: str, adv_type: int, data: bytes, rssi: int):
"""
Handle incoming BLE advertisement packet.
Args:
mac: MAC address of the device
adv_type: Advertisement type
data: Advertisement data
rssi: Signal strength
"""
# Update last packet time for watchdog
self.last_ble_packet = time.time()
# Convert data to hex string
data_str = raw_packet_to_str(data)
# Check if this is an ATC packet
# ATC format: [... service UUID 0x181A ... MAC ... data ...]
atc_identifier = data_str[6:10].upper()
if atc_identifier != "1A18":
return # Not an ATC packet
# Extract MAC from packet and verify it matches
packet_mac = data_str[10:22].upper()
mac_str = mac.replace(":", "").upper()
if packet_mac != mac_str:
return # MAC mismatch
# Check if this is a known sensor or if we accept all
mac_with_colons = mac.upper()
if mac_with_colons not in self.sensor_config.sensors:
logger.debug(f"Ignoring unknown sensor: {mac}")
return
# Check advertisement number to avoid duplicates
adv_number = data_str[-2:]
if mac_str in self.adv_counter:
if self.adv_counter[mac_str] == adv_number:
return # Duplicate packet
self.adv_counter[mac_str] = adv_number
# Parse ATC data packet
try:
measurement = self._parse_atc_packet(data_str, mac_with_colons, rssi)
if measurement:
# Log the measurement
logger.info(
f"{measurement.sensor_name}: {measurement.temperature}°C, "
f"{measurement.humidity}%, {measurement.voltage}V, "
f"battery {measurement.battery}%, RSSI {rssi}dBm"
)
# Call measurement callback
if self.on_measurement:
self.on_measurement(measurement)
except Exception as e:
logger.error(f"Error parsing ATC packet from {mac}: {e}")
def _parse_atc_packet(self, data_str: str, mac: str, rssi: int) -> Optional[Measurement]:
"""
Parse ATC advertisement data packet.
ATC packet format (after service UUID):
- Bytes 10-22: MAC address
- Bytes 22-26: Temperature (signed int16, big endian, /10 for °C)
- Bytes 26-28: Humidity (uint8, %)
- Bytes 28-30: Battery (uint8, %)
- Bytes 30-34: Battery voltage (uint16, big endian, /1000 for V)
- Bytes 34-36: Frame counter
Args:
data_str: Hex string of advertisement data
mac: MAC address with colons
rssi: Signal strength
Returns:
Measurement object or None if parsing failed
"""
try:
# Extract temperature (signed, big endian)
temp_hex = data_str[22:26]
temperature = int.from_bytes(
bytearray.fromhex(temp_hex),
byteorder='big',
signed=True
) / 10.0
# Extract humidity
humidity = int(data_str[26:28], 16)
# Extract battery level
battery = int(data_str[28:30], 16)
# Extract battery voltage
voltage_hex = data_str[30:34]
voltage = int(voltage_hex, 16) / 1000.0
# Get sensor name from config
sensor_name = self.sensor_config.get_name(mac)
# Create measurement
measurement = Measurement(
temperature=temperature,
humidity=humidity,
voltage=voltage,
battery=battery,
rssi=rssi,
sensor_name=sensor_name,
timestamp=int(time.time())
)
return measurement
except Exception as e:
logger.error(f"Error parsing ATC data: {e}")
return None

421
src/sensorpajen/utils.py Normal file
View File

@@ -0,0 +1,421 @@
# -*- coding: utf-8 -*-
# This file is from https://github.com/colin-guyon/py-bluetooth-utils
# published under MIT License
# MIT License
# Copyright (c) 2020 Colin GUYON
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Module containing some bluetooth utility functions (linux only).
It either uses HCI commands using PyBluez, or does ioctl calls like it's
done in Bluez tools such as hciconfig.
Main functions:
- toggle_device : enable or disable a bluetooth device
- set_scan : set scan type on a device ("noscan", "iscan", "pscan", "piscan")
- enable/disable_le_scan : enable BLE scanning
- parse_le_advertising_events : parse and read BLE advertisements packets
- start/stop_le_advertising : advertise custom data using BLE
Bluez : http://www.bluez.org/
PyBluez : http://karulis.github.io/pybluez/
The module was in particular inspired from 'iBeacon-Scanner-'
https://github.com/switchdoclabs/iBeacon-Scanner-/blob/master/blescan.py
and sometimes directly from the Bluez sources.
"""
from __future__ import absolute_import
import sys
import struct
import fcntl
import array
import socket
from errno import EALREADY
# import PyBluez
import bluetooth._bluetooth as bluez
__all__ = ('toggle_device', 'set_scan',
'enable_le_scan', 'disable_le_scan', 'parse_le_advertising_events',
'start_le_advertising', 'stop_le_advertising',
'raw_packet_to_str')
LE_META_EVENT = 0x3E
LE_PUBLIC_ADDRESS = 0x00
LE_RANDOM_ADDRESS = 0x01
OGF_LE_CTL = 0x08
OCF_LE_SET_SCAN_PARAMETERS = 0x000B
OCF_LE_SET_SCAN_ENABLE = 0x000C
OCF_LE_CREATE_CONN = 0x000D
OCF_LE_SET_ADVERTISING_PARAMETERS = 0x0006
OCF_LE_SET_ADVERTISE_ENABLE = 0x000A
OCF_LE_SET_ADVERTISING_DATA = 0x0008
SCAN_TYPE_PASSIVE = 0x00
SCAN_FILTER_DUPLICATES = 0x01
SCAN_DISABLE = 0x00
SCAN_ENABLE = 0x01
# sub-events of LE_META_EVENT
EVT_LE_CONN_COMPLETE = 0x01
EVT_LE_ADVERTISING_REPORT = 0x02
EVT_LE_CONN_UPDATE_COMPLETE = 0x03
EVT_LE_READ_REMOTE_USED_FEATURES_COMPLETE = 0x04
# Advertisement event types
ADV_IND = 0x00
ADV_DIRECT_IND = 0x01
ADV_SCAN_IND = 0x02
ADV_NONCONN_IND = 0x03
ADV_SCAN_RSP = 0x04
# Allow Scan Request from Any, Connect Request from Any
FILTER_POLICY_NO_WHITELIST = 0x00
# Allow Scan Request from White List Only, Connect Request from Any
FILTER_POLICY_SCAN_WHITELIST = 0x01
# Allow Scan Request from Any, Connect Request from White List Only
FILTER_POLICY_CONN_WHITELIST = 0x02
# Allow Scan Request from White List Only, Connect Request from White List Only
FILTER_POLICY_SCAN_AND_CONN_WHITELIST = 0x03
def toggle_device(dev_id, enable):
"""
Power ON or OFF a bluetooth device.
:param dev_id: Device id.
:type dev_id: ``int``
:param enable: Whether to enable of disable the device.
:type enable: ``bool``
"""
hci_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW,
socket.BTPROTO_HCI)
print("Power %s bluetooth device %d" % ('ON' if enable else 'OFF', dev_id))
# di = struct.pack("HbBIBBIIIHHHH10I", dev_id, *((0,) * 22))
# fcntl.ioctl(hci_sock.fileno(), bluez.HCIGETDEVINFO, di)
req_str = struct.pack("H", dev_id)
request = array.array("b", req_str)
try:
fcntl.ioctl(hci_sock.fileno(),
bluez.HCIDEVUP if enable else bluez.HCIDEVDOWN,
request[0])
except IOError as e:
if e.errno == EALREADY:
print("Bluetooth device %d is already %s" % (
dev_id, 'enabled' if enable else 'disabled'))
else:
raise
finally:
hci_sock.close()
# Types of bluetooth scan
SCAN_DISABLED = 0x00
SCAN_INQUIRY = 0x01
SCAN_PAGE = 0x02
def set_scan(dev_id, scan_type):
"""
Set scan type on a given bluetooth device.
:param dev_id: Device id.
:type dev_id: ``int``
:param scan_type: One of
``'noscan'``
``'iscan'``
``'pscan'``
``'piscan'``
:type scan_type: ``str``
"""
hci_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW,
socket.BTPROTO_HCI)
if scan_type == "noscan":
dev_opt = SCAN_DISABLED
elif scan_type == "iscan":
dev_opt = SCAN_INQUIRY
elif scan_type == "pscan":
dev_opt = SCAN_PAGE
elif scan_type == "piscan":
dev_opt = SCAN_PAGE | SCAN_INQUIRY
else:
raise ValueError("Unknown scan type %r" % scan_type)
req_str = struct.pack("HI", dev_id, dev_opt)
print("Set scan type %r to bluetooth device %d" % (scan_type, dev_id))
try:
fcntl.ioctl(hci_sock.fileno(), bluez.HCISETSCAN, req_str)
finally:
hci_sock.close()
def raw_packet_to_str(pkt):
"""
Returns the string representation of a raw HCI packet.
"""
if sys.version_info > (3, 0):
return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in pkt)
else:
return ''.join('%02x' % struct.unpack("B", x)[0] for x in pkt)
def enable_le_scan(sock, interval=0x0800, window=0x0800,
filter_policy=FILTER_POLICY_NO_WHITELIST,
filter_duplicates=True):
"""
Enable LE passive scan (with filtering of duplicate packets enabled).
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param interval: Scan interval.
:param window: Scan window (must be less or equal than given interval).
:param filter_policy: One of
``FILTER_POLICY_NO_WHITELIST`` (default value)
``FILTER_POLICY_SCAN_WHITELIST``
``FILTER_POLICY_CONN_WHITELIST``
``FILTER_POLICY_SCAN_AND_CONN_WHITELIST``
.. note:: Scan interval and window are to multiply by 0.625 ms to
get the real time duration.
"""
print("Enable LE scan")
own_bdaddr_type = LE_PUBLIC_ADDRESS # does not work with LE_RANDOM_ADDRESS
cmd_pkt = struct.pack("<BHHBB", SCAN_TYPE_PASSIVE, interval, window,
own_bdaddr_type, filter_policy)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_PARAMETERS, cmd_pkt)
print("scan params: interval=%.3fms window=%.3fms own_bdaddr=%s "
"whitelist=%s" %
(interval * 0.625, window * 0.625,
'public' if own_bdaddr_type == LE_PUBLIC_ADDRESS else 'random',
'yes' if filter_policy in (FILTER_POLICY_SCAN_WHITELIST,
FILTER_POLICY_SCAN_AND_CONN_WHITELIST)
else 'no'))
cmd_pkt = struct.pack("<BB", SCAN_ENABLE, SCAN_FILTER_DUPLICATES if filter_duplicates else 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def disable_le_scan(sock):
"""
Disable LE scan.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
"""
print("Disable LE scan")
cmd_pkt = struct.pack("<BB", SCAN_DISABLE, 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def start_le_advertising(sock, min_interval=1000, max_interval=1000,
adv_type=ADV_NONCONN_IND, data=()):
"""
Start LE advertising.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param min_interval: Minimum advertising interval.
:param max_interval: Maximum advertising interval.
:param adv_type: Advertisement type (``ADV_NONCONN_IND`` by default).
:param data: The advertisement data (maximum of 31 bytes).
:type data: iterable
"""
own_bdaddr_type = 0
direct_bdaddr_type = 0
direct_bdaddr = (0,) * 6
chan_map = 0x07 # All channels: 37, 38, 39
filter = 0
struct_params = [min_interval, max_interval, adv_type, own_bdaddr_type,
direct_bdaddr_type]
struct_params.extend(direct_bdaddr)
struct_params.extend((chan_map, filter))
cmd_pkt = struct.pack("<HHBBB6BBB", *struct_params)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISING_PARAMETERS,
cmd_pkt)
cmd_pkt = struct.pack("<B", 0x01)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISE_ENABLE, cmd_pkt)
data_length = len(data)
if data_length > 31:
raise ValueError("data is too long (%d but max is 31 bytes)",
data_length)
cmd_pkt = struct.pack("<B%dB" % data_length, data_length, *data)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISING_DATA, cmd_pkt)
print("Advertising started data_length=%d data=%r" % (data_length, data))
def stop_le_advertising(sock):
"""
Stop LE advertising.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
"""
cmd_pkt = struct.pack("<B", 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISE_ENABLE, cmd_pkt)
print("Advertising stopped")
def parse_le_advertising_events(sock, mac_addr=None, packet_length=None,
handler=None, debug=False):
"""
Parse and report LE advertisements.
This is a blocking call, an infinite loop is started and the
given handler will be called each time a new LE advertisement packet
is detected and corresponds to the given filters.
.. note:: The :func:`.start_le_advertising` function must be
called before calling this function.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param mac_addr: list of filtered mac address representations
(uppercase, with ':' separators).
If not specified, the LE advertisement of any device will be reported.
Example: mac_addr=('00:2A:5F:FF:25:11', 'DA:FF:12:33:66:12')
:type mac_addr: ``list`` of ``string``
:param packet_length: Filter a specific length of LE advertisement packet.
:type packet_length: ``int``
:param handler: Handler that will be called each time a LE advertisement
packet is available (in accordance with the ``mac_addr``
and ``packet_length`` filters).
:type handler: ``callable`` taking 4 parameters:
mac (``str``), adv_type (``int``), data (``bytes``) and rssi (``int``)
:param debug: Enable debug prints.
:type debug: ``bool``
"""
if not debug and handler is None:
raise ValueError("You must either enable debug or give a handler !")
old_filter = sock.getsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, 14)
flt = bluez.hci_filter_new()
bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
# bluez.hci_filter_all_events(flt)
bluez.hci_filter_set_event(flt, LE_META_EVENT)
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, flt)
print("socket filter set to ptype=HCI_EVENT_PKT event=LE_META_EVENT")
print("Listening ...")
try:
while True:
pkt = full_pkt = sock.recv(255)
ptype, event, plen = struct.unpack("BBB", pkt[:3])
if event != LE_META_EVENT:
# Should never occur because we filtered with this type of event
print("Not a LE_META_EVENT !")
continue
sub_event, = struct.unpack("B", pkt[3:4])
if sub_event != EVT_LE_ADVERTISING_REPORT:
if debug:
print("Not a EVT_LE_ADVERTISING_REPORT !")
continue
pkt = pkt[4:]
adv_type = struct.unpack("b", pkt[1:2])[0]
mac_addr_str = bluez.ba2str(pkt[3:9])
if packet_length and plen != packet_length:
# ignore this packet
if debug:
print("packet with non-matching length: mac=%s adv_type=%02x plen=%s" %
(mac_addr_str, adv_type, plen))
print(raw_packet_to_str(pkt))
continue
data = pkt[9:-1]
rssi = struct.unpack("b", full_pkt[len(full_pkt)-1:len(full_pkt)])[0]
if mac_addr and mac_addr_str not in mac_addr:
if debug:
print("packet with non-matching mac %s adv_type=%02x data=%s RSSI=%s" %
(mac_addr_str, adv_type, raw_packet_to_str(data), rssi))
continue
if debug:
print("LE advertisement: mac=%s adv_type=%02x data=%s RSSI=%d" %
(mac_addr_str, adv_type, raw_packet_to_str(data), rssi))
if handler is not None:
try:
handler(mac_addr_str, adv_type, data, rssi)
except Exception as e:
print('Exception when calling handler with a BLE advertising event: %r' % (e,))
import traceback
traceback.print_exc()
except KeyboardInterrupt:
print("\nRestore previous socket filter")
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, old_filter)
raise
"""
def hci_le_add_white_list(int dd, const bdaddr_t *bdaddr, uint8_t type, int to)
{
struct hci_request {
uint16_t ogf;
uint16_t ocf;
int event;
void *cparam;
int clen;
void *rparam;
int rlen;
};
struct hci_request rq;
le_add_device_to_white_list_cp cp;
uint8_t status;
memset(&cp, 0, sizeof(cp));
cp.bdaddr_type = type;
bacpy(&cp.bdaddr, bdaddr);
memset(&rq, 0, sizeof(rq));
rq.ogf = OGF_LE_CTL;
rq.ocf = OCF_LE_ADD_DEVICE_TO_WHITE_LIST;
rq.cparam = &cp;
rq.clen = LE_ADD_DEVICE_TO_WHITE_LIST_CP_SIZE;
rq.rparam = &status;
rq.rlen = 1;
if (hci_send_req(dd, &rq, to) < 0)
return -1;
if (status) {
errno = EIO;
return -1;
}
return 0;
}"""