Fix Debian package build issues

- Remove debian/compat file (conflicts with Build-Depends)
- Fix debian/install to use correct readme.md filename
- Update verify-deb.sh to mark debian/compat as optional
- Add -Zgzip flag to dpkg-buildpackage for compatibility
  (uses gzip instead of zstd for better compatibility)
- Update verify-deb.sh to check optional vs required files

Package now builds and installs successfully on systems
without zstd support.
This commit is contained in:
2025-12-28 00:02:49 +01:00
parent 427df1f034
commit 234391a881
31 changed files with 2303 additions and 5 deletions

1
debian/compat vendored
View File

@@ -1 +0,0 @@
13

1
debian/debhelper-build-stamp vendored Normal file
View File

@@ -0,0 +1 @@
sensorpajen

2
debian/files vendored Normal file
View File

@@ -0,0 +1,2 @@
sensorpajen_2.0.0-dev_all.deb misc optional
sensorpajen_2.0.0-dev_amd64.buildinfo misc optional

2
debian/install vendored
View File

@@ -1,7 +1,7 @@
src/sensorpajen/*.py opt/sensorpajen/src/sensorpajen/
scripts/approve-sensors.sh opt/sensorpajen/scripts/
pyproject.toml opt/sensorpajen/
README.md usr/share/doc/sensorpajen/
readme.md usr/share/doc/sensorpajen/
INSTALL.md usr/share/doc/sensorpajen/
ROADMAP.md usr/share/doc/sensorpajen/
config/*.example usr/share/doc/sensorpajen/examples/

1
debian/sensorpajen.debhelper.log vendored Normal file
View File

@@ -0,0 +1 @@
dh_auto_install

12
debian/sensorpajen.postrm.debhelper vendored Normal file
View File

@@ -0,0 +1,12 @@
# Automatically added by dh_installsystemd/13.14.1ubuntu5
if [ "$1" = remove ] && [ -d /run/systemd/system ] ; then
systemctl --system daemon-reload >/dev/null || true
fi
# End automatically added section
# Automatically added by dh_installsystemd/13.14.1ubuntu5
if [ "$1" = "purge" ]; then
if [ -x "/usr/bin/deb-systemd-helper" ]; then
deb-systemd-helper purge 'sensorpajen.service' >/dev/null || true
fi
fi
# End automatically added section

2
debian/sensorpajen.substvars vendored Normal file
View File

@@ -0,0 +1,2 @@
misc:Depends=
misc:Pre-Depends=

20
debian/sensorpajen/DEBIAN/control vendored Normal file
View File

@@ -0,0 +1,20 @@
Package: sensorpajen
Version: 2.0.0-dev
Architecture: all
Maintainer: Fredrik <fredrik@wahlberg.se>
Installed-Size: 109
Depends: python3 (>= 3.9), python3-venv, python3-pip, bluetooth, bluez, libcap2-bin
Recommends: mosquitto-clients
Section: misc
Priority: optional
Homepage: https://github.com/yourusername/sensorpajen
Description: Raspberry Pi Bluetooth temperature sensor monitor
Monitors Xiaomi Mijia LYWSD03MMC temperature sensors via Bluetooth Low Energy
and publishes readings to MQTT broker. Supports ATC firmware with automatic
sensor discovery and approval workflow.
.
Features:
- Automatic sensor discovery
- MQTT publishing
- Systemd service integration
- User approval workflow for new sensors

18
debian/sensorpajen/DEBIAN/md5sums vendored Normal file
View File

@@ -0,0 +1,18 @@
90ea43f1be78ca18b9210f2d370001c4 opt/sensorpajen/pyproject.toml
940d73f24eb9f971ce27f9355e3072f3 opt/sensorpajen/scripts/approve-sensors.sh
20eb4f3839b990a530410768897402c0 opt/sensorpajen/src/sensorpajen/__init__.py
1f452c46e42f8dc3751dba6ca68256e9 opt/sensorpajen/src/sensorpajen/approve_sensors.py
da40d9df301d523d517d7cf2809d6f11 opt/sensorpajen/src/sensorpajen/config.py
65c63383dde4f0b249b708f854ec75a3 opt/sensorpajen/src/sensorpajen/discovery_manager.py
592f8a534833c9e403967fcc0ead8eb1 opt/sensorpajen/src/sensorpajen/main.py
331bf9b314492acc6ce03896367f3cf6 opt/sensorpajen/src/sensorpajen/mqtt_publisher.py
5f4ea191e35ce092f39ec0a4f663cb38 opt/sensorpajen/src/sensorpajen/sensor_reader.py
c8dd8fe8fc174a9cd35251fdf80e7b5f opt/sensorpajen/src/sensorpajen/utils.py
c9c22f9c1d65bfafd89fa45f16b7192b usr/lib/systemd/system/sensorpajen.service
4ddb9618c940286f91df901ec818959a usr/share/doc/sensorpajen/INSTALL.md.gz
bd2f1371c60af415bc9d0dbc1111184d usr/share/doc/sensorpajen/ROADMAP.md.gz
380e8e6b01b757ceac05bc5805844ae4 usr/share/doc/sensorpajen/changelog.Debian.gz
14152a98d7cd7fe8daf280aacc4cbf3f usr/share/doc/sensorpajen/examples/discovered_sensors.json.example
b0249deee21ceb834bc2fe9947be82d3 usr/share/doc/sensorpajen/examples/sensorpajen.env.example
292efbddd951c39cb2c9546d5fac5e05 usr/share/doc/sensorpajen/examples/sensors.json.example
5f647c63bfc3b174611694779fd215e0 usr/share/doc/sensorpajen/readme.md.gz

126
debian/sensorpajen/DEBIAN/postinst vendored Executable file
View File

@@ -0,0 +1,126 @@
#!/bin/bash
set -e
case "$1" in
configure)
# Create sensorpajen system user if it doesn't exist
if ! getent passwd sensorpajen > /dev/null; then
useradd --system --no-create-home --shell /usr/sbin/nologin sensorpajen
echo "Created system user: sensorpajen"
fi
# Create config directory with proper permissions
mkdir -p /etc/sensorpajen
chown sensorpajen:sensorpajen /etc/sensorpajen
chmod 750 /etc/sensorpajen
# Copy example configs to /etc/sensorpajen if they don't exist
for sample in sensorpajen.env.example sensors.json.example discovered_sensors.json.example; do
source_file="/usr/share/doc/sensorpajen/examples/$sample"
target_file="/etc/sensorpajen/${sample%.example}"
if [ -f "$source_file" ] && [ ! -f "$target_file" ]; then
cp "$source_file" "$target_file"
chown sensorpajen:sensorpajen "$target_file"
# Set restrictive permissions on env file (contains credentials)
if [ "$sample" = "sensorpajen.env.example" ]; then
chmod 600 "$target_file"
echo "Created $target_file (edit this file with your MQTT credentials)"
else
chmod 640 "$target_file"
echo "Created $target_file"
fi
fi
done
# Create virtual environment in /opt/sensorpajen
cd /opt/sensorpajen
if [ ! -d "venv" ]; then
echo "Creating Python virtual environment..."
python3 -m venv venv
venv/bin/pip install --upgrade pip setuptools wheel
fi
# Install Python dependencies from pyproject.toml
echo "Installing Python dependencies..."
venv/bin/pip install -e . || {
echo "Warning: pip install failed. You may need to install dependencies manually."
}
# Set Bluetooth capabilities on Python executable
PYTHON_PATH=$(readlink -f /opt/sensorpajen/venv/bin/python3)
if command -v setcap >/dev/null 2>&1; then
setcap cap_net_raw,cap_net_admin+eip "$PYTHON_PATH" || {
echo "Warning: setcap failed. You may need to run Bluetooth operations as root."
echo "Try: sudo setcap cap_net_raw,cap_net_admin+eip $PYTHON_PATH"
}
else
echo "Warning: setcap not found (install libcap2-bin package)"
fi
# Set ownership of application directory
chown -R sensorpajen:sensorpajen /opt/sensorpajen
# Install systemd service file
if [ -f /opt/sensorpajen/debian/sensorpajen.service ]; then
cp /opt/sensorpajen/debian/sensorpajen.service /etc/systemd/system/
elif [ -f /usr/share/doc/sensorpajen/sensorpajen.service ]; then
cp /usr/share/doc/sensorpajen/sensorpajen.service /etc/systemd/system/
fi
# Reload systemd
systemctl daemon-reload
# Enable service (but don't start - needs configuration first)
systemctl enable sensorpajen.service || {
echo "Warning: Could not enable sensorpajen service"
}
# Check if configuration is ready
if [ -f /etc/sensorpajen/sensorpajen.env ] && [ -f /etc/sensorpajen/sensors.json ]; then
# Check if env file has been configured (not default values)
if grep -q "MQTT_HOST=192.168.0.114" /etc/sensorpajen/sensorpajen.env; then
echo ""
echo "======================================================================"
echo " Configuration needed!"
echo "======================================================================"
echo " Edit /etc/sensorpajen/sensorpajen.env with your MQTT settings"
echo " Edit /etc/sensorpajen/sensors.json with your sensor list"
echo " Then run: sudo systemctl start sensorpajen"
echo "======================================================================"
echo ""
else
# Configuration appears to be customized, restart service
systemctl restart sensorpajen.service && {
echo "Sensorpajen service started"
echo "View logs: sudo journalctl -u sensorpajen -f"
} || {
echo "Failed to start service. Check: sudo systemctl status sensorpajen"
}
fi
else
echo ""
echo "======================================================================"
echo " Sensorpajen installed successfully!"
echo "======================================================================"
echo " Next steps:"
echo " 1. Edit /etc/sensorpajen/sensorpajen.env"
echo " 2. Edit /etc/sensorpajen/sensors.json"
echo " 3. sudo systemctl start sensorpajen"
echo " 4. sudo journalctl -u sensorpajen -f"
echo "======================================================================"
echo ""
fi
;;
abort-upgrade|abort-remove|abort-deconfigure)
;;
*)
echo "postinst called with unknown argument \`$1'" >&2
exit 1
;;
esac
exit 0

41
debian/sensorpajen/DEBIAN/postrm vendored Executable file
View File

@@ -0,0 +1,41 @@
#!/bin/bash
set -e
case "$1" in
remove)
# Service removed but config and user preserved
echo "Sensorpajen removed. Configuration preserved in /etc/sensorpajen/"
echo "To remove config: sudo rm -rf /etc/sensorpajen/"
# Remove systemd service file
rm -f /etc/systemd/system/sensorpajen.service
systemctl daemon-reload || true
;;
purge)
# Even on purge, we keep config by default (user can manually delete)
# This is safer as it prevents accidental data loss
echo "Configuration preserved in /etc/sensorpajen/"
echo "To remove config: sudo rm -rf /etc/sensorpajen/"
echo "To remove user: sudo userdel sensorpajen"
# Remove systemd service file
rm -f /etc/systemd/system/sensorpajen.service
systemctl daemon-reload || true
# Note: We intentionally do NOT remove:
# - /etc/sensorpajen (contains user data)
# - sensorpajen user (may own other files/processes)
# User must remove these manually if desired
;;
upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
;;
*)
echo "postrm called with unknown argument \`$1'" >&2
exit 1
;;
esac
exit 0

27
debian/sensorpajen/DEBIAN/prerm vendored Executable file
View File

@@ -0,0 +1,27 @@
#!/bin/bash
set -e
case "$1" in
remove|upgrade|deconfigure)
# Stop service before removal or upgrade
if systemctl is-active --quiet sensorpajen.service 2>/dev/null; then
echo "Stopping sensorpajen service..."
systemctl stop sensorpajen.service || true
fi
# Disable service on removal (not upgrade)
if [ "$1" = "remove" ]; then
systemctl disable sensorpajen.service || true
fi
;;
failed-upgrade)
;;
*)
echo "prerm called with unknown argument \`$1'" >&2
exit 1
;;
esac
exit 0

View File

@@ -0,0 +1,64 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "sensorpajen"
version = "2.0.0-dev"
description = "Bluetooth temperature sensor monitor for Xiaomi Mijia LYWSD03MMC"
readme = "README.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [
{name = "Fredrik", email = "your@email.com"}
]
keywords = ["bluetooth", "temperature", "sensor", "mqtt", "raspberry-pi"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Operating System :: POSIX :: Linux",
"Topic :: Home Automation",
]
dependencies = [
"bluepy>=1.3.0",
"paho-mqtt>=1.6.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-cov>=4.0",
"black>=23.0",
"ruff>=0.1.0",
]
[project.urls]
Homepage = "https://github.com/yourusername/sensorpajen"
Repository = "https://github.com/yourusername/sensorpajen"
[project.scripts]
sensorpajen = "sensorpajen.main:main"
sensorpajen-approve-sensors = "sensorpajen.approve_sensors:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.black]
line-length = 100
target-version = ["py39", "py310", "py311"]
[tool.ruff]
line-length = 100
target-version = "py39"
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]

View File

@@ -0,0 +1,48 @@
#!/bin/bash
# Wrapper script for approve-sensors that works in both dev and system mode
# Detect installation type
if [ -d "/opt/sensorpajen" ]; then
# System installation
PROJECT_ROOT="/opt/sensorpajen"
VENV_PATH="/opt/sensorpajen/venv"
# Load config from system location
if [ -f "/etc/sensorpajen/sensorpajen.env" ]; then
set -a
source /etc/sensorpajen/sensorpajen.env
set +a
else
echo "Warning: /etc/sensorpajen/sensorpajen.env not found"
# Set minimal defaults
export MQTT_HOST="${MQTT_HOST:-localhost}"
export MQTT_PORT="${MQTT_PORT:-1883}"
fi
else
# Development installation
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
PROJECT_ROOT="$( cd "$SCRIPT_DIR/.." && pwd )"
VENV_PATH="$PROJECT_ROOT/.venv"
# Set minimal required environment variables
export MQTT_HOST="${MQTT_HOST:-localhost}"
export MQTT_PORT="${MQTT_PORT:-1883}"
# Load actual config if it exists (will override defaults)
if [ -f "$PROJECT_ROOT/config/sensorpajen.env" ]; then
set -a
source "$PROJECT_ROOT/config/sensorpajen.env"
set +a
fi
fi
# Activate virtual environment
if [ -f "$VENV_PATH/bin/activate" ]; then
source "$VENV_PATH/bin/activate"
else
echo "Error: Virtual environment not found at $VENV_PATH"
exit 1
fi
# Run the approve-sensors command
python -m sensorpajen.approve_sensors "$@"

View File

@@ -0,0 +1,10 @@
"""
Sensorpajen - Bluetooth Temperature Sensor Monitor
Monitors Xiaomi Mijia LYWSD03MMC Bluetooth temperature sensors
and publishes data to MQTT broker.
"""
__version__ = "2.0.0-dev"
__author__ = "Fredrik"
__license__ = "MIT"

View File

@@ -0,0 +1,303 @@
#!/usr/bin/env python3
"""
CLI tool for approving or ignoring discovered sensors.
Interactive tool to manage pending and ignored sensors.
"""
import sys
import json
import logging
import argparse
from pathlib import Path
from typing import List
from . import config
from .discovery_manager import DiscoveryManager, DiscoveredSensor
logger = logging.getLogger(__name__)
def format_metadata_comment(sensor: DiscoveredSensor) -> str:
"""
Format sensor metadata as a comment string.
Args:
sensor: Discovered sensor
Returns:
Formatted comment string
"""
return (
f"MAC: {sensor.mac}, "
f"Name: {sensor.name}, "
f"Last seen: {sensor.last_seen}, "
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C, "
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%, "
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%"
)
def display_sensor(sensor: DiscoveredSensor, index: int, total: int):
"""
Display sensor information to the user.
Args:
sensor: Discovered sensor to display
index: Current sensor number (1-based)
total: Total number of sensors
"""
print(f"\n{'='*70}")
print(f"Sensor {index}/{total}")
print(f"{'='*70}")
print(f"MAC Address: {sensor.mac}")
print(f"Device Name: {sensor.name}")
print(f"Last Seen: {sensor.last_seen}")
print(f"Status: {sensor.status}")
if sensor.status == "ignored" and sensor.ignored_at:
print(f"Ignored At: {sensor.ignored_at}")
if sensor.ignore_reason:
print(f"Reason: {sensor.ignore_reason}")
# Display sample reading
reading = sensor.sample_reading
print(f"\nSample Reading:")
print(f" Temperature: {reading.get('temperature', 'N/A')}°C")
print(f" Humidity: {reading.get('humidity', 'N/A')}%")
print(f" Battery: {reading.get('battery_percent', 'N/A')}%")
print(f" Voltage: {reading.get('battery_voltage', 'N/A')}mV")
print(f"{'='*70}")
def get_user_choice() -> str:
"""
Get user's choice for what to do with the sensor.
Returns:
User choice: 'a' (approve), 'i' (ignore), 's' (skip)
"""
while True:
choice = input("\n[A]pprove, [I]gnore, [S]kip, [Q]uit? ").strip().lower()
if choice in ['a', 'i', 's', 'q']:
return choice
print("Invalid choice. Please enter A, I, S, or Q.")
def approve_sensor(sensor: DiscoveredSensor, manager: DiscoveryManager):
"""
Approve a sensor and add it to sensors.json.
Args:
sensor: Sensor to approve
manager: Discovery manager
"""
# Check if sensor already exists in sensors.json
sensor_config_path = Path(config.SENSOR_CONFIG_FILE)
try:
with open(sensor_config_path, 'r') as f:
data = json.load(f)
# Check for duplicates
for existing_sensor in data.get('sensors', []):
if existing_sensor.get('mac', '').upper() == sensor.mac:
print(f"\n⚠️ Sensor {sensor.mac} already exists in sensors.json")
print(" Renaming must be done manually in the file.")
return
except FileNotFoundError:
# File doesn't exist yet, create with empty sensors list
data = {'sensors': []}
except json.JSONDecodeError as e:
print(f"\n❌ Error: Invalid JSON in {sensor_config_path}: {e}")
return
# Get sensor name from user
while True:
name = input("\nEnter sensor name (required): ").strip()
if name:
break
print("Sensor name cannot be empty.")
# Pre-fill comment with metadata
default_comment = format_metadata_comment(sensor)
print(f"\nDefault comment:")
print(f" {default_comment}")
edit = input("\nEdit comment? [y/N]: ").strip().lower()
if edit == 'y':
print("\nEnter comment (or press Enter to keep default):")
comment = input("> ").strip()
if not comment:
comment = default_comment
else:
comment = default_comment
# Add to sensors.json
new_sensor = {
"mac": sensor.mac,
"name": name
}
if comment:
new_sensor["comment"] = comment
data.setdefault('sensors', []).append(new_sensor)
try:
with open(sensor_config_path, 'w') as f:
json.dump(data, f, indent=2)
print(f"\n✅ Sensor approved and added to sensors.json")
print(f" Name: {name}")
print(f" Configuration will be reloaded automatically within 15 minutes")
# Mark as approved in discovery manager
manager.approve(sensor.mac)
except Exception as e:
print(f"\n❌ Error saving to sensors.json: {e}")
def ignore_sensor(sensor: DiscoveredSensor, manager: DiscoveryManager):
"""
Ignore a sensor.
Args:
sensor: Sensor to ignore
manager: Discovery manager
"""
reason = input("\nReason for ignoring (optional): ").strip()
manager.ignore(sensor.mac, reason if reason else None)
print(f"\n✅ Sensor ignored")
if reason:
print(f" Reason: {reason}")
def process_sensors(sensors: List[DiscoveredSensor], manager: DiscoveryManager):
"""
Process list of sensors interactively.
Args:
sensors: List of sensors to process
manager: Discovery manager
"""
if not sensors:
print("\n✅ No sensors to process")
return
print(f"\nFound {len(sensors)} sensor(s) to review")
for i, sensor in enumerate(sensors, 1):
# Mark as reviewed when shown
manager.mark_reviewed(sensor.mac)
display_sensor(sensor, i, len(sensors))
choice = get_user_choice()
if choice == 'q':
print("\n👋 Exiting...")
break
elif choice == 'a':
approve_sensor(sensor, manager)
elif choice == 'i':
ignore_sensor(sensor, manager)
elif choice == 's':
print("\n⏭️ Skipped")
continue
def main():
"""Main entry point for approve-sensors CLI."""
# Parse command line arguments
parser = argparse.ArgumentParser(
description="Approve or ignore discovered Bluetooth sensors",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s # Show only new pending sensors
%(prog)s --all # Show all pending sensors (including reviewed)
%(prog)s --ignored # Show only ignored sensors
%(prog)s --all --ignored # Show all sensors
"""
)
parser.add_argument(
'--all', '-a',
action='store_true',
help='Show all pending sensors, including previously reviewed ones'
)
parser.add_argument(
'--ignored', '-i',
action='store_true',
help='Show ignored sensors'
)
args = parser.parse_args()
# Setup logging
logging.basicConfig(
level=logging.WARNING,
format='%(levelname)s: %(message)s'
)
print("=" * 70)
print("Sensorpajen - Approve Sensors")
print("=" * 70)
try:
# Load discovery manager
manager = DiscoveryManager()
# Get sensors based on flags
if args.all:
pending = manager.get_pending()
pending_label = "all pending"
else:
pending = manager.get_new_pending()
pending_label = "new pending"
ignored = manager.get_ignored() if args.ignored else []
if not pending and not ignored:
if args.all or args.ignored:
print(f"\n✅ No {pending_label if pending else 'ignored'} sensors found")
else:
print("\n✅ No new sensors to review")
all_pending = manager.get_pending()
if all_pending:
print(f"\nThere are {len(all_pending)} previously reviewed pending sensor(s).")
print("Run with --all to review them again.")
return 0
# Process pending sensors
if pending:
print(f"\n📋 Processing {len(pending)} {pending_label} sensor(s)...")
process_sensors(pending, manager)
# Process ignored sensors if requested
if ignored:
if pending:
print("\n" + "=" * 70)
print(f"\n📋 Processing {len(ignored)} ignored sensor(s)...")
process_sensors(ignored, manager)
print("\n" + "=" * 70)
print("Done!")
print("=" * 70)
return 0
except KeyboardInterrupt:
print("\n\n👋 Interrupted by user")
return 1
except Exception as e:
logger.error(f"Error: {e}", exc_info=True)
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,154 @@
"""
Configuration management for Sensorpajen.
Loads configuration from environment variables with sensible defaults.
Configuration files are loaded relative to the project root.
"""
import os
import json
import logging
from pathlib import Path
from typing import Dict, List
logger = logging.getLogger(__name__)
# Determine project root and config directory
# Check if running from system installation (/opt/sensorpajen) or development
if Path('/opt/sensorpajen').exists():
# System installation
PROJECT_ROOT = Path('/opt/sensorpajen')
CONFIG_DIR = Path('/etc/sensorpajen')
else:
# Development installation (3 levels up from this file: src/sensorpajen/config.py)
PROJECT_ROOT = Path(__file__).parent.parent.parent
CONFIG_DIR = PROJECT_ROOT / "config"
# MQTT Configuration from environment
MQTT_HOST = os.environ.get("MQTT_HOST")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD")
MQTT_CLIENT_ID = os.environ.get("MQTT_CLIENT_ID", "sensorpajen")
MQTT_TOPIC_PREFIX = os.environ.get("MQTT_TOPIC_PREFIX", "MiTemperature2")
# Validate required MQTT configuration
if not MQTT_HOST:
raise RuntimeError(
"MQTT_HOST environment variable must be set. "
"Please configure config/sensorpajen.env"
)
# Sensor configuration file
SENSOR_CONFIG_FILE = os.environ.get(
"SENSOR_CONFIG_FILE",
str(CONFIG_DIR / "sensors.json")
)
# Application settings
WATCHDOG_TIMEOUT = int(os.environ.get("WATCHDOG_TIMEOUT", "5"))
ENABLE_BATTERY = os.environ.get("ENABLE_BATTERY", "true").lower() == "true"
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
# Bluetooth settings
SKIP_IDENTICAL = int(os.environ.get("SKIP_IDENTICAL", "50"))
DEBOUNCE = os.environ.get("DEBOUNCE", "true").lower() == "true"
# ntfy notification settings (optional)
NTFY_ENABLED = os.environ.get("NTFY_ENABLED", "false").lower() == "true"
NTFY_URL = os.environ.get("NTFY_URL", "https://ntfy.sh")
NTFY_TOPIC = os.environ.get("NTFY_TOPIC", "sensorpajen")
NTFY_TOKEN = os.environ.get("NTFY_TOKEN", "")
# Discovery settings
DISCOVERED_SENSORS_FILE = os.environ.get(
"DISCOVERED_SENSORS_FILE",
str(CONFIG_DIR / "discovered_sensors.json")
)
CONFIG_RELOAD_INTERVAL = int(os.environ.get("CONFIG_RELOAD_INTERVAL", "900")) # 15 minutes
class SensorConfig:
"""Manages sensor configuration from JSON file."""
def __init__(self, config_file: str = SENSOR_CONFIG_FILE):
"""
Initialize sensor configuration.
Args:
config_file: Path to sensors JSON configuration file
"""
self.config_file = Path(config_file)
self.sensors: Dict[str, str] = {}
self.load()
def load(self):
"""Load sensor configuration from JSON file."""
if not self.config_file.exists():
raise FileNotFoundError(
f"Sensor configuration file not found: {self.config_file}\n"
f"Please copy config/sensors.json.example to config/sensors.json "
f"and configure your sensors."
)
try:
with open(self.config_file, 'r') as f:
data = json.load(f)
# Convert sensors list to MAC -> name mapping
for sensor in data.get('sensors', []):
mac = sensor.get('mac', '').upper()
name = sensor.get('name')
if mac and name:
self.sensors[mac] = name
logger.debug(f"Loaded sensor: {mac} -> {name}")
logger.info(f"Loaded {len(self.sensors)} sensors from {self.config_file}")
except json.JSONDecodeError as e:
raise RuntimeError(f"Invalid JSON in {self.config_file}: {e}")
except Exception as e:
raise RuntimeError(f"Error loading sensor config: {e}")
def get_name(self, mac: str) -> str:
"""
Get sensor name by MAC address.
Args:
mac: MAC address (any case)
Returns:
Sensor name or the MAC address if not found
"""
return self.sensors.get(mac.upper(), mac)
def get_all_macs(self) -> List[str]:
"""Get list of all configured MAC addresses."""
return list(self.sensors.keys())
def validate_config():
"""
Validate configuration and log settings.
Should be called at application startup.
"""
install_type = "System" if Path('/opt/sensorpajen').exists() else "Development"
logger.info("=== Sensorpajen Configuration ===")
logger.info(f"Installation Type: {install_type}")
logger.info(f"Project Root: {PROJECT_ROOT}")
logger.info(f"Config Directory: {CONFIG_DIR}")
logger.info(f"MQTT Host: {MQTT_HOST}:{MQTT_PORT}")
logger.info(f"MQTT User: {MQTT_USER}")
logger.info(f"MQTT Client ID: {MQTT_CLIENT_ID}")
logger.info(f"MQTT Topic Prefix: {MQTT_TOPIC_PREFIX}")
logger.info(f"Sensor Config: {SENSOR_CONFIG_FILE}")
logger.info(f"Discovered Sensors: {DISCOVERED_SENSORS_FILE}")
logger.info(f"Watchdog Timeout: {WATCHDOG_TIMEOUT}s")
logger.info(f"Battery Monitoring: {ENABLE_BATTERY}")
logger.info(f"Config Reload Interval: {CONFIG_RELOAD_INTERVAL}s")
logger.info(f"ntfy Enabled: {NTFY_ENABLED}")
if NTFY_ENABLED:
logger.info(f"ntfy URL: {NTFY_URL}/{NTFY_TOPIC}")
logger.info(f"Log Level: {LOG_LEVEL}")
logger.info("================================")

View File

@@ -0,0 +1,263 @@
"""
Discovery manager for tracking and managing discovered sensors.
Maintains a database of discovered sensors with their metadata and status.
"""
import json
import logging
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from . import config
logger = logging.getLogger(__name__)
@dataclass
class DiscoveredSensor:
"""Represents a discovered sensor with metadata."""
mac: str
name: str
rssi: int
first_seen: str
last_seen: str
sample_reading: Dict[str, float]
status: str = "pending" # pending, approved, ignored
reviewed: bool = False # Has been shown in approval CLI
ignored_at: Optional[str] = None
ignore_reason: Optional[str] = None
class DiscoveryManager:
"""Manages discovered sensors and their approval status."""
def __init__(self, discovery_file: str = config.DISCOVERED_SENSORS_FILE):
"""
Initialize discovery manager.
Args:
discovery_file: Path to discovered sensors JSON file
"""
self.discovery_file = Path(discovery_file)
self.sensors: Dict[str, DiscoveredSensor] = {}
self.load()
def load(self):
"""Load discovered sensors from JSON file."""
if not self.discovery_file.exists():
logger.info(f"Creating new discovered sensors file: {self.discovery_file}")
self.discovery_file.parent.mkdir(parents=True, exist_ok=True)
self.save()
return
try:
with open(self.discovery_file, 'r') as f:
data = json.load(f)
for sensor_data in data:
sensor = DiscoveredSensor(**sensor_data)
self.sensors[sensor.mac.upper()] = sensor
logger.info(f"Loaded {len(self.sensors)} discovered sensors")
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in {self.discovery_file}: {e}")
except Exception as e:
logger.error(f"Error loading discovered sensors: {e}")
def save(self):
"""Save discovered sensors to JSON file."""
try:
# Ensure directory exists
self.discovery_file.parent.mkdir(parents=True, exist_ok=True)
# Convert sensors to list of dicts
data = [asdict(sensor) for sensor in self.sensors.values()]
with open(self.discovery_file, 'w') as f:
json.dump(data, f, indent=2)
logger.debug(f"Saved {len(self.sensors)} discovered sensors")
except Exception as e:
logger.error(f"Error saving discovered sensors: {e}")
def add_or_update(self, mac: str, name: str, rssi: int,
temperature: float, humidity: float,
battery_percent: int, battery_voltage: int) -> bool:
"""
Add or update a discovered sensor.
Args:
mac: MAC address
name: Advertised device name
rssi: Signal strength
temperature: Temperature reading
humidity: Humidity reading
battery_percent: Battery percentage
battery_voltage: Battery voltage in mV
Returns:
True if this is a newly discovered sensor, False if updated existing
"""
mac = mac.upper()
now = datetime.now().isoformat()
sample_reading = {
"temperature": temperature,
"humidity": humidity,
"battery_percent": battery_percent,
"battery_voltage": battery_voltage
}
if mac in self.sensors:
# Update existing sensor
sensor = self.sensors[mac]
sensor.last_seen = now
sensor.rssi = rssi
sensor.sample_reading = sample_reading
self.save()
return False
else:
# New sensor discovered
sensor = DiscoveredSensor(
mac=mac,
name=name,
rssi=rssi,
first_seen=now,
last_seen=now,
sample_reading=sample_reading,
status="pending"
)
self.sensors[mac] = sensor
self.save()
logger.info(f"New sensor discovered: {mac} ({name})")
return True
def is_known(self, mac: str) -> bool:
"""
Check if a sensor has been discovered before.
Args:
mac: MAC address
Returns:
True if sensor is in discovered list
"""
return mac.upper() in self.sensors
def get_status(self, mac: str) -> Optional[str]:
"""
Get status of a discovered sensor.
Args:
mac: MAC address
Returns:
Status string or None if not found
"""
sensor = self.sensors.get(mac.upper())
return sensor.status if sensor else None
def approve(self, mac: str):
"""
Mark a sensor as approved.
Args:
mac: MAC address
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].status = "approved"
self.save()
logger.info(f"Sensor approved: {mac}")
def ignore(self, mac: str, reason: Optional[str] = None):
"""
Mark a sensor as ignored.
Args:
mac: MAC address
reason: Optional reason for ignoring
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].status = "ignored"
self.sensors[mac].ignored_at = datetime.now().isoformat()
self.sensors[mac].ignore_reason = reason
self.save()
logger.info(f"Sensor ignored: {mac}")
def get_pending(self) -> List[DiscoveredSensor]:
"""Get list of sensors with status 'pending'."""
return [s for s in self.sensors.values() if s.status == "pending"]
def get_new_pending(self) -> List[DiscoveredSensor]:
"""Get list of pending sensors that haven't been reviewed yet."""
return [s for s in self.sensors.values() if s.status == "pending" and not s.reviewed]
def get_ignored(self) -> List[DiscoveredSensor]:
"""Get list of sensors with status 'ignored'."""
return [s for s in self.sensors.values() if s.status == "ignored"]
def mark_reviewed(self, mac: str):
"""
Mark a sensor as reviewed (shown in approval CLI).
Args:
mac: MAC address
"""
mac = mac.upper()
if mac in self.sensors:
self.sensors[mac].reviewed = True
self.save()
def send_ntfy_notification(self, sensor: DiscoveredSensor):
"""
Send ntfy notification for a newly discovered sensor.
Args:
sensor: Discovered sensor to notify about
"""
if not config.NTFY_ENABLED:
logger.debug("ntfy notifications disabled")
return
if not config.NTFY_TOKEN:
logger.warning("ntfy enabled but NTFY_TOKEN not set")
return
try:
message = (
f"🆕 New sensor discovered!\n\n"
f"MAC: {sensor.mac}\n"
f"Name: {sensor.name}\n"
f"Last seen: {sensor.last_seen}\n"
f"Temp: {sensor.sample_reading.get('temperature', 'N/A')}°C\n"
f"Humidity: {sensor.sample_reading.get('humidity', 'N/A')}%\n"
f"Battery: {sensor.sample_reading.get('battery_percent', 'N/A')}%\n\n"
f"Run 'sensorpajen approve-sensors' to approve or ignore."
)
url = f"{config.NTFY_URL}/{config.NTFY_TOPIC}"
result = subprocess.run(
["curl", "-H", f"Authorization: Bearer {config.NTFY_TOKEN}",
"-d", message, url],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Sent ntfy notification for {sensor.mac}")
else:
logger.warning(f"ntfy notification failed: {result.stderr.decode()}")
except subprocess.TimeoutExpired:
logger.warning("ntfy notification timed out")
except Exception as e:
logger.error(f"Error sending ntfy notification: {e}")

View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Sensorpajen - Main entry point
Bluetooth temperature sensor monitor for Xiaomi Mijia LYWSD03MMC sensors.
Publishes sensor data to MQTT broker.
"""
import sys
import signal
import logging
import time
import threading
from pathlib import Path
from . import __version__
from . import config
from .mqtt_publisher import MQTTPublisher
from .sensor_reader import SensorReader, Measurement
from .discovery_manager import DiscoveryManager
class Sensorpajen:
"""Main application class."""
def __init__(self):
"""Initialize the application."""
self.mqtt_publisher: MQTTPublisher = None
self.sensor_reader: SensorReader = None
self.sensor_config: config.SensorConfig = None
self.discovery_manager: DiscoveryManager = None
self.running = False
self.config_reload_timer: threading.Timer = None
# Setup logging
self._setup_logging()
# Setup signal handlers
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
def _setup_logging(self):
"""Configure logging to stdout for journald."""
log_level = getattr(logging, config.LOG_LEVEL, logging.INFO)
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stdout
)
# Set our logger
self.logger = logging.getLogger(__name__)
def _signal_handler(self, sig, frame):
"""Handle shutdown signals."""
signal_name = "SIGTERM" if sig == signal.SIGTERM else "SIGINT"
self.logger.info(f"Received {signal_name}, shutting down gracefully...")
self.shutdown()
sys.exit(0)
def _on_measurement(self, measurement: Measurement):
"""
Callback for new sensor measurements.
Args:
measurement: Sensor measurement data
"""
try:
# Publish to MQTT
self.mqtt_publisher.publish_measurement(
sensor_name=measurement.sensor_name,
temperature=measurement.temperature,
humidity=measurement.humidity,
battery_voltage=measurement.voltage,
battery_level=measurement.battery
)
except Exception as e:
self.logger.error(f"Error handling measurement: {e}")
def _reload_config(self):
"""Reload sensor configuration periodically."""
if not self.running:
return
try:
self.logger.info("Reloading sensor configuration...")
old_sensors = set(self.sensor_config.sensors.keys())
self.sensor_config.load()
new_sensors = set(self.sensor_config.sensors.keys())
added = new_sensors - old_sensors
removed = old_sensors - new_sensors
if added:
self.logger.info(f"Added sensors: {', '.join(added)}")
if removed:
self.logger.info(f"Removed sensors: {', '.join(removed)}")
if not added and not removed:
self.logger.debug("No sensor configuration changes")
except Exception as e:
self.logger.error(f"Error reloading configuration: {e}")
finally:
# Schedule next reload
if self.running:
self.config_reload_timer = threading.Timer(
config.CONFIG_RELOAD_INTERVAL,
self._reload_config
)
self.config_reload_timer.daemon = True
self.config_reload_timer.start()
def start(self):
"""Start the application."""
try:
self.logger.info("=" * 50)
self.logger.info(f"Starting Sensorpajen v{__version__}")
self.logger.info("=" * 50)
# Validate and log configuration
config.validate_config()
# Load sensor configuration
self.sensor_config = config.SensorConfig()
if len(self.sensor_config.sensors) == 0:
self.logger.error("No sensors configured!")
self.logger.error("Please configure sensors in config/sensors.json")
sys.exit(1)
# Initialize discovery manager
self.logger.info("Initializing discovery manager...")
self.discovery_manager = DiscoveryManager()
# Initialize MQTT publisher
self.logger.info("Initializing MQTT publisher...")
self.mqtt_publisher = MQTTPublisher()
self.mqtt_publisher.connect()
# Wait a moment for MQTT connection
time.sleep(1)
if not self.mqtt_publisher.is_connected():
self.logger.warning("MQTT connection not established yet, continuing anyway...")
# Initialize sensor reader
self.logger.info("Initializing Bluetooth sensor reader...")
self.sensor_reader = SensorReader(
sensor_config=self.sensor_config,
discovery_manager=self.discovery_manager,
on_measurement=self._on_measurement,
interface=0 # hci0
)
# Start config reload timer
self.config_reload_timer = threading.Timer(
config.CONFIG_RELOAD_INTERVAL,
self._reload_config
)
self.config_reload_timer.daemon = True
self.config_reload_timer.start()
self.logger.info(f"Config reload scheduled every {config.CONFIG_RELOAD_INTERVAL}s")
# Start reading sensors (blocking call)
self.logger.info("=" * 50)
self.logger.info("Sensorpajen is now running")
self.logger.info("Monitoring sensors via Bluetooth...")
self.logger.info("Publishing to MQTT...")
self.logger.info("Press Ctrl+C to stop")
self.logger.info("=" * 50)
self.running = True
self.sensor_reader.start()
except FileNotFoundError as e:
self.logger.error(f"Configuration error: {e}")
sys.exit(1)
except RuntimeError as e:
self.logger.error(f"Configuration error: {e}")
sys.exit(1)
except Exception as e:
self.logger.error(f"Failed to start application: {e}", exc_info=True)
self.shutdown()
sys.exit(1)
def shutdown(self):
"""Shutdown the application gracefully."""
if not self.running:
return
self.running = False
self.logger.info("Shutting down...")
# Cancel config reload timer
if self.config_reload_timer:
try:
self.config_reload_timer.cancel()
except Exception as e:
self.logger.error(f"Error canceling reload timer: {e}")
# Stop sensor reader
if self.sensor_reader:
try:
self.sensor_reader.stop()
except Exception as e:
self.logger.error(f"Error stopping sensor reader: {e}")
# Disconnect MQTT
if self.mqtt_publisher:
try:
self.mqtt_publisher.disconnect()
except Exception as e:
self.logger.error(f"Error disconnecting MQTT: {e}")
self.logger.info("Shutdown complete")
def main():
"""Main entry point."""
app = Sensorpajen()
app.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,131 @@
"""
MQTT Publisher for sensor data.
Handles connection to MQTT broker and publishing of sensor measurements.
"""
import logging
import paho.mqtt.client as mqtt
from typing import Optional
from . import config
logger = logging.getLogger(__name__)
class MQTTPublisher:
"""Manages MQTT connection and publishing of sensor data."""
def __init__(self):
"""Initialize MQTT publisher with configuration."""
self.client: Optional[mqtt.Client] = None
self.connected = False
self._setup_client()
def _setup_client(self):
"""Setup MQTT client with callbacks."""
# Handle both paho-mqtt v1.x and v2.x
try:
# Try v2.x format (with callback_api_version)
self.client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION1,
client_id=config.MQTT_CLIENT_ID
)
except (TypeError, AttributeError):
# Fall back to v1.x format
self.client = mqtt.Client(config.MQTT_CLIENT_ID)
# Set credentials if provided
if config.MQTT_USER and config.MQTT_PASSWORD:
self.client.username_pw_set(config.MQTT_USER, config.MQTT_PASSWORD)
# Setup callbacks
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_publish = self._on_publish
logger.info(f"MQTT client configured for {config.MQTT_HOST}:{config.MQTT_PORT}")
def _on_connect(self, client, userdata, flags, rc):
"""Callback for when client connects to broker."""
if rc == 0:
self.connected = True
logger.info(f"Connected to MQTT broker at {config.MQTT_HOST}:{config.MQTT_PORT}")
else:
self.connected = False
logger.error(f"Failed to connect to MQTT broker. Return code: {rc}")
def _on_disconnect(self, client, userdata, rc):
"""Callback for when client disconnects from broker."""
self.connected = False
if rc != 0:
logger.warning(f"Unexpected disconnection from MQTT broker. Return code: {rc}")
else:
logger.info("Disconnected from MQTT broker")
def _on_publish(self, client, userdata, mid):
"""Callback for when message is published."""
logger.debug(f"Message published: {mid}")
def connect(self):
"""Connect to MQTT broker."""
try:
logger.info(f"Connecting to MQTT broker at {config.MQTT_HOST}:{config.MQTT_PORT}")
self.client.connect(config.MQTT_HOST, config.MQTT_PORT, keepalive=60)
self.client.loop_start() # Start network loop in background thread
except Exception as e:
logger.error(f"Failed to connect to MQTT broker: {e}")
raise
def disconnect(self):
"""Disconnect from MQTT broker."""
if self.client:
self.client.loop_stop()
self.client.disconnect()
logger.info("Disconnected from MQTT broker")
def publish_measurement(self, sensor_name: str, temperature: float,
humidity: int, battery_voltage: float = None,
battery_level: int = None):
"""
Publish sensor measurement to MQTT.
Args:
sensor_name: Name of the sensor
temperature: Temperature in Celsius
humidity: Humidity percentage
battery_voltage: Battery voltage (optional)
battery_level: Battery level percentage (optional)
"""
if not self.connected:
logger.warning("Not connected to MQTT broker, skipping publish")
return
topic_prefix = f"{config.MQTT_TOPIC_PREFIX}/{sensor_name}"
try:
# Publish temperature
self.client.publish(f"{topic_prefix}/temp", f"{temperature:.1f}")
logger.debug(f"{sensor_name}: temp={temperature:.1f}°C")
# Publish humidity
self.client.publish(f"{topic_prefix}/humidity", f"{humidity}")
logger.debug(f"{sensor_name}: humidity={humidity}%")
# Publish battery info if enabled and available
if config.ENABLE_BATTERY:
if battery_voltage is not None:
self.client.publish(f"{topic_prefix}/batteryvoltage", f"{battery_voltage:.3f}")
logger.debug(f"{sensor_name}: battery_voltage={battery_voltage:.3f}V")
if battery_level is not None:
self.client.publish(f"{topic_prefix}/batterylevel", f"{battery_level}")
logger.debug(f"{sensor_name}: battery_level={battery_level}%")
logger.info(f"Published: {sensor_name} - {temperature:.1f}°C, {humidity}%")
except Exception as e:
logger.error(f"Error publishing to MQTT: {e}")
def is_connected(self) -> bool:
"""Check if connected to MQTT broker."""
return self.connected

View File

@@ -0,0 +1,292 @@
"""
Bluetooth sensor reader for Xiaomi Mijia LYWSD03MMC sensors with ATC firmware.
Reads temperature, humidity, and battery data from BLE advertisements.
"""
import logging
import time
import threading
import bluetooth._bluetooth as bluez
from dataclasses import dataclass
from typing import Optional, Callable, Dict
from . import config
from .utils import (enable_le_scan, disable_le_scan,
parse_le_advertising_events, raw_packet_to_str, toggle_device)
logger = logging.getLogger(__name__)
@dataclass
class Measurement:
"""Sensor measurement data."""
temperature: float
humidity: int
voltage: float
battery: int = 0
rssi: int = 0
sensor_name: str = ""
timestamp: int = 0
class SensorReader:
"""Reads Xiaomi LYWSD03MMC sensors with ATC firmware via BLE."""
def __init__(self, sensor_config: config.SensorConfig,
discovery_manager,
on_measurement: Callable[[Measurement], None],
interface: int = 0):
"""
Initialize sensor reader.
Args:
sensor_config: Sensor configuration mapping
discovery_manager: Discovery manager for tracking new sensors
on_measurement: Callback function for new measurements
interface: Bluetooth interface number (default 0 for hci0)
"""
self.sensor_config = sensor_config
self.discovery_manager = discovery_manager
self.on_measurement = on_measurement
self.interface = interface
self.sock: Optional[int] = None
self.running = False
self.last_ble_packet = time.time()
self.adv_counter: Dict[str, str] = {} # Track advertisement numbers to avoid duplicates
self.watchdog_thread: Optional[threading.Thread] = None
def start(self):
"""Start BLE scanning for sensors."""
try:
logger.info(f"Starting BLE scan on hci{self.interface}")
# Enable bluetooth device
toggle_device(self.interface, True)
# Open bluetooth socket
try:
self.sock = bluez.hci_open_dev(self.interface)
except Exception as e:
logger.error(f"Cannot open bluetooth device hci{self.interface}: {e}")
raise
# Enable LE scanning without filtering duplicates
enable_le_scan(self.sock, filter_duplicates=False)
# Start watchdog if configured
if config.WATCHDOG_TIMEOUT > 0:
self.running = True
self.watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True)
self.watchdog_thread.start()
logger.info(f"Watchdog started with {config.WATCHDOG_TIMEOUT}s timeout")
logger.info("BLE scanning enabled")
logger.info(f"Monitoring {len(self.sensor_config.sensors)} sensors")
# Start parsing advertisements (blocking call)
parse_le_advertising_events(
self.sock,
handler=self._handle_ble_packet,
debug=False
)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt")
self.stop()
except Exception as e:
logger.error(f"Error in sensor reader: {e}")
self.stop()
raise
def stop(self):
"""Stop BLE scanning."""
self.running = False
if self.sock:
try:
disable_le_scan(self.sock)
logger.info("BLE scanning disabled")
except Exception as e:
logger.error(f"Error disabling BLE scan: {e}")
if self.watchdog_thread and self.watchdog_thread.is_alive():
self.watchdog_thread.join(timeout=2)
def _watchdog_loop(self):
"""Watchdog thread to restart BLE scanning if no packets received."""
restart_counter = 1
while self.running:
time.sleep(1)
now = time.time()
elapsed = now - self.last_ble_packet
if elapsed > config.WATCHDOG_TIMEOUT:
logger.warning(
f"Watchdog: No BLE packet within {int(elapsed)}s. "
f"Restarting BLE scan (count: {restart_counter})"
)
try:
disable_le_scan(self.sock)
time.sleep(1)
enable_le_scan(self.sock, filter_duplicates=False)
restart_counter += 1
self.last_ble_packet = now # Reset timer
except Exception as e:
logger.error(f"Error restarting BLE scan: {e}")
def _handle_ble_packet(self, mac: str, adv_type: int, data: bytes, rssi: int):
"""
Handle incoming BLE advertisement packet.
Args:
mac: MAC address of the device
adv_type: Advertisement type
data: Advertisement data
rssi: Signal strength
"""
# Update last packet time for watchdog
self.last_ble_packet = time.time()
# Convert data to hex string
data_str = raw_packet_to_str(data)
# Check if this is an ATC packet
# ATC format: [... service UUID 0x181A ... MAC ... data ...]
atc_identifier = data_str[6:10].upper()
if atc_identifier != "1A18":
return # Not an ATC packet
# Extract MAC from packet and verify it matches
packet_mac = data_str[10:22].upper()
mac_str = mac.replace(":", "").upper()
if packet_mac != mac_str:
return # MAC mismatch
mac_with_colons = mac.upper()
# Parse ATC data packet first to get sensor data
try:
parsed_data = self._parse_atc_data(data_str)
if not parsed_data:
return
temperature, humidity, battery_percent, battery_voltage, adv_number = parsed_data
# Check if this is a known sensor
if mac_with_colons not in self.sensor_config.sensors:
# Unknown sensor - check if we should discover it
self._handle_unknown_sensor(
mac_with_colons,
rssi,
temperature,
humidity,
battery_percent,
battery_voltage
)
return
# Check advertisement number to avoid duplicates
if mac_str in self.adv_counter:
if self.adv_counter[mac_str] == adv_number:
return # Duplicate packet
self.adv_counter[mac_str] = adv_number
# Create measurement for known sensor
sensor_name = self.sensor_config.get_name(mac_with_colons)
measurement = Measurement(
temperature=temperature,
humidity=humidity,
voltage=battery_voltage / 1000.0,
battery=battery_percent,
rssi=rssi,
sensor_name=sensor_name,
timestamp=int(time.time())
)
# Log the measurement
logger.info(
f"{measurement.sensor_name}: {measurement.temperature}°C, "
f"{measurement.humidity}%, {measurement.voltage}V, "
f"battery {measurement.battery}%, RSSI {rssi}dBm"
)
# Call measurement callback
if self.on_measurement:
self.on_measurement(measurement)
except Exception as e:
logger.error(f"Error parsing ATC packet from {mac}: {e}")
def _handle_unknown_sensor(self, mac: str, rssi: int, temperature: float,
humidity: int, battery_percent: int, battery_voltage: int):
"""
Handle discovery of unknown sensor.
Args:
mac: MAC address with colons
rssi: Signal strength
temperature: Temperature reading
humidity: Humidity reading
battery_percent: Battery percentage
battery_voltage: Battery voltage in mV
"""
# Get or construct device name from MAC
# ATC sensors advertise as ATC_XXXXXX where XXXXXX is last 3 bytes
mac_suffix = mac.replace(":", "")[-6:]
device_name = f"ATC_{mac_suffix}"
# Check if already discovered
if self.discovery_manager.is_known(mac):
# Just update the discovery record
self.discovery_manager.add_or_update(
mac, device_name, rssi, temperature, humidity,
battery_percent, battery_voltage
)
return
# New sensor - discover and notify
is_new = self.discovery_manager.add_or_update(
mac, device_name, rssi, temperature, humidity,
battery_percent, battery_voltage
)
if is_new:
logger.info(f"New sensor discovered: {mac} ({device_name})")
sensor = self.discovery_manager.sensors[mac]
self.discovery_manager.send_ntfy_notification(sensor)
def _parse_atc_data(self, data_str: str) -> Optional[tuple]:
"""
Parse ATC advertisement data.
Returns:
Tuple of (temperature, humidity, battery_percent, battery_voltage, adv_number) or None
"""
try:
# Temperature: bytes 22-26, signed int16, big endian, /10
temp_hex = data_str[22:26]
temp_raw = int(temp_hex, 16)
if temp_raw & 0x8000: # Check sign bit
temp_raw = temp_raw - 0x10000
temperature = temp_raw / 10.0
# Humidity: bytes 26-28, uint8
humidity = int(data_str[26:28], 16)
# Battery: bytes 28-30, uint8
battery_percent = int(data_str[28:30], 16)
# Battery voltage: bytes 30-34, uint16, big endian, mV
battery_voltage = int(data_str[30:34], 16)
# Advertisement number: last 2 bytes
adv_number = data_str[-2:]
return (temperature, humidity, battery_percent, battery_voltage, adv_number)
except (ValueError, IndexError) as e:
logger.debug(f"Error parsing ATC data: {e}")
return None

View File

@@ -0,0 +1,421 @@
# -*- coding: utf-8 -*-
# This file is from https://github.com/colin-guyon/py-bluetooth-utils
# published under MIT License
# MIT License
# Copyright (c) 2020 Colin GUYON
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Module containing some bluetooth utility functions (linux only).
It either uses HCI commands using PyBluez, or does ioctl calls like it's
done in Bluez tools such as hciconfig.
Main functions:
- toggle_device : enable or disable a bluetooth device
- set_scan : set scan type on a device ("noscan", "iscan", "pscan", "piscan")
- enable/disable_le_scan : enable BLE scanning
- parse_le_advertising_events : parse and read BLE advertisements packets
- start/stop_le_advertising : advertise custom data using BLE
Bluez : http://www.bluez.org/
PyBluez : http://karulis.github.io/pybluez/
The module was in particular inspired from 'iBeacon-Scanner-'
https://github.com/switchdoclabs/iBeacon-Scanner-/blob/master/blescan.py
and sometimes directly from the Bluez sources.
"""
from __future__ import absolute_import
import sys
import struct
import fcntl
import array
import socket
from errno import EALREADY
# import PyBluez
import bluetooth._bluetooth as bluez
__all__ = ('toggle_device', 'set_scan',
'enable_le_scan', 'disable_le_scan', 'parse_le_advertising_events',
'start_le_advertising', 'stop_le_advertising',
'raw_packet_to_str')
LE_META_EVENT = 0x3E
LE_PUBLIC_ADDRESS = 0x00
LE_RANDOM_ADDRESS = 0x01
OGF_LE_CTL = 0x08
OCF_LE_SET_SCAN_PARAMETERS = 0x000B
OCF_LE_SET_SCAN_ENABLE = 0x000C
OCF_LE_CREATE_CONN = 0x000D
OCF_LE_SET_ADVERTISING_PARAMETERS = 0x0006
OCF_LE_SET_ADVERTISE_ENABLE = 0x000A
OCF_LE_SET_ADVERTISING_DATA = 0x0008
SCAN_TYPE_PASSIVE = 0x00
SCAN_FILTER_DUPLICATES = 0x01
SCAN_DISABLE = 0x00
SCAN_ENABLE = 0x01
# sub-events of LE_META_EVENT
EVT_LE_CONN_COMPLETE = 0x01
EVT_LE_ADVERTISING_REPORT = 0x02
EVT_LE_CONN_UPDATE_COMPLETE = 0x03
EVT_LE_READ_REMOTE_USED_FEATURES_COMPLETE = 0x04
# Advertisement event types
ADV_IND = 0x00
ADV_DIRECT_IND = 0x01
ADV_SCAN_IND = 0x02
ADV_NONCONN_IND = 0x03
ADV_SCAN_RSP = 0x04
# Allow Scan Request from Any, Connect Request from Any
FILTER_POLICY_NO_WHITELIST = 0x00
# Allow Scan Request from White List Only, Connect Request from Any
FILTER_POLICY_SCAN_WHITELIST = 0x01
# Allow Scan Request from Any, Connect Request from White List Only
FILTER_POLICY_CONN_WHITELIST = 0x02
# Allow Scan Request from White List Only, Connect Request from White List Only
FILTER_POLICY_SCAN_AND_CONN_WHITELIST = 0x03
def toggle_device(dev_id, enable):
"""
Power ON or OFF a bluetooth device.
:param dev_id: Device id.
:type dev_id: ``int``
:param enable: Whether to enable of disable the device.
:type enable: ``bool``
"""
hci_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW,
socket.BTPROTO_HCI)
print("Power %s bluetooth device %d" % ('ON' if enable else 'OFF', dev_id))
# di = struct.pack("HbBIBBIIIHHHH10I", dev_id, *((0,) * 22))
# fcntl.ioctl(hci_sock.fileno(), bluez.HCIGETDEVINFO, di)
req_str = struct.pack("H", dev_id)
request = array.array("b", req_str)
try:
fcntl.ioctl(hci_sock.fileno(),
bluez.HCIDEVUP if enable else bluez.HCIDEVDOWN,
request[0])
except IOError as e:
if e.errno == EALREADY:
print("Bluetooth device %d is already %s" % (
dev_id, 'enabled' if enable else 'disabled'))
else:
raise
finally:
hci_sock.close()
# Types of bluetooth scan
SCAN_DISABLED = 0x00
SCAN_INQUIRY = 0x01
SCAN_PAGE = 0x02
def set_scan(dev_id, scan_type):
"""
Set scan type on a given bluetooth device.
:param dev_id: Device id.
:type dev_id: ``int``
:param scan_type: One of
``'noscan'``
``'iscan'``
``'pscan'``
``'piscan'``
:type scan_type: ``str``
"""
hci_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW,
socket.BTPROTO_HCI)
if scan_type == "noscan":
dev_opt = SCAN_DISABLED
elif scan_type == "iscan":
dev_opt = SCAN_INQUIRY
elif scan_type == "pscan":
dev_opt = SCAN_PAGE
elif scan_type == "piscan":
dev_opt = SCAN_PAGE | SCAN_INQUIRY
else:
raise ValueError("Unknown scan type %r" % scan_type)
req_str = struct.pack("HI", dev_id, dev_opt)
print("Set scan type %r to bluetooth device %d" % (scan_type, dev_id))
try:
fcntl.ioctl(hci_sock.fileno(), bluez.HCISETSCAN, req_str)
finally:
hci_sock.close()
def raw_packet_to_str(pkt):
"""
Returns the string representation of a raw HCI packet.
"""
if sys.version_info > (3, 0):
return ''.join('%02x' % struct.unpack("B", bytes([x]))[0] for x in pkt)
else:
return ''.join('%02x' % struct.unpack("B", x)[0] for x in pkt)
def enable_le_scan(sock, interval=0x0800, window=0x0800,
filter_policy=FILTER_POLICY_NO_WHITELIST,
filter_duplicates=True):
"""
Enable LE passive scan (with filtering of duplicate packets enabled).
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param interval: Scan interval.
:param window: Scan window (must be less or equal than given interval).
:param filter_policy: One of
``FILTER_POLICY_NO_WHITELIST`` (default value)
``FILTER_POLICY_SCAN_WHITELIST``
``FILTER_POLICY_CONN_WHITELIST``
``FILTER_POLICY_SCAN_AND_CONN_WHITELIST``
.. note:: Scan interval and window are to multiply by 0.625 ms to
get the real time duration.
"""
print("Enable LE scan")
own_bdaddr_type = LE_PUBLIC_ADDRESS # does not work with LE_RANDOM_ADDRESS
cmd_pkt = struct.pack("<BHHBB", SCAN_TYPE_PASSIVE, interval, window,
own_bdaddr_type, filter_policy)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_PARAMETERS, cmd_pkt)
print("scan params: interval=%.3fms window=%.3fms own_bdaddr=%s "
"whitelist=%s" %
(interval * 0.625, window * 0.625,
'public' if own_bdaddr_type == LE_PUBLIC_ADDRESS else 'random',
'yes' if filter_policy in (FILTER_POLICY_SCAN_WHITELIST,
FILTER_POLICY_SCAN_AND_CONN_WHITELIST)
else 'no'))
cmd_pkt = struct.pack("<BB", SCAN_ENABLE, SCAN_FILTER_DUPLICATES if filter_duplicates else 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def disable_le_scan(sock):
"""
Disable LE scan.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
"""
print("Disable LE scan")
cmd_pkt = struct.pack("<BB", SCAN_DISABLE, 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_SCAN_ENABLE, cmd_pkt)
def start_le_advertising(sock, min_interval=1000, max_interval=1000,
adv_type=ADV_NONCONN_IND, data=()):
"""
Start LE advertising.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param min_interval: Minimum advertising interval.
:param max_interval: Maximum advertising interval.
:param adv_type: Advertisement type (``ADV_NONCONN_IND`` by default).
:param data: The advertisement data (maximum of 31 bytes).
:type data: iterable
"""
own_bdaddr_type = 0
direct_bdaddr_type = 0
direct_bdaddr = (0,) * 6
chan_map = 0x07 # All channels: 37, 38, 39
filter = 0
struct_params = [min_interval, max_interval, adv_type, own_bdaddr_type,
direct_bdaddr_type]
struct_params.extend(direct_bdaddr)
struct_params.extend((chan_map, filter))
cmd_pkt = struct.pack("<HHBBB6BBB", *struct_params)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISING_PARAMETERS,
cmd_pkt)
cmd_pkt = struct.pack("<B", 0x01)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISE_ENABLE, cmd_pkt)
data_length = len(data)
if data_length > 31:
raise ValueError("data is too long (%d but max is 31 bytes)",
data_length)
cmd_pkt = struct.pack("<B%dB" % data_length, data_length, *data)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISING_DATA, cmd_pkt)
print("Advertising started data_length=%d data=%r" % (data_length, data))
def stop_le_advertising(sock):
"""
Stop LE advertising.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
"""
cmd_pkt = struct.pack("<B", 0x00)
bluez.hci_send_cmd(sock, OGF_LE_CTL, OCF_LE_SET_ADVERTISE_ENABLE, cmd_pkt)
print("Advertising stopped")
def parse_le_advertising_events(sock, mac_addr=None, packet_length=None,
handler=None, debug=False):
"""
Parse and report LE advertisements.
This is a blocking call, an infinite loop is started and the
given handler will be called each time a new LE advertisement packet
is detected and corresponds to the given filters.
.. note:: The :func:`.start_le_advertising` function must be
called before calling this function.
:param sock: A bluetooth HCI socket (retrieved using the
``hci_open_dev`` PyBluez function).
:param mac_addr: list of filtered mac address representations
(uppercase, with ':' separators).
If not specified, the LE advertisement of any device will be reported.
Example: mac_addr=('00:2A:5F:FF:25:11', 'DA:FF:12:33:66:12')
:type mac_addr: ``list`` of ``string``
:param packet_length: Filter a specific length of LE advertisement packet.
:type packet_length: ``int``
:param handler: Handler that will be called each time a LE advertisement
packet is available (in accordance with the ``mac_addr``
and ``packet_length`` filters).
:type handler: ``callable`` taking 4 parameters:
mac (``str``), adv_type (``int``), data (``bytes``) and rssi (``int``)
:param debug: Enable debug prints.
:type debug: ``bool``
"""
if not debug and handler is None:
raise ValueError("You must either enable debug or give a handler !")
old_filter = sock.getsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, 14)
flt = bluez.hci_filter_new()
bluez.hci_filter_set_ptype(flt, bluez.HCI_EVENT_PKT)
# bluez.hci_filter_all_events(flt)
bluez.hci_filter_set_event(flt, LE_META_EVENT)
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, flt)
print("socket filter set to ptype=HCI_EVENT_PKT event=LE_META_EVENT")
print("Listening ...")
try:
while True:
pkt = full_pkt = sock.recv(255)
ptype, event, plen = struct.unpack("BBB", pkt[:3])
if event != LE_META_EVENT:
# Should never occur because we filtered with this type of event
print("Not a LE_META_EVENT !")
continue
sub_event, = struct.unpack("B", pkt[3:4])
if sub_event != EVT_LE_ADVERTISING_REPORT:
if debug:
print("Not a EVT_LE_ADVERTISING_REPORT !")
continue
pkt = pkt[4:]
adv_type = struct.unpack("b", pkt[1:2])[0]
mac_addr_str = bluez.ba2str(pkt[3:9])
if packet_length and plen != packet_length:
# ignore this packet
if debug:
print("packet with non-matching length: mac=%s adv_type=%02x plen=%s" %
(mac_addr_str, adv_type, plen))
print(raw_packet_to_str(pkt))
continue
data = pkt[9:-1]
rssi = struct.unpack("b", full_pkt[len(full_pkt)-1:len(full_pkt)])[0]
if mac_addr and mac_addr_str not in mac_addr:
if debug:
print("packet with non-matching mac %s adv_type=%02x data=%s RSSI=%s" %
(mac_addr_str, adv_type, raw_packet_to_str(data), rssi))
continue
if debug:
print("LE advertisement: mac=%s adv_type=%02x data=%s RSSI=%d" %
(mac_addr_str, adv_type, raw_packet_to_str(data), rssi))
if handler is not None:
try:
handler(mac_addr_str, adv_type, data, rssi)
except Exception as e:
print('Exception when calling handler with a BLE advertising event: %r' % (e,))
import traceback
traceback.print_exc()
except KeyboardInterrupt:
print("\nRestore previous socket filter")
sock.setsockopt(bluez.SOL_HCI, bluez.HCI_FILTER, old_filter)
raise
"""
def hci_le_add_white_list(int dd, const bdaddr_t *bdaddr, uint8_t type, int to)
{
struct hci_request {
uint16_t ogf;
uint16_t ocf;
int event;
void *cparam;
int clen;
void *rparam;
int rlen;
};
struct hci_request rq;
le_add_device_to_white_list_cp cp;
uint8_t status;
memset(&cp, 0, sizeof(cp));
cp.bdaddr_type = type;
bacpy(&cp.bdaddr, bdaddr);
memset(&rq, 0, sizeof(rq));
rq.ogf = OGF_LE_CTL;
rq.ocf = OCF_LE_ADD_DEVICE_TO_WHITE_LIST;
rq.cparam = &cp;
rq.clen = LE_ADD_DEVICE_TO_WHITE_LIST_CP_SIZE;
rq.rparam = &status;
rq.rlen = 1;
if (hci_send_req(dd, &rq, to) < 0)
return -1;
if (status) {
errno = EIO;
return -1;
}
return 0;
}"""

View File

@@ -0,0 +1,32 @@
[Unit]
Description=Sensorpajen - Bluetooth Temperature Sensor Monitor
Documentation=https://github.com/yourusername/sensorpajen
After=network.target bluetooth.target
Wants=bluetooth.target
[Service]
Type=simple
User=sensorpajen
Group=sensorpajen
WorkingDirectory=/opt/sensorpajen
EnvironmentFile=/etc/sensorpajen/sensorpajen.env
ExecStart=/opt/sensorpajen/venv/bin/python -m sensorpajen.main
Restart=always
RestartSec=10
# Bluetooth capabilities require this to be false
NoNewPrivileges=false
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=sensorpajen
# Security hardening (where possible with Bluetooth requirements)
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/etc/sensorpajen
[Install]
WantedBy=multi-user.target

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,32 @@
[
{
"mac": "A4:C1:38:12:34:56",
"name": "ATC_123456",
"rssi": -65,
"first_seen": "2025-12-27T10:30:15",
"last_seen": "2025-12-27T10:35:42",
"sample_reading": {
"temperature": 21.5,
"humidity": 45,
"battery_percent": 87,
"battery_voltage": 2950
},
"status": "pending"
},
{
"mac": "A4:C1:38:AB:CD:EF",
"name": "ATC_ABCDEF",
"rssi": -72,
"first_seen": "2025-12-27T11:00:00",
"last_seen": "2025-12-27T11:10:00",
"sample_reading": {
"temperature": 19.8,
"humidity": 52,
"battery_percent": 65,
"battery_voltage": 2800
},
"status": "ignored",
"ignored_at": "2025-12-27T11:15:00",
"ignore_reason": "Test sensor, not needed"
}
]

View File

@@ -0,0 +1,22 @@
# MQTT Configuration
MQTT_HOST=192.168.0.114
MQTT_PORT=1883
MQTT_USER=hasse
MQTT_PASSWORD=casablanca
MQTT_CLIENT_ID=mibridge
# Sensor Configuration (relative to project root)
SENSOR_CONFIG_FILE=config/sensors.json
DISCOVERED_SENSORS_FILE=config/discovered_sensors.json
# Application Settings
WATCHDOG_TIMEOUT=5
ENABLE_BATTERY=true
LOG_LEVEL=INFO
CONFIG_RELOAD_INTERVAL=900 # 15 minutes in seconds
# ntfy Notifications (optional)
NTFY_ENABLED=false
NTFY_URL=https://ntfy.sh
NTFY_TOPIC=sensorpajen
NTFY_TOKEN=

View File

@@ -0,0 +1,37 @@
{
"sensors": [
{
"mac": "A4:C1:38:98:7B:B6",
"name": "mi_temp_1",
"comment": "Example sensor - replace with your sensors"
},
{
"mac": "A4:C1:38:29:03:0D",
"name": "mi_temp_2"
},
{
"mac": "A4:C1:38:62:CA:83",
"name": "mi_temp_3"
},
{
"mac": "A4:C1:38:D5:EA:63",
"name": "mi_temp_4"
},
{
"mac": "A4:C1:38:7C:9C:63",
"name": "mi_temp_5"
},
{
"mac": "A4:C1:38:68:2C:DA",
"name": "mi_temp_6"
},
{
"mac": "A4:C1:38:AD:74:2B",
"name": "mi_temp_7"
},
{
"mac": "A4:C1:38:46:9F:D1",
"name": "mi_temp_8"
}
]
}

Binary file not shown.

View File

@@ -60,12 +60,16 @@ REQUIRED_FILES=(
"debian/postinst"
"debian/prerm"
"debian/postrm"
"debian/compat"
"debian/sensorpajen.service"
"src/sensorpajen/main.py"
"pyproject.toml"
)
# Optional files (debian/compat is now optional - use Build-Depends instead)
OPTIONAL_FILES=(
"debian/compat"
)
ALL_FILES_OK=1
for file in "${REQUIRED_FILES[@]}"; do
echo -n " $file... "
@@ -77,6 +81,16 @@ for file in "${REQUIRED_FILES[@]}"; do
fi
done
# Check optional files
for file in "${OPTIONAL_FILES[@]}"; do
echo -n " $file... "
if [ -f "$file" ]; then
echo -e "${GREEN}OK${NC}"
else
echo -e "${YELLOW}OPTIONAL${NC}"
fi
done
if [ $ALL_FILES_OK -eq 0 ]; then
echo -e "${RED}Some required files are missing!${NC}"
exit 1
@@ -94,10 +108,10 @@ echo "Cleaning previous builds..."
rm -f ../*.deb ../*.build ../*.buildinfo ../*.changes
rm -rf debian/.debhelper debian/sensorpajen debian/files
# Build the package
# Build the package with gzip compression (for compatibility)
echo "Building Debian package..."
echo "======================================================================"
dpkg-buildpackage -us -uc -b
dpkg-buildpackage -us -uc -b -Zgzip
if [ $? -ne 0 ]; then
echo -e "${RED}Build failed!${NC}"