# GigaProjects
#
# ← Back to meshtastic-bitchat-bridge
#
# mesh-bit.py

import asyncio
import logging
import sys
import time
import struct
import random
import signal
import hashlib
import hashlib
from typing import Optional, List, Dict

# Cryptography
import nacl.signing
import nacl.encoding
import nacl.bindings

# Compression
import lz4.block

# BLE & LoRa
from bleak import BleakScanner, BleakClient
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
import meshtastic
import meshtastic.serial_interface
from pubsub import pub

# --- CONFIGURATION ---
# Serial device of the Meshtastic radio. MeshtasticHandler.start() tries
# /dev/ttyUSB0 when this port cannot be opened.
MESHTASTIC_PORT = "/dev/ttyACM0" 

# Bitchat UUIDs
# BITCHAT_SERVICE_UUID is matched against advertised service UUIDs by the scanner.
BITCHAT_SERVICE_UUID = "f47b5e2d-4a9e-4c5a-9b3f-8e1d2c3a4b5c"
# NOTE(review): RX (written to) and TX (notified from) carry the same UUID --
# presumably Bitchat exposes a single write/notify characteristic; confirm.
BITCHAT_RX_CHAR_UUID = "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d" 
BITCHAT_TX_CHAR_UUID = "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d"

# Name of the logger used throughout this module.
BRIDGE_TAG = "Bridge"

# Protocol Constants (Matches BinaryProtocol.kt)
PACKET_VERSION = 0x01
PACKET_TYPE_ANNOUNCE = 0x01
PACKET_TYPE_MESSAGE = 0x02
# TTL written into the header of every packet the bridge sends.
PACKET_TTL = 0x07
# Bit flags stored in the header's flags byte (offset 11).
FLAG_HAS_RECIPIENT = 0x01
FLAG_HAS_SIGNATURE = 0x02
FLAG_IS_COMPRESSED = 0x04
CANONICAL_TTL_FOR_SIGNING = 0x00  # Fixed per Kotlin comment for relay compatibility

# V1 header layout, counted from the buffer.put* calls in the Kotlin encoder:
#   version(1) + type(1) + ttl(1)   ->  3 bytes
#   timestamp via putLong (8)       -> 11 bytes
#   flags (1)                       -> 12 bytes
#   payload length via putShort (2) -> 14 bytes
# NOTE: the Kotlin constant HEADER_SIZE_V1 is declared as 13, but the encoder
# writes 14 bytes, so the bridge uses the size computed from the actual writes.
HEADER_SIZE = 14 

# Log to stdout with timestamp and level; INFO and above are shown.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(BRIDGE_TAG)

class MeshtasticHandler:
    """Owns the Meshtastic serial interface and relays LoRa text to the BLE side.

    Incoming text packets are re-formatted as ``[Mesh: <sender>] <text>`` and
    scheduled on the asyncio loop for broadcast through the BLE handler.
    Outgoing text is sent with ``interface.sendText``.
    """

    # Port tried when the configured port cannot be opened.
    FALLBACK_PORT = "/dev/ttyUSB0"

    def __init__(self, port: str, loop: asyncio.AbstractEventLoop):
        """Store the serial port and the event loop used to schedule BLE broadcasts."""
        self.port = port
        self.interface = None  # set by start() once a serial connection succeeds
        self.loop = loop
        self.ble_handler: Optional['BitchatBLEHandler'] = None

    def set_ble_handler(self, handler):
        """Register the BLE handler that receives messages arriving over LoRa."""
        self.ble_handler = handler

    def _open_interface(self, port: str) -> None:
        """Open the serial interface on *port* and subscribe to received packets."""
        self.interface = meshtastic.serial_interface.SerialInterface(port)
        pub.subscribe(self.on_receive, "meshtastic.receive")

    def start(self):
        """Connect to the radio, falling back to FALLBACK_PORT, else run BLE-only.

        Never raises: every connection failure is logged and the bridge keeps
        running without a LoRa link (``self.interface`` may stay unset).
        """
        try:
            logger.info(f"Attempting connection to Meshtastic on {self.port}...")
            self._open_interface(self.port)
            logger.info("Meshtastic interface ready.")
            return
        except Exception as e:
            logger.error(f"Failed to connect to Meshtastic: {e}")

        # Retrying the very same device path would only repeat the failure.
        if self.port != self.FALLBACK_PORT:
            logger.info(f"Trying alternative port {self.FALLBACK_PORT}...")
            try:
                self._open_interface(self.FALLBACK_PORT)
                logger.info(f"Meshtastic interface ready on {self.FALLBACK_PORT}.")
                return
            except Exception as e2:
                logger.error(f"Failed secondary connection: {e2}")

        logger.info("Continuing in Bluetooth-only mode.")

    def get_sender_name(self, from_id: str) -> str:
        """Return the node's ``longName`` from the node database, or *from_id*."""
        nodes = self.interface.nodes if self.interface else None
        if nodes:
            node = nodes.get(from_id)
            if node:
                user = node.get('user')
                if user:
                    return user.get('longName', from_id)
        return from_id

    @staticmethod
    def _resolve_sender_id(packet) -> str:
        """Return a usable sender id for *packet*.

        ``fromId`` may be missing or ``None``; in that case the numeric
        ``from`` field is rendered as ``!xxxxxxxx`` (8 hex digits), and
        ``"unknown"`` is returned when neither is available.
        """
        sender_id = packet.get('fromId')
        if sender_id:
            return sender_id
        node_num = packet.get('from')
        if isinstance(node_num, int):
            return f"!{node_num:08x}"
        return "unknown"

    def on_receive(self, packet, interface):
        """Handle a packet published on ``meshtastic.receive``.

        Only decoded text packets are relayed. Text starting with ``[Bit:``
        is skipped because that is the prefix this bridge puts on messages it
        forwards to LoRa, so relaying it back would echo it.
        """
        try:
            decoded = packet.get('decoded')
            if not decoded or 'text' not in decoded:
                return
            text = decoded['text']

            if text.startswith("[Bit:"):
                return

            sender_name = self.get_sender_name(self._resolve_sender_id(packet))
            logger.info(f"(LoRa -> Bridge) {sender_name}: {text}")

            formatted_msg = f"[Mesh: {sender_name}] {text}"

            if self.ble_handler:
                # Thread-safe hand-off: the coroutine is scheduled on the bridge's loop.
                asyncio.run_coroutine_threadsafe(
                    self.ble_handler.broadcast(formatted_msg),
                    self.loop
                )
        except Exception as e:
            logger.error(f"Error processing LoRa packet: {e}")

    def send_text(self, text: str):
        """Send *text* over LoRa; a no-op when no interface is connected."""
        if self.interface:
            try:
                logger.info(f"[Meshtastic] Sending packet: {text}")
                self.interface.sendText(text)
            except Exception as e:
                logger.error(f"[Meshtastic] Send failed: {e}")

class BitchatBLEHandler:
    def __init__(self, loop: asyncio.AbstractEventLoop):
        """Create connection-tracking state and a fresh cryptographic identity.

        A new Ed25519 signing key and X25519 key are generated on every run,
        so the bridge appears under a new identity each time it starts.
        """
        self.loop = loop
        self.meshtastic_handler: Optional[MeshtasticHandler] = None
        self._stopping = False

        # --- CONNECTION STATE ---
        self.connected_clients: Dict[str, BleakClient] = {}
        self.connecting_devices = set()
        self.peer_nicknames = {}  # sender_id (hex) -> nickname
        self.failed_devices = {}  # address -> timestamp of the failed connection

        # --- IDENTITY SETUP ---
        # Random key: fresh identity on every run avoids stale peer state on the phone.
        fresh_signing_key = nacl.signing.SigningKey.generate()
        self.signing_key = fresh_signing_key
        self.verify_key = fresh_signing_key.verify_key
        self.public_key_bytes = self.verify_key.encode(encoder=nacl.encoding.RawEncoder)

        # X25519 pair: keep the secret half and derive the public half from it.
        x25519_secret = nacl.bindings.crypto_box_keypair()[1]
        self.x25519_private = x25519_secret
        self.x25519_public = nacl.bindings.crypto_scalarmult_base(x25519_secret)

        # Sender ID = first 8 bytes of SHA-256 over the Ed25519 (signing) public key.
        signing_key_digest = hashlib.sha256(self.public_key_bytes).digest()
        self.my_id = signing_key_digest[:8]

        logger.info(f"Bridge Identity: {self.my_id.hex()}")
        logger.info(f"Full public key: {self.public_key_bytes.hex()}")  # debugging aid

    def set_meshtastic_handler(self, handler: MeshtasticHandler) -> None:
        """Register the Meshtastic handler that receives messages arriving over BLE."""
        self.meshtastic_handler = handler

    async def run_scanner(self):
        """Continuously scan for Bitchat devices (polling mode).

        Runs 3-second scans until ``self._stopping`` is set. Every device
        advertising the Bitchat service that is not connected, not already
        being connected and not in its 5-minute failure cool-down gets its own
        ``connect_client`` task.
        """
        logger.info("Scanning for Bitchat devices...")

        wanted_uuid = BITCHAT_SERVICE_UUID.lower()
        # The event loop keeps only weak references to tasks, so hold strong
        # ones here until each connect task finishes; otherwise a task could be
        # garbage-collected while it is still running.
        connect_tasks = set()

        while not self._stopping:
            try:
                # Scan for 3 seconds
                devices = await BleakScanner.discover(timeout=3.0, return_adv=True)

                for device, adv in devices.values():
                    if wanted_uuid not in adv.service_uuids:
                        continue

                    address = device.address

                    # Check blacklist (cool-down for 5 minutes)
                    failed_at = self.failed_devices.get(address)
                    if failed_at is not None:
                        if time.time() - failed_at < 300:
                            continue
                        del self.failed_devices[address]  # Expired

                    # Only connect if not already connected or connecting
                    if address in self.connected_clients or address in self.connecting_devices:
                        continue

                    logger.info(f"Found peer: {address}")
                    self.connecting_devices.add(address)
                    task = asyncio.create_task(self.connect_client(device))
                    connect_tasks.add(task)
                    task.add_done_callback(connect_tasks.discard)

            except Exception as e:
                logger.warning(f"Scanner error: {e}")
                await asyncio.sleep(1.0)

            await asyncio.sleep(0.5)

    def _pad_data(self, data: bytearray) -> bytearray:
        """PKCS7-style padding to optimal block size (matches MessagePadding.kt)"""
        # Block sizes from MessagePadding.kt
        block_sizes = [256, 512, 1024, 2048]
        
        # Account for encryption overhead (~16 bytes) - though we aren't encrypting here, 
        # the app expects padding to align with these blocks assuming encryption overhead.
        # But wait, BinaryProtocol.kt calls pad(result, optimalSize) on the PLAINTEXT packet bytes.
        # So we should pad the packet bytes to these sizes.
        
        target_size = len(data)
        for size in block_sizes:
            if len(data) + 16 <= size: # +16 is from optimalBlockSize logic in Kotlin
                target_size = size
                break
        
        if len(data) >= target_size:
            return data
            
        padding_needed = target_size - len(data)
        
        # Constrain to 255 (byte limit)
        if padding_needed > 255:
            # If we can't pad to the target block size with < 255 bytes, 
            # we just don't pad (or pad to a smaller alignment if we were strictly following PKCS7 block alignment, 
            # but here we are padding to specific privacy block sizes).
            # The Kotlin code says: "if (paddingNeeded > 255) return data"
            return data
            
        # PKCS#7: All pad bytes are equal to the pad length
        padding = bytes([padding_needed] * padding_needed)
        padded = bytearray(data)
        padded.extend(padding)
        return padded

    def _build_packet(self, type_byte, payload, recipient_id=None):
        """Build a signed, padded wire packet (layout follows BinaryProtocol.kt).

        Wire layout: 14-byte header, 8-byte sender id, optional 8-byte
        recipient id, payload, signature, then padding.

        The signature covers a canonical copy of the packet: TTL forced to
        CANONICAL_TTL_FOR_SIGNING, HAS_SIGNATURE cleared, no signature bytes,
        and padding already applied. Per the original author's notes this is
        what the Android side reconstructs before verifying.

        Args:
            type_byte: Packet type placed in header byte 1.
            payload: Payload bytes; its length goes into the 2-byte length field.
            recipient_id: Optional recipient; only its first 8 bytes are used.

        Returns:
            The padded packet as ``bytes``.
        """
        # Header: version, type, TTL, 8-byte ms timestamp, flags, 2-byte payload length.
        header = bytearray((PACKET_VERSION, type_byte, PACKET_TTL))

        timestamp_ms = int(time.time() * 1000)
        timestamp_be = struct.pack('>Q', timestamp_ms)
        header += timestamp_be
        logger.info(f"Timestamp: {timestamp_ms} (Hex: {timestamp_be.hex()})")

        flags = FLAG_HAS_SIGNATURE
        if recipient_id:
            flags = flags | FLAG_HAS_RECIPIENT
        header.append(flags)
        header += struct.pack('>H', len(payload))

        # Everything between header and signature is shared by both packet variants.
        body = bytearray(self.my_id)
        if recipient_id:
            body.extend(recipient_id[:8])
        body.extend(payload)

        # Canonical header used only for signing: TTL and signature flag normalised.
        canonical_header = bytearray(header)
        canonical_header[2] = CANONICAL_TTL_FOR_SIGNING
        canonical_header[11] = flags & ~FLAG_HAS_SIGNATURE

        # Padding must be applied BEFORE signing.
        unsigned_packet_padded = self._pad_data(canonical_header + body)
        logger.info(f"Unsigned block (PADDED) hex for signing: {unsigned_packet_padded.hex()}")
        signature = self.signing_key.sign(bytes(unsigned_packet_padded)).signature

        # Real packet: actual TTL and HAS_SIGNATURE flag, signature appended, then padded.
        wire_packet = header + body
        wire_packet.extend(signature)
        return bytes(self._pad_data(wire_packet))

    async def connect_client(self, device: BLEDevice):
        """Connect to a peer, send the ANNOUNCE handshake and subscribe to notifications.

        Up to three attempts are made. Errors whose text contains
        ``InProgress`` or ``br-connection-canceled`` are treated as transient
        and retried with exponential backoff; any other error aborts at once.
        When the last attempt fails the address is put on the cool-down list
        read by the scanner.

        The client is added to ``connected_clients`` only after the handshake
        and subscription succeeded, so a failed setup never leaves a stale
        entry behind. The address is always removed from
        ``connecting_devices`` when this coroutine finishes. A disconnect
        callback is registered so a peer that drops later is forgotten and can
        be picked up again by the scanner.
        """
        MAX_RETRIES = 3
        RETRY_DELAY = 2.0  # seconds; doubled after each transient failure
        address = device.address

        def on_disconnect(client):
            # Ignore disconnects of clients from failed attempts: only the
            # client that is currently registered may clear the bookkeeping.
            if self.connected_clients.get(address) is client:
                self._on_client_disconnect(address)

        try:
            for attempt in range(MAX_RETRIES):
                # Use address string instead of device object to force fresh resolution
                # This helps with iOS RPA rotation and avoids stale BlueZ cache
                client = BleakClient(address, disconnected_callback=on_disconnect, timeout=10.0)
                try:
                    logger.info(f"Connection attempt {attempt + 1}/{MAX_RETRIES} to {address}")
                    await client.connect()
                    if not client.is_connected:
                        continue

                    logger.info(f"✅ Connected to {address}")

                    # Send Handshake (ANNOUNCE) with an explicit broadcast recipient so
                    # the packet carries the HAS_RECIPIENT flag.
                    logger.info("Sending Handshake...")
                    packet = self._build_packet(
                        PACKET_TYPE_ANNOUNCE,
                        self._build_announce_payload(),
                        recipient_id=b'\xff' * 8,
                    )
                    await client.write_gatt_char(BITCHAT_RX_CHAR_UUID, packet, response=True)

                    # Setup notification handler
                    await client.start_notify(
                        BITCHAT_TX_CHAR_UUID,
                        self._create_notification_handler(address),
                    )

                    self.connected_clients[address] = client
                    return  # Success!

                except Exception as e:
                    error_msg = str(e)
                    logger.warning(f"Connection error: {error_msg}")

                    # Clean up failed connection (best effort)
                    try:
                        if client.is_connected:
                            await client.disconnect()
                    except Exception as cleanup_error:
                        logger.debug(f"Cleanup disconnect failed: {cleanup_error}")

                    if attempt >= MAX_RETRIES - 1:
                        logger.error(f"❌ Failed to connect after {MAX_RETRIES} attempts")
                        # Add to blacklist to avoid spamming pairing requests
                        self.failed_devices[address] = time.time()
                        break

                    if "InProgress" in error_msg or "br-connection-canceled" in error_msg:
                        # These are transient errors, retry with backoff
                        delay = RETRY_DELAY * (2 ** attempt)  # Exponential backoff
                        logger.info(f"⏳ Transient error, retrying in {delay:.1f}s...")
                        await asyncio.sleep(delay)
                        continue

                    # Fatal error, don't retry
                    logger.error("❌ Fatal connection error, giving up")
                    break
        finally:
            # Success, failure or cancellation: the connect attempt is over.
            self.connecting_devices.discard(address)

    def _build_announce_payload(self) -> bytearray:
        """Return the tagged (tag, length, value) ANNOUNCE payload for this bridge."""
        fields = (
            (0x01, "MeshBridge".encode('utf-8')),  # Tag 1: Name
            (0x02, self.x25519_public),            # Tag 2: Noise Public Key (X25519)
            (0x03, self.public_key_bytes),         # Tag 3: Signing Public Key (Ed25519)
        )
        payload = bytearray()
        for tag, value in fields:
            payload.append(tag)
            payload.append(len(value))  # length of the encoded bytes
            payload.extend(value)
        return payload
    
    def _on_client_disconnect(self, address: str):
        """Callback when a BLE client disconnects"""
        logger.info(f"Client {address} disconnected")
        if address in self.connected_clients:
            del self.connected_clients[address]
        self.connecting_devices.discard(address)

    def _create_notification_handler(self, address):
        """Return the notification callback used for the peer at *address*.

        The callback parses one incoming packet per notification. MESSAGE
        packets are forwarded to Meshtastic as ``[Bit:<name>] <text>``;
        ANNOUNCE packets update the nickname table. Any parsing error is
        logged and never propagates into the BLE stack.
        """
        def handler(sender_handle: int, data: bytearray):
            try:
                self._process_incoming_packet(data)
            except Exception as e:
                logger.error(f"Packet processing error: {e}")
        return handler

    def _process_incoming_packet(self, data):
        """Parse one raw packet and dispatch it by packet type."""
        if len(data) < HEADER_SIZE:
            return

        # Parse Header
        packet_type = data[1]
        flags = data[11]
        payload_len = struct.unpack('>H', data[12:14])[0]
        offset = HEADER_SIZE

        # Sender ID
        sender_id = bytes(data[offset:offset + 8])
        offset += 8
        if sender_id == self.my_id:
            # One of our own packets came back from a peer; forwarding it
            # would bounce the message between the two networks.
            return
        sender_hex = sender_id.hex()
        short_id = sender_hex[-4:]

        if flags & FLAG_HAS_RECIPIENT:
            recipient_id = data[offset:offset + 8]  # Log the recipient before skipping
            logger.info(f"Recipient ID: {recipient_id.hex()}")
            if all(b == 0xff for b in recipient_id):
                logger.info("Incoming is broadcast message")
            else:
                logger.info("Incoming is private message")
            offset += 8

        # Extract payload; trailing signature/padding lies beyond payload_len.
        payload = bytes(data[offset:offset + payload_len])

        if flags & FLAG_IS_COMPRESSED:
            try:
                # Skip original size (2 bytes) for lz4 block decompression
                payload = lz4.block.decompress(payload[2:], uncompressed_size=65536)
            except Exception as e:
                logger.error(f"Decompression failed: {e}")
                return

        if packet_type == PACKET_TYPE_MESSAGE:
            final_text = payload.decode('utf-8', errors='ignore')
            display_name = self.peer_nicknames.get(sender_hex, short_id)

            logger.info(f"(BLE -> Bridge) {display_name}: {final_text}")

            if self.meshtastic_handler:
                self.meshtastic_handler.send_text(f"[Bit:{display_name}] {final_text}")

        elif packet_type == PACKET_TYPE_ANNOUNCE:
            try:
                # TLVs are read from the decompressed payload, not the raw bytes.
                nickname = self._extract_nickname(payload)
                if nickname is not None and self.peer_nicknames.get(sender_hex) != nickname:
                    self.peer_nicknames[sender_hex] = nickname
                    logger.info(f"Peer {short_id} is now known as '{nickname}'")
            except Exception as e:
                logger.warning(f"Failed to parse ANNOUNCE: {e}")

    @staticmethod
    def _extract_nickname(payload):
        """Return the first nickname (tag 0x01) in a TLV payload, or None.

        Each entry is one tag byte, one length byte and ``length`` value
        bytes. Parsing stops at the first truncated entry.
        """
        pos = 0
        end = len(payload)
        while pos + 2 <= end:
            tag = payload[pos]
            length = payload[pos + 1]
            pos += 2
            if pos + length > end:
                break
            if tag == 0x01:  # Nickname Tag
                return payload[pos:pos + length].decode('utf-8', errors='ignore')
            pos += length
        return None


    async def broadcast(self, message: str, recipient_id: Optional[bytes] = None):
        if self._stopping: return
        try:
            payload = message.encode('utf-8')
            packet = self._build_packet(PACKET_TYPE_MESSAGE, payload, recipient_id)
            
            logger.info(f"Echo packet hex (padded): {packet.hex()}")  # Add this for debugging

            current_clients = list(self.connected_clients.values())
            for client in current_clients:
                if client.is_connected:
                    await client.write_gatt_char(BITCHAT_RX_CHAR_UUID, packet, response=True)
                    logger.info(f"Echo sent to {client.address}")
        except Exception as e:
            logger.error(f"Broadcast failed: {e}")



    async def stop(self):
        self._stopping = True
        logger.info("Disconnecting clients...")
        for client in list(self.connected_clients.values()):
            try:
                await client.disconnect()
            except Exception:
                pass
        self.connected_clients.clear()

async def main():
    """Wire both handlers together and run the bridge until a stop signal arrives.

    SIGINT and SIGTERM set the stop event where the event loop supports signal
    handlers. On shutdown the scanner is cancelled first so no new connections
    are started, then BLE clients are disconnected and the Meshtastic
    interface, if one was opened, is closed.
    """
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    for stop_signal in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(stop_signal, stop_event.set)
        except NotImplementedError:
            # Some event loops cannot install signal handlers; Ctrl+C then
            # surfaces as KeyboardInterrupt at the asyncio.run() call site.
            logger.debug(f"Signal handler for {stop_signal} not supported on this platform")

    ble_handler = BitchatBLEHandler(loop)
    meshtastic_handler = MeshtasticHandler(MESHTASTIC_PORT, loop)

    ble_handler.set_meshtastic_handler(meshtastic_handler)
    meshtastic_handler.set_ble_handler(ble_handler)

    meshtastic_handler.start()
    scanner_task = asyncio.create_task(ble_handler.run_scanner())

    try:
        await stop_event.wait()
    finally:
        logger.info("Shutting down...")

        # Stop discovering peers before tearing down existing connections.
        scanner_task.cancel()
        try:
            await scanner_task
        except asyncio.CancelledError:
            pass

        await ble_handler.stop()

        # Release the serial port; it was previously left open until process exit.
        interface = meshtastic_handler.interface
        if interface is not None:
            try:
                interface.close()
            except Exception as e:
                logger.warning(f"Error closing Meshtastic interface: {e}")

        logger.info("Shutdown complete.")

if __name__ == "__main__":
    # Script entry point: run the bridge until main() returns.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # A Ctrl+C that surfaces here instead of through the SIGINT handler
        # installed in main() ends the program quietly.
        pass

# Run this code