nostrx.py

#!/usr/bin/env python3
"""
NostrX - Nostr to Twitter/X Sync Tool
Syncs posts from Nostr to Twitter, handling media and remembering state.
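
Configuration is read from environment variables (typically via a .env file):
    NOSTR_NPUBS       comma-separated npubs to monitor (required)
    NOSTR_RELAYS      comma-separated relay URLs (optional; defaults provided below)
    TWITTER_API_KEY, TWITTER_API_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_SECRET
                      Twitter/X credentials (leave unset to run in dry-run mode)

Each invocation performs a single sync pass and exits, so the script can be run
manually or on a schedule (e.g. via cron).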
"""

import asyncio
import json
import os
import re
import tempfile
import time
from datetime import datetime, timedelta

import requests
import tweepy
from dotenv import load_dotenv
from nostr_sdk import (
    Client, Filter, Kind, Timestamp, PublicKey,
    RelayUrl
)

# Load environment variables from .env file
load_dotenv()

# ==========================================
# CONFIGURATION
# ==========================================

# Nostr Settings
# Get npubs from environment variable (comma-separated)
npubs_env = os.getenv("NOSTR_NPUBS", "")
MONITORED_NPUBS = [n.strip() for n in npubs_env.split(",") if n.strip()]

# Get relays from environment variable (comma-separated), or use defaults
relays_env = os.getenv("NOSTR_RELAYS", "")
if relays_env:
    NOSTR_RELAYS = [r.strip() for r in relays_env.split(",") if r.strip()]
else:
    NOSTR_RELAYS = [
        "wss://relay.damus.io",
        "wss://nos.lol",
        "wss://relay.nostr.band",
        "wss://relay.primal.net"
    ]

# Twitter Settings
TWITTER_API_KEY = os.getenv("TWITTER_API_KEY")
TWITTER_API_SECRET = os.getenv("TWITTER_API_SECRET")
TWITTER_ACCESS_TOKEN = os.getenv("TWITTER_ACCESS_TOKEN")
TWITTER_ACCESS_SECRET = os.getenv("TWITTER_ACCESS_SECRET")

# State File
STATE_FILE = "sync_state.json"
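# The state file written by save_state() looks roughly like:
#   {"last_synced_timestamp": 1700000000, "synced_event_ids": ["<hex event id>", ...]}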

# Media Extensions to detect in Nostr posts
MEDIA_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.gif', '.mp4', '.mov']

# ==========================================
# STATE MANAGEMENT
# ==========================================

def load_state():
    """Load sync state from disk, falling back to defaults if the file is missing or corrupt."""
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, 'r') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            pass  # Unreadable or corrupt state file: fall through to the defaults
    # Default state: sync from 24 hours ago if running for the first time
    return {
        "last_synced_timestamp": int(time.time()) - 86400,
        "synced_event_ids": []  # Keep track of IDs to avoid duplicates
    }

def save_state(state):
    """Persist sync state to STATE_FILE."""
    # Keep history manageable (last 1000 IDs)
    state["synced_event_ids"] = state["synced_event_ids"][-1000:]

    with open(STATE_FILE, 'w') as f:
        json.dump(state, f, indent=2)

# ==========================================
# MEDIA HANDLING
# ==========================================

def extract_media_urls(content):
    """Find media URLs in text content"""
    urls = []
    clean_content = content
    
    # Regex for URLs
    url_pattern = r'https?://\S+'
    found_urls = re.findall(url_pattern, content)
    
    for url in found_urls:
        lower_url = url.lower()
        # Check if it looks like an image/video file
        if any(lower_url.endswith(ext) for ext in MEDIA_EXTENSIONS):
            urls.append(url)
            # Remove the URL from the text so it doesn't appear as a link in the tweet
            # (Twitter displays uploaded media natively)
            clean_content = clean_content.replace(url, "").strip()
            
    return clean_content, urls

def download_media(url):
    """Download media to a temp file"""
    try:
        # Fake user agent to avoid blocking
        headers = {'User-Agent': 'Mozilla/5.0'}
        response = requests.get(url, stream=True, headers=headers, timeout=10)
        
        if response.status_code == 200:
            # Get extension
            ext = os.path.splitext(url)[1]
            if not ext:
                ext = ".jpg" # Default
                
            # Create temp file
            tf = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
            for chunk in response.iter_content(chunk_size=8192):
                tf.write(chunk)
            tf.close()
            return tf.name
    except Exception as e:
        print(f"     ❌ Failed to download media {url}: {e}")
    return None

# ==========================================
# SYNC LOGIC
# ==========================================

class SyncTool:
    def __init__(self):
        self.state = load_state()
        self.client = None
        self.twitter_client = None
        self.twitter_v2 = None
        
    def setup_twitter(self):
        if all([TWITTER_API_KEY, TWITTER_API_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_SECRET]):
            # V1.1 API for media upload
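            # (media upload is only exposed on tweepy.API, so the script keeps both
            #  a v1.1 API object for uploads and a v2 Client, below, for posting)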
            auth = tweepy.OAuth1UserHandler(
                TWITTER_API_KEY, TWITTER_API_SECRET,
                TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_SECRET
            )
            self.twitter_client = tweepy.API(auth)
            
            # V2 API for posting tweets
            self.twitter_v2 = tweepy.Client(
                consumer_key=TWITTER_API_KEY,
                consumer_secret=TWITTER_API_SECRET,
                access_token=TWITTER_ACCESS_TOKEN,
                access_token_secret=TWITTER_ACCESS_SECRET
            )
            print("✓ Twitter API connected")
        else:
            print("⚠️  Twitter credentials missing. Running in DRY RUN mode.")

    async def setup_nostr(self):
        self.client = Client()
        for relay in NOSTR_RELAYS:
            await self.client.add_relay(RelayUrl.parse(relay))
        
        await self.client.connect()
        print(f"✓ Connected to {len(NOSTR_RELAYS)} Nostr relays")

    async def run(self):
        print(f"[{datetime.now()}] Starting NostrX Sync...")
        print(r"""
                  _      __  __
  _ __   ___  ___| |_ _ _\ \/ /
 | '_ \ / _ \/ __| __| '__\  / 
 | | | | (_) \__ \ |_| |  /  \ 
 |_| |_|\___/|___/\__|_| /_/\_\                       
        """)
        
        self.setup_twitter()
        
        if not MONITORED_NPUBS:
            print("❌ No npubs configured in MONITORED_NPUBS")
            return

        await self.setup_nostr()

        # 1. Prepare Filter
        # Fetch events since the last successful sync
        last_ts = self.state["last_synced_timestamp"]
        since = Timestamp.from_secs(last_ts + 1)
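        # (+1 second so the most recently synced event is not fetched again)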
        
        authors = [PublicKey.parse(npub) for npub in MONITORED_NPUBS]
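        # Kind 1 = short text notes (NIP-01)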
        f = Filter().authors(authors).kind(Kind(1)).since(since)
        
        print(f"\n📥 Fetching posts since {datetime.fromtimestamp(last_ts)}...")
        
        # Fetch events
        timeout = timedelta(seconds=30)
        events = await self.client.fetch_events(f, timeout)
        event_list = events.to_vec()
        
        # Sort oldest to newest so we post in order
        event_list.sort(key=lambda x: x.created_at().as_secs())
        
        if not event_list:
            print("✅ No new posts found.")
            return

        print(f"found {len(event_list)} new posts.")
        
        new_last_ts = last_ts
        
        for event in event_list:
            event_id = event.id().to_hex()
            
            # Skip duplicates
            if event_id in self.state["synced_event_ids"]:
                continue
                
            # Skip replies
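            # (Per NIP-10, replies reference their parent with an 'e' tag; treating any
            #  'e' tag as a reply marker is a simplification that also skips quote posts.)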
            is_reply = False
            for tag in event.tags().to_vec():
                t = tag.as_vec()
                if len(t) > 0 and t[0] in ['e', 'reply']:
                    is_reply = True
                    break
            
            if is_reply:
                print(f"⏭️  Skipping reply {event_id[:8]}")
                continue

            # Process Content
            content = event.content()
            clean_text, media_urls = extract_media_urls(content)
            ts = datetime.fromtimestamp(event.created_at().as_secs())
            
            print(f"\n📝 Processing post from {ts}:")
            print(f"   \"{clean_text[:50]}...\"")
            
            # Download Media
            media_ids = []
            temp_files = []
            
            if media_urls:
                print(f"   📷 Found {len(media_urls)} media items")
                for url in media_urls:
                    path = download_media(url)
                    if path:
                        temp_files.append(path)
                        if self.twitter_client:
                            try:
                                print(f"     Uploading {os.path.basename(path)}...")
                                media = self.twitter_client.media_upload(filename=path)
                                media_ids.append(media.media_id)
                            except Exception as e:
                                print(f"     ❌ Upload failed: {e}")
            
            # Post to Twitter
            if self.twitter_v2:
                try:
                    # Truncate text if too long
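                    # (Twitter weights URLs and some characters differently when counting
                    #  length, so this plain len() check is only an approximation)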
                    if len(clean_text) > 280:
                        clean_text = clean_text[:277] + "..."
                    
                    if media_ids:
                        self.twitter_v2.create_tweet(text=clean_text, media_ids=media_ids)
                    else:
                        self.twitter_v2.create_tweet(text=clean_text)
                        
                    print("   ✅ Posted to Twitter")
                    
                    # Update state immediately after success
                    self.state["synced_event_ids"].append(event_id)
                    if event.created_at().as_secs() > new_last_ts:
                        new_last_ts = event.created_at().as_secs()
                        self.state["last_synced_timestamp"] = new_last_ts
                    
                    save_state(self.state)
                    
                except Exception as e:
                    print(f"   ❌ Failed to tweet: {e}")
            else:
                print("   [DRY RUN] Would post to Twitter")
                # In dry-run mode we deliberately do NOT update the state file, so the
                # same posts are processed again once real credentials are configured.
            
            # Cleanup
            for path in temp_files:
                try:
                    os.remove(path)
                except OSError:
                    pass
            
            # Small delay to be nice to APIs
            time.sleep(1)

        print("\n Sync Complete!")

if __name__ == "__main__":
    tool = SyncTool()
    asyncio.run(tool.run())
