Files
christmasevent/ChristmasEvent.py
2025-12-12 10:01:15 +01:00

460 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
import praw
import sqlite3
import time
import datetime
import re
import config
# ==============================================================================
# DATABASE MANAGER
# ==============================================================================
class DatabaseManager:
def __init__(self, db_name="christmas_event.db"):
self.conn = sqlite3.connect(db_name, check_same_thread=False)
self.cursor = self.conn.cursor()
self.create_tables()
def create_tables(self):
# Participants table
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS participants (
username TEXT PRIMARY KEY,
joined_at REAL
)
''')
# Wishes table
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS wishes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
recipient TEXT,
sender TEXT,
message TEXT,
received_at REAL,
FOREIGN KEY(recipient) REFERENCES participants(username),
UNIQUE(sender, recipient)
)
''')
# Conversations table
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS conversations (
sender TEXT PRIMARY KEY,
state TEXT,
pending_recipient TEXT,
pending_message TEXT,
timestamp REAL
)
''')
# Tracks if the final gift has been sent to a user
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS deliveries (
username TEXT PRIMARY KEY,
delivered_at TIMESTAMP
)
''')
self.conn.commit()
def add_participant(self, username):
try:
self.cursor.execute("INSERT INTO participants VALUES (?, ?)",
(username, datetime.datetime.now()))
self.conn.commit()
return True
except sqlite3.IntegrityError:
return False
def save_wish(self, sender, recipient, message):
"""Inserts or Replaces a wish"""
self.cursor.execute("SELECT username FROM participants WHERE username = ?", (recipient,))
if not self.cursor.fetchone():
self.add_participant(recipient)
self.cursor.execute('''
INSERT OR REPLACE INTO wishes (recipient, sender, message, received_at)
VALUES (?, ?, ?, ?)
''', (recipient, sender, message, datetime.datetime.now()))
self.conn.commit()
return True
def check_existing_wish(self, sender, recipient):
self.cursor.execute("SELECT message FROM wishes WHERE sender = ? AND recipient = ?", (sender, recipient))
result = self.cursor.fetchone()
return result[0] if result else None
def get_forest_data(self):
self.cursor.execute('''
SELECT p.username, COUNT(w.id) as wish_count
FROM participants p
LEFT JOIN wishes w ON p.username = w.recipient
GROUP BY p.username
ORDER BY p.username ASC
''')
return self.cursor.fetchall()
def set_conversation_state(self, sender, state, recipient, message):
self.cursor.execute('''
INSERT OR REPLACE INTO conversations (sender, state, pending_recipient, pending_message, timestamp)
VALUES (?, ?, ?, ?, ?)
''', (sender, state, recipient, message, datetime.datetime.now()))
self.conn.commit()
def get_conversation(self, sender):
self.cursor.execute("SELECT state, pending_recipient, pending_message FROM conversations WHERE sender = ?",
(sender,))
return self.cursor.fetchone()
def clear_conversation(self, sender):
self.cursor.execute("DELETE FROM conversations WHERE sender = ?", (sender,))
self.conn.commit()
def get_undelivered_users(self):
"""Returns list of usernames who have wishes but haven't received them yet."""
self.cursor.execute('''
SELECT DISTINCT p.username
FROM participants p
JOIN wishes w ON p.username = w.recipient
LEFT JOIN deliveries d ON p.username = d.username
WHERE d.username IS NULL
''')
return [row[0] for row in self.cursor.fetchall()]
def get_user_wishes(self, username):
self.cursor.execute("SELECT sender, message FROM wishes WHERE recipient = ?", (username,))
return self.cursor.fetchall()
def mark_delivered(self, username):
self.cursor.execute("INSERT INTO deliveries VALUES (?, ?)", (username, datetime.datetime.now()))
self.conn.commit()
# ==============================================================================
# SAFETY FILTER
# ==============================================================================
class SafetyFilter:
def __init__(self, blocklist):
self.blocklist = blocklist
def is_safe(self, text):
"""
Returns True if text is safe, False if it contains blocked terms.
Performs a simple substring check (case-insensitive).
"""
if not text:
return True
text_lower = text.lower()
for bad_word in self.blocklist:
# Check if the bad word exists in the text
# Note: logic can be improved with regex \b word boundaries if needed
if bad_word in text_lower:
return False
return True
# ==============================================================================
# BOT LOGIC
# ==============================================================================
class ChristmasBot:
# Store the last updated data to prevent unneeded updates.
last_updated_data = None
def __init__(self):
print("🎄 Initializing Christmas Helper...")
self.reddit = praw.Reddit(
client_id=config.eventBotClientId,
client_secret=config.eventBotSecret,
user_agent=config.user_agent,
username=config.username,
password=config.password
)
self.reddit.validate_on_submit = True
self.db = DatabaseManager()
self.safety = SafetyFilter(config.blocklist)
self.subreddit = self.reddit.subreddit(config.subreddit_name)
self.last_dashboard_update = 0
self.username_pattern = re.compile(r"^u/[A-Za-z0-9_-]{3,20}$", re.IGNORECASE)
self.yes_pattern = re.compile(r"^(yes|y|yeah|yup|ja|si|oui|confirm)$", re.IGNORECASE)
self.no_pattern = re.compile(r"^(no|nop|cancel|nah|not quite|negative|nuh uh)$", re.IGNORECASE)
def get_visual_icon(self, count):
if count == 0:
return "🌱"
elif count == 1:
return "🎄 (1 Gift)"
elif count <= 5:
return "🎄" * count + f" ({count})"
else:
return f"🌟🌟🌟 ({count} Gifts!)"
def verify_user_flair(self, username):
try:
flair_list = list(self.subreddit.flair(redditor=username))
if not flair_list: return False
user_flair = flair_list[0]
if user_flair['flair_text'] or user_flair['flair_css_class']: return True
return False
except Exception as e:
print(f"⚠️ Error checking flair for {username}: {e}")
return False
def update_dashboard(self):
now = time.time()
if now - self.last_dashboard_update < config.update_post_interval: return
data = self.db.get_forest_data()
if self.last_updated_data and len(self.last_updated_data) == len(data):
# Nothing changed.
print(f"No change in dashboard, last checked {now}...", end="\r")
return
print("📝 Updating Forest Dashboard...")
body = "# 🎄 The Christmas Forest 🎄\n\n"
body += "### How to Participate:\n"
body += "1. Send a DM to u/{}\n".format(config.username)
body += "2. **First Line:** The username (MUST start with `u/` e.g. `u/SexyElf69`)\n"
body += "3. **Next Lines:** Your message.\n\n"
body += "**Note:** The bot will ask you to confirm before saving.\n\n"
body += "See the announcement post [here](https://www.reddit.com/r/NoNutYearlyCommunity/comments/1pkmvwh/introducing_the_nnyc_christmas_wishbot/)\n"
body += "---\n\n"
body += "| User | Tree Status |\n| :--- | :--- |\n"
if not data: body += "| The Forest is quiet... | Be the first to send a wish! |\n"
for username, count in data:
icon = self.get_visual_icon(count)
body += f"| u/{username} | {icon} |\n"
body += "\n\n---\n*Updated: " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "*"
try:
submission = self.reddit.submission(id=config.master_post_id)
submission.edit(body, )
self.last_dashboard_update = now
print("✅ Dashboard updated.")
except Exception as e:
print(f"❌ Error updating dashboard: {e}")
def parse_new_wish_request(self, message):
body_lines = message.body.strip().split('\n')
body_lines = [line.strip() for line in body_lines if line.strip()]
if not body_lines:
return None, None
first_line = body_lines[0]
if self.username_pattern.match(first_line):
target_user = first_line[2:]
wish_content = '\n'.join(body_lines[1:])
return target_user, wish_content
return None, None
def process_inbox(self):
try:
for message in self.reddit.inbox.unread():
if message.was_comment: continue
sender = message.author.name if message.author else "Unknown"
print(f"📩 Message from {sender}")
conversation = self.db.get_conversation(sender)
if conversation:
self.handle_confirmation(message, sender, conversation)
else:
self.handle_new_request(message, sender)
message.mark_read()
except Exception as e:
print(f"⚠️ Error checking inbox: {e}")
def handle_new_request(self, message, sender):
target_user, wish_content = self.parse_new_wish_request(message)
if not self.verify_user_flair(sender):
message.reply(f"Sorry, u/{sender} but you do not have an active Flair in r/{config.subreddit_name}.\n"
"We only allow wishes for active, flaired community members.")
return
# ERROR: Bad Format
if not target_user:
reply = ("I couldn't understand your request. Please format your message exactly like this:\n\n"
"`u/TargetUsername`\n\n"
"`Your message here...`\n\n"
"Make sure the username is on the **first line** and starts with **u/**.")
message.reply(reply)
return
# ERROR: Empty Body
if not wish_content:
message.reply(f"You didn't write a message for u/{target_user}! Please try again with text in the body.")
return
# SAFETY CHECK
if not self.safety.is_safe(wish_content):
print(f"⛔ Blocked unsafe content from {sender}")
message.reply("⛔ **Message Rejected**\n\n"
"Your message contains words or phrases that are not allowed in this event. "
"Please ensure your wish is kind and follows the community guidelines.")
return
# ERROR: No Flair
if not self.verify_user_flair(target_user):
message.reply(f"Sorry, u/{target_user} does not have a User Flair in r/{config.subreddit_name}.\n"
"We only allow wishes for active, flaired community members.")
return
# CHECK: Existing Wish
existing_msg = self.db.check_existing_wish(sender, target_user)
if existing_msg:
self.db.set_conversation_state(sender, "CONFIRM_REPLACE", target_user, wish_content)
reply = (f"⚠️ **You already sent a wish to u/{target_user}.**\n\n"
f"**Old Message:** {existing_msg[:100]}...\n\n"
f"**Do you want to REPLACE it with:**\n"
f"> {wish_content}\n\n"
f"Reply **YES** to confirm or **NO** to cancel.")
message.reply(reply)
else:
self.db.set_conversation_state(sender, "CONFIRM_NEW", target_user, wish_content)
reply = (f"🎄 **Christmas Confirmation** 🎄\n\n"
f"I extracted that you want to send a wish to **u/{target_user}**.\n\n"
f"**Message:**\n"
f"> {wish_content}\n\n"
f"Is this correct? Reply **YES** to save or **NO** to cancel.")
message.reply(reply)
def handle_confirmation(self, message, sender, conversation):
state, target, content = conversation
user_response = message.body.strip().lower()
# SAFETY CHECK (Re-check in case DB state was manipulated or rules changed)
if not self.safety.is_safe(content):
self.db.clear_conversation(sender)
message.reply("⛔ **Message Rejected** during confirmation. Content violates safety rules.")
return
if self.yes_pattern.match(user_response):
self.db.save_wish(sender, target, content)
self.db.clear_conversation(sender)
print(f"💾 Wish saved for {target}")
if state == "CONFIRM_REPLACE":
message.reply(f"✅ Your wish for u/{target} has been **updated**! 🎄")
print(f"{sender} has updated their wish for u/{target} to **{content}**.")
else:
message.reply(f"✅ Your wish for u/{target} has been **saved**! 🎄")
print(f"{sender} has saved their wish for u/{target}: **{content}**.")
elif self.no_pattern.match(user_response):
self.db.clear_conversation(sender)
message.reply("❌ Cancelled. You can send a new message to start over.")
print(f"🚫 Action cancelled by {sender}")
else:
message.reply("I didn't catch that. Please reply **YES** to confirm or **NO** to cancel.")
def run(self):
print("🚀 Christmas Bot Started. Press Ctrl+C to stop.")
while True:# 1. Process new wishes
self.process_inbox()
# 2. Update the public post
self.update_dashboard()
# 3. Check if it's Christmas yet (NEW)
self.check_distribution()
time.sleep(config.check_interval)
def check_distribution(self):
"""Checks if it is Dec 24th UTC (Midnight) or later and triggers distribution."""
now_utc = datetime.datetime.now(datetime.timezone.utc)
# Check: Is it December AND is it the 24th or later?
if now_utc.month == 12 and now_utc.day >= 24:
self.distribute_gifts()
def distribute_gifts(self):
recipients = self.db.get_undelivered_users()
# Silent exit if no pending deliveries (prevents log spam)
if not recipients:
return
print(f"🎅 HO HO HO! It's Christmas! Distributing gifts to {len(recipients)} users...")
for username in recipients:
wishes = self.db.get_user_wishes(username)
if not wishes: continue
print(f"🎁 Preparing gifts for u/{username}...")
# --- PAGINATION LOGIC ---
messages_to_send = []
current_part_content = ""
current_part_num = 1
# Header for first message
header = f"Merry Christmas u/{username}!\n\nThe wait is over. Here are your wishes:\n\n---\n\n"
current_part_content += header
for sender, message_text in wishes:
# Format the single wish
wish_block = f"**From: u/{sender}**\n> {message_text}\n\n---\n\n"
# Check character limit (safe buffer of 9000 chars)
if len(current_part_content) + len(wish_block) > 9000:
# Current part full. Close it and start new one.
current_part_content += f"\n*(Continued in Part {current_part_num + 1}...)*"
messages_to_send.append(current_part_content)
current_part_num += 1
current_part_content = f"**Christmas Wishes Part {current_part_num}**\n\n---\n\n"
current_part_content += wish_block
else:
current_part_content += wish_block
# Append the footer to the last part
current_part_content += f"\nYou received {len(wishes)} ornament(s). Have a wonderful holiday!\n"
current_part_content += f"\n*Automated message from r/{config.subreddit_name}*"
messages_to_send.append(current_part_content)
# --- SENDING LOOP ---
try:
for index, body in enumerate(messages_to_send):
subject_line = "🎄 Open your Christmas Time Capsule! 🎁"
if len(messages_to_send) > 1:
subject_line += f" (Part {index + 1}/{len(messages_to_send)})"
self.reddit.redditor(username).message(subject=subject_line, message=body)
# Small sleep between parts to ensure order and avoid spam trigger
time.sleep(2)
self.db.mark_delivered(username)
print(f"✅ Delivered {len(messages_to_send)} part(s) to u/{username}")
time.sleep(5) # Delay between USERS
except Exception as e:
print(f"❌ Failed to send to u/{username}: {e}")
if __name__ == "__main__":
bot = ChristmasBot()
bot.run()