Simplified DuckHunt bot with customizable messages and colors

This commit is contained in:
2025-09-23 02:57:28 +01:00
parent 9285b1b29d
commit de64756b6d
19 changed files with 797 additions and 5712 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

196
src/db.py
View File

@@ -1,181 +1,81 @@
"""
Database functionality for DuckHunt Bot
Simplified Database management for DuckHunt Bot
Only essential player fields
"""
import json
import os
import time
import asyncio
import logging
import time
import os
class DuckDB:
"""Database management for DuckHunt Bot"""
"""Simplified database management"""
def __init__(self, db_file="duckhunt.json"):
self.db_file = db_file
self.players = {}
self._save_pending = False
self.logger = logging.getLogger('DuckHuntBot.DB')
def get_config(self, path, default=None):
"""Helper method to get config values (needs to be set by bot)"""
if hasattr(self, '_config_getter'):
return self._config_getter(path, default)
return default
def set_config_getter(self, config_getter):
"""Set the config getter function from the main bot"""
self._config_getter = config_getter
self.load_database()
def load_database(self):
"""Load player data from JSON file"""
if os.path.exists(self.db_file):
try:
try:
if os.path.exists(self.db_file):
with open(self.db_file, 'r') as f:
data = json.load(f)
self.players = data.get('players', {})
self.logger.info(f"Loaded {len(self.players)} players from {self.db_file}")
except (json.JSONDecodeError, IOError) as e:
self.logger.error(f"Error loading database: {e}")
self.logger.info(f"Loaded {len(self.players)} players from {self.db_file}")
else:
self.players = {}
else:
self.logger.info(f"No existing database found, starting fresh")
except Exception as e:
self.logger.error(f"Error loading database: {e}")
self.players = {}
self.logger.info(f"Created new database: {self.db_file}")
def save_database(self):
"""Save all player data to JSON file with error handling"""
"""Save all player data to JSON file"""
try:
temp_file = f"{self.db_file}.tmp"
data = {
'players': self.players,
'last_save': str(time.time())
}
with open(temp_file, 'w') as f:
# Create backup
if os.path.exists(self.db_file):
backup_file = f"{self.db_file}.backup"
try:
with open(self.db_file, 'r') as src, open(backup_file, 'w') as dst:
dst.write(src.read())
except Exception as e:
self.logger.warning(f"Failed to create backup: {e}")
# Save main file
with open(self.db_file, 'w') as f:
json.dump(data, f, indent=2)
os.replace(temp_file, self.db_file)
except IOError as e:
self.logger.error(f"Error saving database: {e}")
except Exception as e:
self.logger.error(f"Unexpected database save error: {e}")
def _get_starting_accuracy(self):
"""Get starting accuracy with optional randomization"""
base_accuracy = self.get_config('new_players.starting_accuracy', 65) or 65
if self.get_config('new_players.random_stats.enabled', False):
import random
variance = self.get_config('new_players.random_stats.accuracy_variance', 10) or 10
return max(10, min(95, base_accuracy + random.randint(-variance, variance)))
return base_accuracy
def _get_starting_reliability(self):
"""Get starting reliability with optional randomization"""
base_reliability = self.get_config('new_players.starting_reliability', 70) or 70
if self.get_config('new_players.random_stats.enabled', False):
import random
variance = self.get_config('new_players.random_stats.reliability_variance', 10) or 10
return max(10, min(95, base_reliability + random.randint(-variance, variance)))
return base_reliability
def get_player(self, user):
self.logger.error(f"Error saving database: {e}")
def get_player(self, nick):
"""Get player data, creating if doesn't exist"""
if '!' not in user:
nick = user.lower()
else:
nick = user.split('!')[0].lower()
if nick in self.players:
player = self.players[nick]
self._ensure_player_fields(player)
return player
else:
return self.create_player(nick)
nick_lower = nick.lower()
if nick_lower not in self.players:
self.players[nick_lower] = self.create_player(nick)
return self.players[nick_lower]
def create_player(self, nick):
"""Create a new player with default stats"""
player = {
'shots': 6,
'max_shots': 6,
"""Create a new player with basic stats"""
return {
'nick': nick,
'xp': 0,
'ducks_shot': 0,
'ammo': 6,
'max_ammo': 6,
'chargers': 2,
'max_chargers': 2,
'reload_time': 5.0,
'ducks_shot': 0,
'ducks_befriended': 0,
'accuracy_bonus': 0,
'xp_bonus': 0,
'charm_bonus': 0,
'exp': 0,
'money': 100,
'last_hunt': 0,
'last_reload': 0,
'level': 1,
'inventory': {},
'ignored_users': [],
'jammed': False,
'jammed_count': 0,
'total_ammo_used': 0,
'shot_at': 0,
'wild_shots': 0,
'accuracy': self._get_starting_accuracy(),
'reliability': self._get_starting_reliability(),
'gun_confiscated': False,
'confiscated_count': 0
}
self.players[nick] = player
self.logger.info(f"Created new player: {nick}")
return player
def _ensure_player_fields(self, player):
"""Ensure player has all required fields for backward compatibility"""
required_fields = {
'shots': player.get('ammo', 6),
'max_shots': player.get('max_ammo', 6),
'chargers': player.get('chargers', 2),
'max_chargers': player.get('max_chargers', 2),
'reload_time': 5.0,
'ducks_shot': player.get('caught', 0),
'ducks_befriended': player.get('befriended', 0),
'accuracy_bonus': 0,
'xp_bonus': 0,
'charm_bonus': 0,
'exp': player.get('xp', 0),
'money': player.get('coins', 100),
'last_hunt': 0,
'last_reload': 0,
'level': 1,
'inventory': {},
'ignored_users': [],
'jammed': False,
'jammed_count': player.get('jammed_count', 0),
'total_ammo_used': player.get('total_ammo_used', 0),
'shot_at': player.get('shot_at', 0),
'wild_shots': player.get('wild_shots', 0),
'accuracy': player.get('accuracy', 65),
'reliability': player.get('reliability', 70),
'gun_confiscated': player.get('gun_confiscated', False),
'confiscated_count': player.get('confiscated_count', 0)
}
for field, default_value in required_fields.items():
if field not in player:
player[field] = default_value
def save_player(self, user):
"""Save player data - batch saves for performance"""
if not self._save_pending:
self._save_pending = True
asyncio.create_task(self._delayed_save())
async def _delayed_save(self):
"""Batch save to reduce disk I/O"""
await asyncio.sleep(0.5)
try:
self.save_database()
self.logger.debug("Database batch save completed")
except Exception as e:
self.logger.error(f"Database batch save failed: {e}")
finally:
self._save_pending = False
'accuracy': 65,
'gun_confiscated': False
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,310 +1,105 @@
"""
Game mechanics for DuckHunt Bot
Simplified Game mechanics for DuckHunt Bot
Basic duck spawning and timeout only
"""
import asyncio
import random
import time
import uuid
import logging
from typing import Dict, Any, Optional, List
class DuckGame:
"""Game mechanics and duck management"""
"""Simplified game mechanics - just duck spawning"""
def __init__(self, bot, db):
self.bot = bot
self.db = db
self.ducks = {}
self.ducks = {} # {channel: [duck1, duck2, ...]}
self.logger = logging.getLogger('DuckHuntBot.Game')
self.colors = {
'red': '\x0304',
'green': '\x0303',
'yellow': '\x0308',
'blue': '\x0302',
'cyan': '\x0311',
'magenta': '\x0306',
'white': '\x0300',
'bold': '\x02',
'reset': '\x03',
'underline': '\x1f'
}
def get_config(self, path, default=None):
"""Helper method to get config values"""
return self.bot.get_config(path, default)
def get_player_level(self, xp):
"""Calculate player level from XP"""
if xp < 0:
return 0
return int((xp ** 0.5) / 2) + 1
self.spawn_task = None
self.timeout_task = None
def get_xp_for_next_level(self, xp):
"""Calculate XP needed for next level"""
level = self.get_player_level(xp)
return ((level * 2) ** 2) - xp
def calculate_penalty_by_level(self, base_penalty, xp):
"""Reduce penalties for higher level players"""
level = self.get_player_level(xp)
return max(1, base_penalty - (level - 1))
def update_karma(self, player, event):
"""Update player karma based on events"""
if not self.get_config('karma.enabled', True):
return
karma_changes = {
'hit': self.get_config('karma.hit_bonus', 2),
'golden_hit': self.get_config('karma.golden_hit_bonus', 5),
'teamkill': -self.get_config('karma.teamkill_penalty', 10),
'wild_shot': -self.get_config('karma.wild_shot_penalty', 3),
'miss': -self.get_config('karma.miss_penalty', 1),
'befriend_success': self.get_config('karma.befriend_success_bonus', 2),
'befriend_fail': -self.get_config('karma.befriend_fail_penalty', 1)
}
async def start_game_loops(self):
"""Start the game loops"""
self.spawn_task = asyncio.create_task(self.duck_spawn_loop())
self.timeout_task = asyncio.create_task(self.duck_timeout_loop())
if event in karma_changes:
player['karma'] = player.get('karma', 0) + karma_changes[event]
try:
await asyncio.gather(self.spawn_task, self.timeout_task)
except asyncio.CancelledError:
self.logger.info("Game loops cancelled")
def is_sleep_time(self):
"""Check if ducks should not spawn due to sleep hours"""
sleep_hours = self.get_config('sleep_hours', [])
if not sleep_hours or len(sleep_hours) != 2:
return False
import datetime
current_hour = datetime.datetime.now().hour
start_hour, end_hour = sleep_hours
if start_hour <= end_hour:
return start_hour <= current_hour <= end_hour
else:
return current_hour >= start_hour or current_hour <= end_hour
def calculate_gun_reliability(self, player):
"""Calculate gun reliability with modifiers"""
base_reliability = player.get('reliability', 70)
return min(100, max(0, base_reliability))
def gun_jams(self, player):
"""Check if gun jams (eggdrop style)"""
reliability = player.get('reliability', 70)
jam_chance = max(1, 101 - reliability)
if player.get('total_ammo_used', 0) > 100:
jam_chance += 2
if player.get('jammed_count', 0) > 5:
jam_chance += 1
return random.randint(1, 100) <= jam_chance
async def duck_spawn_loop(self):
"""Simple duck spawning loop"""
try:
while True:
# Wait random time between spawns
min_wait = self.bot.get_config('duck_spawn_min', 300) # 5 minutes
max_wait = self.bot.get_config('duck_spawn_max', 900) # 15 minutes
wait_time = random.randint(min_wait, max_wait)
async def scare_other_ducks(self, channel, shot_duck_id):
"""Scare other ducks when one is shot"""
if channel not in self.ducks:
return
for duck in self.ducks[channel][:]:
if duck['id'] != shot_duck_id and duck['alive']:
if random.random() < 0.3:
duck['alive'] = False
self.ducks[channel].remove(duck)
await asyncio.sleep(wait_time)
# Spawn duck in random channel
channels = list(self.bot.channels_joined)
if channels:
channel = random.choice(channels)
await self.spawn_duck(channel)
except asyncio.CancelledError:
self.logger.info("Duck spawning loop cancelled")
async def duck_timeout_loop(self):
"""Simple duck timeout loop"""
try:
while True:
await asyncio.sleep(10) # Check every 10 seconds
current_time = time.time()
channels_to_clear = []
for channel, ducks in self.ducks.items():
ducks_to_remove = []
for duck in ducks:
if current_time - duck['spawn_time'] > self.bot.get_config('duck_timeout', 60):
ducks_to_remove.append(duck)
async def scare_duck_on_miss(self, channel, target_duck):
"""Scare duck when someone misses"""
if target_duck and random.random() < 0.15:
target_duck['alive'] = False
if channel in self.ducks and target_duck in self.ducks[channel]:
self.ducks[channel].remove(target_duck)
for duck in ducks_to_remove:
ducks.remove(duck)
message = self.bot.messages.get('duck_flies_away')
self.bot.send_message(channel, message)
if not ducks:
channels_to_clear.append(channel)
async def find_bushes_items(self, nick, channel, player):
"""Find random items in bushes"""
if not self.get_config('items.enabled', True):
return
if random.random() < 0.1:
items = [
("a mirror", "mirror", "You can now deflect shots!"),
("some sand", "sand", "Throw this to blind opponents!"),
("a rusty bullet", None, "It's too rusty to use..."),
("some bread crumbs", "bread", "Feed ducks to make them friendly!"),
]
found_item, item_key, message = random.choice(items)
if item_key and item_key in player:
player[item_key] = player.get(item_key, 0) + 1
elif item_key in player:
player[item_key] = player.get(item_key, 0) + 1
await self.bot.send_user_message(nick, channel,
f"You found {found_item} in the bushes! {message}")
# Clean up empty channels
for channel in channels_to_clear:
if channel in self.ducks and not self.ducks[channel]:
del self.ducks[channel]
except asyncio.CancelledError:
self.logger.info("Duck timeout loop cancelled")
def get_duck_spawn_message(self):
"""Get random duck spawn message (eggdrop style)"""
messages = [
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_O< QUACK",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_o< QUACK!",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_O< QUAAACK!",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_ö< Quack?",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_O< *QUACK*"
]
return random.choice(messages)
async def spawn_duck_now(self, channel, force_golden=False):
"""Spawn a duck immediately in the specified channel"""
async def spawn_duck(self, channel):
"""Spawn a duck in the channel"""
if channel not in self.ducks:
self.ducks[channel] = []
max_ducks = self.get_config('max_ducks_per_channel', 3)
if len([d for d in self.ducks[channel] if d['alive']]) >= max_ducks:
self.logger.debug(f"Max ducks already in {channel}")
# Don't spawn if there's already a duck
if self.ducks[channel]:
return
if force_golden:
duck_type = "golden"
else:
rand = random.random()
if rand < 0.02:
duck_type = "armored"
elif rand < 0.10:
duck_type = "golden"
elif rand < 0.30:
duck_type = "rare"
elif rand < 0.40:
duck_type = "fast"
else:
duck_type = "normal"
duck_config = self.get_config(f'duck_types.{duck_type}', {})
if not duck_config.get('enabled', True):
duck_type = "normal"
duck_config = self.get_config('duck_types.normal', {})
duck = {
'id': str(uuid.uuid4())[:8],
'type': duck_type,
'alive': True,
'id': f"duck_{int(time.time())}_{random.randint(1000, 9999)}",
'spawn_time': time.time(),
'health': duck_config.get('health', 1),
'max_health': duck_config.get('health', 1)
'channel': channel
}
self.ducks[channel].append(duck)
messages = duck_config.get('messages', [self.get_duck_spawn_message()])
spawn_message = random.choice(messages)
# Send spawn message
message = self.bot.messages.get('duck_spawn')
self.bot.send_message(channel, message)
self.bot.send_message(channel, spawn_message)
self.logger.info(f"Spawned {duck_type} duck in {channel}")
await self.send_duck_alerts(channel, duck_type)
return duck
async def send_duck_alerts(self, channel, duck_type):
"""Send alerts to users who have them enabled"""
if not self.get_config('social.duck_alerts_enabled', True):
return
self.logger.debug(f"Duck alerts for {duck_type} duck in {channel}")
async def spawn_ducks(self):
"""Main duck spawning loop"""
while not self.bot.shutdown_requested:
try:
if self.is_sleep_time():
await asyncio.sleep(300)
continue
for channel in self.bot.channels_joined:
if self.bot.shutdown_requested:
break
if channel not in self.ducks:
self.ducks[channel] = []
self.ducks[channel] = [d for d in self.ducks[channel] if d['alive']]
max_ducks = self.get_config('max_ducks_per_channel', 3)
alive_ducks = len([d for d in self.ducks[channel] if d['alive']])
if alive_ducks < max_ducks:
min_spawn_time = self.get_config('duck_spawn_min', 1800)
max_spawn_time = self.get_config('duck_spawn_max', 5400)
if random.random() < 0.1:
await self.spawn_duck_now(channel)
await asyncio.sleep(random.randint(60, 300))
except asyncio.CancelledError:
self.logger.info("Duck spawning loop cancelled")
break
except Exception as e:
self.logger.error(f"Error in duck spawning: {e}")
await asyncio.sleep(60)
async def duck_timeout_checker(self):
"""Check for ducks that should timeout"""
while not self.bot.shutdown_requested:
try:
current_time = time.time()
for channel in list(self.ducks.keys()):
if self.bot.shutdown_requested:
break
if channel not in self.ducks:
continue
for duck in self.ducks[channel][:]:
if not duck['alive']:
continue
age = current_time - duck['spawn_time']
min_timeout = self.get_config('duck_timeout_min', 45)
max_timeout = self.get_config('duck_timeout_max', 75)
timeout = random.randint(min_timeout, max_timeout)
if age > timeout:
duck['alive'] = False
self.ducks[channel].remove(duck)
timeout_messages = [
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_o> The duck flew away!",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_O> *FLAP FLAP FLAP*",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_o> The duck got tired of waiting and left!",
"-.,¸¸.-·°'`'°·-.,¸¸.-·°'`'°· \\_O> *KWAK* The duck escaped!"
]
self.bot.send_message(channel, random.choice(timeout_messages))
self.logger.debug(f"Duck timed out in {channel}")
await asyncio.sleep(10)
except asyncio.CancelledError:
self.logger.info("Duck timeout checker cancelled")
break
except Exception as e:
self.logger.error(f"Error in duck timeout checker: {e}")
await asyncio.sleep(30)
def get_alive_ducks(self, channel):
"""Get list of alive ducks in channel"""
if channel not in self.ducks:
return []
return [d for d in self.ducks[channel] if d['alive']]
def get_duck_by_id(self, channel, duck_id):
"""Get duck by ID"""
if channel not in self.ducks:
return None
for duck in self.ducks[channel]:
if duck['id'] == duck_id and duck['alive']:
return duck
return None
self.logger.info(f"Duck spawned in {channel}")

View File

@@ -6,9 +6,9 @@ import logging
import logging.handlers
class DetailedColorFormatter(logging.Formatter):
"""Console formatter with color support"""
COLORS = {
class DetailedColourFormatter(logging.Formatter):
"""Console formatter with colour support"""
COLOURS = {
'DEBUG': '\033[94m',
'INFO': '\033[92m',
'WARNING': '\033[93m',
@@ -18,14 +18,14 @@ class DetailedColorFormatter(logging.Formatter):
}
def format(self, record):
color = self.COLORS.get(record.levelname, '')
endc = self.COLORS['ENDC']
colour = self.COLOURS.get(record.levelname, '')
endc = self.COLOURS['ENDC']
msg = super().format(record)
return f"{color}{msg}{endc}"
return f"{colour}{msg}{endc}"
class DetailedFileFormatter(logging.Formatter):
"""File formatter with extra context but no colors"""
"""File formatter with extra context but no colours"""
def format(self, record):
return super().format(record)
@@ -39,7 +39,7 @@ def setup_logger(name="DuckHuntBot"):
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = DetailedColorFormatter(
console_formatter = DetailedColourFormatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
console_handler.setFormatter(console_formatter)

View File

@@ -3,7 +3,83 @@ Utility functions for DuckHunt Bot
"""
import re
from typing import Optional
import json
import os
from typing import Optional, Tuple, List, Dict, Any
class MessageManager:
"""Manages customizable IRC messages with color support"""
def __init__(self, messages_file: str = "messages.json"):
self.messages_file = messages_file
self.messages = {}
self.load_messages()
def load_messages(self):
"""Load messages from JSON file"""
try:
if os.path.exists(self.messages_file):
with open(self.messages_file, 'r', encoding='utf-8') as f:
self.messages = json.load(f)
else:
# Fallback messages if file doesn't exist
self.messages = self._get_default_messages()
except Exception as e:
print(f"Error loading messages: {e}, using defaults")
self.messages = self._get_default_messages()
def _get_default_messages(self) -> Dict[str, str]:
"""Default fallback messages without colors"""
return {
"duck_spawn": "・゜゜・。。・゜゜\\_o< QUACK! A duck has appeared! Type !bang to shoot it!",
"duck_flies_away": "The duck flies away. ·°'`'°-.,¸¸.·°'`",
"bang_hit": "{nick} > *BANG* You shot the duck! [+{xp_gained} xp] [Total ducks: {ducks_shot}]",
"bang_miss": "{nick} > *BANG* You missed the duck!",
"bang_no_duck": "{nick} > *BANG* What did you shoot at? There is no duck in the area... [GUN CONFISCATED]",
"bang_no_ammo": "{nick} > *click* You're out of ammo! Use !reload",
"bang_not_armed": "{nick} > You are not armed.",
"reload_success": "{nick} > *click* Reloaded! [Ammo: {ammo}/{max_ammo}] [Chargers: {chargers}]",
"reload_already_loaded": "{nick} > Your gun is already loaded!",
"reload_no_chargers": "{nick} > You're out of chargers!",
"reload_not_armed": "{nick} > You are not armed.",
"shop_display": "DuckHunt Shop: {items} | You have {xp} XP",
"shop_item_format": "({id}) {name} - {price} XP",
"help_header": "DuckHunt Commands:",
"help_user_commands": "!bang - Shoot at ducks | !reload - Reload your gun | !shop - View the shop",
"help_help_command": "!duckhelp - Show this help",
"help_admin_commands": "Admin: !rearm <player> | !disarm <player> | !ignore <player> | !unignore <player> | !ducklaunch",
"admin_rearm_player": "[ADMIN] {target} has been rearmed by {admin}",
"admin_rearm_all": "[ADMIN] All players have been rearmed by {admin}",
"admin_disarm": "[ADMIN] {target} has been disarmed by {admin}",
"admin_ignore": "[ADMIN] {target} is now ignored by {admin}",
"admin_unignore": "[ADMIN] {target} is no longer ignored by {admin}",
"admin_ducklaunch": "[ADMIN] A duck has been launched by {admin}",
"admin_ducklaunch_not_enabled": "[ADMIN] This channel is not enabled for duckhunt",
"usage_rearm": "Usage: !rearm <player>",
"usage_disarm": "Usage: !disarm <player>",
"usage_ignore": "Usage: !ignore <player>",
"usage_unignore": "Usage: !unignore <player>"
}
def get(self, key: str, **kwargs) -> str:
"""Get a formatted message by key"""
if key not in self.messages:
return f"[Missing message: {key}]"
message = self.messages[key]
# Format with provided variables
try:
return message.format(**kwargs)
except KeyError as e:
return f"[Message format error: {e}]"
except Exception as e:
return f"[Message error: {e}]"
def reload(self):
"""Reload messages from file"""
self.load_messages()
class InputValidator:
@@ -46,12 +122,17 @@ class InputValidator:
return sanitized[:500]
def parse_message(line):
def parse_irc_message(line: str) -> Tuple[str, str, List[str], str]:
"""Parse IRC message format"""
prefix = ''
trailing = ''
if line.startswith(':'):
prefix, line = line[1:].split(' ', 1)
if ' ' in line[1:]:
prefix, line = line[1:].split(' ', 1)
else:
# Handle malformed IRC line with no space after prefix
prefix = line[1:]
line = ''
if ' :' in line:
line, trailing = line.split(' :', 1)
parts = line.split()