#!/usr/bin/env python3
"""
Bitchat Terminal Chat Client
Polling Scanner + Correct Protocol Logic
"""
import asyncio
import logging
import sys
import time
import struct
import hashlib
import signal
from typing import Dict, Optional, Set
# Cryptography
import nacl.signing
import nacl.encoding
import nacl.bindings
# Compression
try:
import lz4.block
except ImportError:
lz4 = None
# BLE
from bleak import BleakScanner, BleakClient
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
# --- CONFIGURATION ---
BITCHAT_SERVICE_UUID = "f47b5e2d-4a9e-4c5a-9b3f-8e1d2c3a4b5c"
BITCHAT_RX_CHAR_UUID = "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d"
BITCHAT_TX_CHAR_UUID = "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d"
# Protocol Constants
PACKET_VERSION = 0x01
PACKET_TYPE_ANNOUNCE = 0x01
PACKET_TYPE_MESSAGE = 0x02
PACKET_TTL = 0x07
FLAG_HAS_RECIPIENT = 0x01
FLAG_HAS_SIGNATURE = 0x02
FLAG_IS_COMPRESSED = 0x04
CANONICAL_TTL_FOR_SIGNING = 0x00
HEADER_SIZE = 14
# Logging setup
logging.basicConfig(level=logging.ERROR, format="%(message)s")
logger = logging.getLogger("BitchatChat")
class BitchatBLEHandler:
def __init__(self, loop: asyncio.AbstractEventLoop, nickname: str = "Anonymous"):
self.connected_clients: Dict[str, BleakClient] = {}
self.connecting_devices = set()
self.seen_devices: Set[str] = set()
self.peer_nicknames: Dict[str, str] = {}
self.loop = loop
self._stopping = False
self.nickname = nickname
self.scanner_task = None
# --- IDENTITY SETUP ---
self.signing_key = nacl.signing.SigningKey.generate()
self.verify_key = self.signing_key.verify_key
self.public_key_bytes = self.verify_key.encode(encoder=nacl.encoding.RawEncoder)
self.x25519_private = nacl.bindings.crypto_box_keypair()[1]
self.x25519_public = nacl.bindings.crypto_scalarmult_base(self.x25519_private)
self.my_id = hashlib.sha256(self.public_key_bytes).digest()[:8]
print(f"\n╔══════════════════════════════════════════════════════════════╗")
print(f"║ Bitchat Terminal Chat - Connected as: {self.nickname:<22} ║")
print(f"║ Your ID: {self.my_id.hex():<46} ║")
print(f"╚══════════════════════════════════════════════════════════════╝\n")
print("Scanning for peers...")
def _pad_data(self, data: bytearray) -> bytearray:
"""PKCS7-style padding"""
block_sizes = [256, 512, 1024, 2048]
target_size = len(data)
for size in block_sizes:
if len(data) + 16 <= size:
target_size = size
break
if len(data) >= target_size: return data
padding_needed = target_size - len(data)
if padding_needed > 255: return data
padding = bytes([padding_needed] * padding_needed)
padded = bytearray(data)
padded.extend(padding)
return padded
def _build_packet(self, type_byte, payload, recipient_id=None):
"""Builds a signed packet"""
header = bytearray()
header.append(PACKET_VERSION)
header.append(type_byte)
header.append(PACKET_TTL)
timestamp_ms = int(time.time() * 1000)
header.extend(struct.pack('>Q', timestamp_ms))
flags = FLAG_HAS_SIGNATURE
if recipient_id:
flags |= FLAG_HAS_RECIPIENT
header.append(flags)
header.extend(struct.pack('>H', len(payload)))
# Build UNSIGNED packet
signing_header = bytearray(header)
signing_header[2] = CANONICAL_TTL_FOR_SIGNING
signing_flags = flags & ~FLAG_HAS_SIGNATURE
signing_header[11] = signing_flags
unsigned_packet = bytearray()
unsigned_packet.extend(signing_header)
unsigned_packet.extend(self.my_id)
if recipient_id:
unsigned_packet.extend(recipient_id[:8])
unsigned_packet.extend(payload)
# Pad and sign
unsigned_packet_padded = self._pad_data(unsigned_packet)
signature = self.signing_key.sign(bytes(unsigned_packet_padded)).signature
# Assemble Final Packet
final = bytearray()
final.extend(header)
final.extend(self.my_id)
if recipient_id:
final.extend(recipient_id[:8])
final.extend(payload)
final.extend(signature)
final_padded = self._pad_data(final)
return bytes(final_padded)
async def connect_client(self, device: BLEDevice):
"""Connect to a BLE device"""
MAX_RETRIES = 3
RETRY_DELAY = 2.0
for attempt in range(MAX_RETRIES):
client = BleakClient(device)
try:
await client.connect()
if client.is_connected:
self.connected_clients[device.address] = client
print(f"[SYSTEM] Connected to peer at {device.address}")
# Send Handshake
handshake_payload = bytearray()
handshake_payload.extend(b'\x01')
handshake_payload.append(len(self.nickname))
handshake_payload.extend(self.nickname.encode('utf-8'))
handshake_payload.extend(b'\x02')
handshake_payload.append(len(self.x25519_public))
handshake_payload.extend(self.x25519_public)
handshake_payload.extend(b'\x03')
handshake_payload.append(len(self.public_key_bytes))
handshake_payload.extend(self.public_key_bytes)
packet = self._build_packet(PACKET_TYPE_ANNOUNCE, handshake_payload, recipient_id=b'\xff'*8)
await client.write_gatt_char(BITCHAT_RX_CHAR_UUID, packet, response=True)
await client.start_notify(BITCHAT_TX_CHAR_UUID, self._create_notification_handler(device.address))
return
except Exception:
try:
if client.is_connected: await client.disconnect()
except: pass
if attempt < MAX_RETRIES - 1:
await asyncio.sleep(RETRY_DELAY * (2 ** attempt))
continue
else:
break
self.connecting_devices.discard(device.address)
def _create_notification_handler(self, address):
def handler(sender_handle: int, data: bytearray):
try:
if len(data) < HEADER_SIZE: return
packet_type = data[1]
flags = data[11]
payload_len = struct.unpack('>H', data[12:14])[0]
has_recipient = (flags & FLAG_HAS_RECIPIENT) != 0
is_compressed = (flags & FLAG_IS_COMPRESSED) != 0
offset = HEADER_SIZE
sender_id = data[offset : offset+8]
offset += 8
short_id = sender_id.hex()[-8:]
sender_hex = sender_id.hex()
sender_nick = self.peer_nicknames.get(sender_hex, short_id)
if has_recipient: offset += 8
raw_payload = data[offset : offset + payload_len]
final_text = ""
if is_compressed and lz4:
try:
compressed_data = bytes(raw_payload[2:])
uncompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=65536)
final_text = uncompressed_data.decode('utf-8', errors='ignore')
except Exception: return
else:
final_text = raw_payload.decode('utf-8', errors='ignore')
if packet_type == PACKET_TYPE_MESSAGE:
if final_text:
print(f"{sender_nick}: {final_text}")
sys.stdout.flush()
elif packet_type == PACKET_TYPE_ANNOUNCE:
try:
tlv_offset = 0
while tlv_offset < len(raw_payload):
if tlv_offset + 2 > len(raw_payload): break
tag = raw_payload[tlv_offset]
length = raw_payload[tlv_offset + 1]
tlv_offset += 2
if tlv_offset + length > len(raw_payload): break
value = raw_payload[tlv_offset:tlv_offset + length]
tlv_offset += length
if tag == 0x01: # Nickname
nickname = value.decode('utf-8', errors='ignore')
if self.peer_nicknames.get(sender_hex) != nickname:
self.peer_nicknames[sender_hex] = nickname
print(f"[SYSTEM] {nickname} ({short_id}) joined")
break
except: pass
except Exception: pass
return handler
async def send_message(self, message: str):
if not self.connected_clients: return
try:
packet = self._build_packet(PACKET_TYPE_MESSAGE, message.encode('utf-8'), b'\xff' * 8)
disconnected_addrs = []
sent_count = 0
for addr, client in list(self.connected_clients.items()):
try:
if not client.is_connected:
disconnected_addrs.append(addr)
continue
await client.write_gatt_char(BITCHAT_RX_CHAR_UUID, packet, response=True)
sent_count += 1
except Exception:
disconnected_addrs.append(addr)
for addr in disconnected_addrs:
if addr in self.connected_clients: del self.connected_clients[addr]
if sent_count > 0:
print(f"{self.nickname} (you): {message}")
except Exception: pass
async def run_scanner(self):
"""Continuously scan for Bitchat devices (Polling Mode)"""
print("[SYSTEM] Scanning for Bitchat devices...")
first_scan = True
while not self._stopping:
try:
# Scan for 3 seconds
devices = await BleakScanner.discover(timeout=3.0, return_adv=True)
if first_scan:
print(f"[DEBUG] Scan found {len(devices)} total devices")
first_scan = False
for device, adv in devices.values():
if device.address in self.connected_clients or device.address in self.connecting_devices:
continue
# Check UUID
if BITCHAT_SERVICE_UUID.lower() in adv.service_uuids:
if device.address not in self.seen_devices:
self.seen_devices.add(device.address)
print(f"[SYSTEM] Found peer: {device.address}")
self.connecting_devices.add(device.address)
asyncio.create_task(self.connect_client(device))
except Exception as e:
# print(f"[DEBUG] Scan error: {e}")
pass
await asyncio.sleep(1)
async def monitor_connections(self):
while not self._stopping:
await asyncio.sleep(5)
disconnected_addrs = []
for addr, client in list(self.connected_clients.items()):
if not client.is_connected:
disconnected_addrs.append(addr)
for addr in disconnected_addrs:
if addr in self.connected_clients:
del self.connected_clients[addr]
self.connecting_devices.discard(addr)
async def input_loop(self):
print("\nType your messages and press Enter to send. Ctrl+C to exit.\n")
while not self._stopping:
try:
line = await asyncio.get_event_loop().run_in_executor(None, sys.stdin.readline)
line = line.strip()
if line: await self.send_message(line)
except Exception: break
async def stop(self):
self._stopping = True
print("\n[SYSTEM] Disconnecting...")
if self.scanner_task:
self.scanner_task.cancel()
try: await self.scanner_task
except asyncio.CancelledError: pass
tasks = []
for client in list(self.connected_clients.values()):
try: tasks.append(asyncio.create_task(client.disconnect()))
except: pass
if tasks: await asyncio.gather(*tasks, return_exceptions=True)
self.connected_clients.clear()
async def main():
loop = asyncio.get_event_loop()
nickname = sys.argv[1] if len(sys.argv) > 1 else "Anonymous"
chat = BitchatBLEHandler(loop, nickname=nickname)
def signal_handler():
asyncio.create_task(chat.stop())
asyncio.get_event_loop().call_later(2, sys.exit, 0)
loop.add_signal_handler(signal.SIGINT, signal_handler)
try:
chat.scanner_task = asyncio.create_task(chat.run_scanner())
await asyncio.gather(chat.scanner_task, chat.input_loop(), chat.monitor_connections())
except asyncio.CancelledError: pass
finally:
await chat.stop()
print("[SYSTEM] Goodbye!")
if __name__ == "__main__":
asyncio.run(main())