Prepare for GitHub release

This commit is contained in:
3nd3r
2025-12-28 13:16:55 -06:00
parent f8c46980de
commit 4d17ae8f04
8 changed files with 672 additions and 394 deletions

View File

@@ -38,6 +38,9 @@ class DuckHuntBot:
self.messages = MessageManager()
self.sasl_handler = SASLHandler(self, config)
# Config file path for persisting runtime config changes (e.g., admin join/leave).
self.config_file_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.json')
# Set up health checks
self._setup_health_checks()
@@ -58,7 +61,7 @@ class DuckHuntBot:
# Database health check
self.health_checker.add_check(
'database',
lambda: self.db is not None and len(self.db.players) >= 0,
lambda: self.db is not None and sum(1 for _ in self.db.iter_all_players()) >= 0,
critical=True
)
@@ -130,10 +133,8 @@ class DuckHuntBot:
return False
target = args[0].lower()
player = self.db.get_player(target)
if player is None:
player = self.db.create_player(target)
self.db.players[target] = player
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
player = self.db.get_player(target, channel_ctx)
action_func(player)
message = self.messages.get(success_message_key, target=target, admin=nick)
@@ -147,29 +148,21 @@ class DuckHuntBot:
Returns (player, error_message) - if error_message is not None, command should return early.
"""
is_private_msg = not channel.startswith('#')
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
if not is_private_msg:
if target_nick.lower() == nick.lower():
target_nick = target_nick.lower()
player = self.db.get_player(target_nick)
if player is None:
player = self.db.create_player(target_nick)
self.db.players[target_nick] = player
player = self.db.get_player(target_nick, channel_ctx)
return player, None
else:
is_valid, player, error_msg = self.validate_target_player(target_nick, channel)
if not is_valid:
return None, error_msg
target_nick = target_nick.lower()
if target_nick not in self.db.players:
self.db.players[target_nick] = player
return player, None
else:
target_nick = target_nick.lower()
player = self.db.get_player(target_nick)
if player is None:
player = self.db.create_player(target_nick)
self.db.players[target_nick] = player
player = self.db.get_player(target_nick, channel_ctx)
return player, None
def _get_validated_target_player(self, nick, channel, target_nick):
@@ -566,8 +559,9 @@ class DuckHuntBot:
allowed_chars='abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-[]{}^`|\\')
# Get player data with error recovery
channel_ctx = safe_channel if isinstance(safe_channel, str) and safe_channel.startswith('#') else None
player = self.error_recovery.safe_execute(
lambda: self.db.get_player(nick),
lambda: self.db.get_player(nick, channel_ctx),
fallback={'nick': nick, 'xp': 0, 'ducks_shot': 0, 'gun_confiscated': False},
logger=self.logger
)
@@ -580,7 +574,6 @@ class DuckHuntBot:
try:
player['last_activity_channel'] = safe_channel
player['last_activity_time'] = time.time()
self.db.players[nick.lower()] = player
except Exception as e:
self.logger.warning(f"Error updating player activity for {nick}: {e}")
@@ -672,6 +665,13 @@ class DuckHuntBot:
fallback=None,
logger=self.logger
)
elif cmd == "globalducks":
command_executed = True
await self.error_recovery.safe_execute_async(
lambda: self.handle_globalducks(nick, channel, safe_args, user),
fallback=None,
logger=self.logger
)
elif cmd == "rearm" and self.is_admin(user):
command_executed = True
await self.error_recovery.safe_execute_async(
@@ -707,6 +707,20 @@ class DuckHuntBot:
fallback=None,
logger=self.logger
)
elif cmd == "join" and self.is_admin(user):
command_executed = True
await self.error_recovery.safe_execute_async(
lambda: self.handle_join_channel(nick, channel, safe_args),
fallback=None,
logger=self.logger
)
elif (cmd == "leave" or cmd == "part") and self.is_admin(user):
command_executed = True
await self.error_recovery.safe_execute_async(
lambda: self.handle_leave_channel(nick, channel, safe_args),
fallback=None,
logger=self.logger
)
# If no command was executed, it might be an unknown command
if not command_executed:
@@ -743,8 +757,9 @@ class DuckHuntBot:
if not target_nick:
return False, None, "Invalid target nickname"
player = self.db.get_player(target_nick)
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
player = self.db.get_player(target_nick, channel_ctx)
if not player:
return False, None, f"Player '{target_nick}' not found. They need to participate in the game first."
@@ -768,7 +783,8 @@ class DuckHuntBot:
We assume if someone has been active recently, they're still in the channel.
"""
try:
player = self.db.get_player(nick)
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
player = self.db.get_player(nick, channel_ctx)
if not player:
return False
@@ -887,7 +903,8 @@ class DuckHuntBot:
"""Handle !duckstats command"""
if args and len(args) > 0:
target_nick = args[0]
target_player = self.db.get_player(target_nick)
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
target_player = self.db.get_player(target_nick, channel_ctx)
if not target_player:
message = f"{nick} > Player '{target_nick}' not found."
self.send_message(channel, message)
@@ -980,11 +997,13 @@ class DuckHuntBot:
bold = self.messages.messages.get('colours', {}).get('bold', '')
reset = self.messages.messages.get('colours', {}).get('reset', '')
# Get top 3 by XP
top_xp = self.db.get_leaderboard('xp', 3)
# Get top 3 by ducks shot
top_ducks = self.db.get_leaderboard('ducks_shot', 3)
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
# Get top 3 by XP (channel-scoped)
top_xp = self.db.get_leaderboard_for_channel(channel_ctx, 'xp', 3)
# Get top 3 by ducks shot (channel-scoped)
top_ducks = self.db.get_leaderboard_for_channel(channel_ctx, 'ducks_shot', 3)
# Format XP leaderboard as single line
if top_xp:
@@ -1013,19 +1032,216 @@ class DuckHuntBot:
self.send_message(channel, f"{nick} > Error retrieving leaderboard data.")
async def handle_duckhelp(self, nick, channel, _player):
"""Handle !duckhelp command"""
"""Handle !duckhelp command
Sends help to the user via private message (PM/DM) with examples.
"""
dm_target = nick # PRIVMSG target for a PM is the nick
help_lines = [
self.messages.get('help_header'),
self.messages.get('help_user_commands'),
self.messages.get('help_help_command')
"DuckHunt Commands (sent via PM)",
"Player commands:",
"- !bang — shoot when a duck appears. Example: !bang",
"- !reload — reload your weapon. Example: !reload",
"- !shop — list shop items. Example: !shop",
"- !buy <item_id> — buy from shop. Example: !buy 3",
"- !use <item_id> [target] — use an inventory item. Example: !use 7 OR !use 9 SomeNick",
"- !duckstats [player] — view stats/inventory. Example: !duckstats OR !duckstats SomeNick",
"- !topduck — show leaderboards. Example: !topduck",
"- !give <item_id> <player> — gift an owned item. Example: !give 2 SomeNick",
"- !globalducks [player] — duck totals across all configured channels. Example: !globalducks OR !globalducks SomeNick",
"- !duckhelp — show this help. Example: !duckhelp",
]
# Add admin commands if user is admin
if self.is_admin(f"{nick}!user@host"):
help_lines.append(self.messages.get('help_admin_commands'))
# Include admin commands only for admins.
# (Using nick list avoids relying on hostmask parsing.)
if nick.lower() in self.admins:
help_lines.extend([
"Admin commands:",
"- !rearm <player|all> — rearm a player. Example: !rearm SomeNick OR !rearm all",
"- !disarm <player> — confiscate gun. Example: !disarm SomeNick",
"- !ignore <player> — ignore a player. Example: !ignore SomeNick",
"- !unignore <player> — unignore a player. Example: !unignore SomeNick",
"- !ducklaunch [duck_type] — force spawn. Example: !ducklaunch golden",
"- (PM) !ducklaunch <#channel> [duck_type] — Example: !ducklaunch #ct fast",
"- !join <#channel> — make the bot join a channel. Example: !join #ct",
"- !leave <#channel> — make the bot leave a channel. Example: !leave #ct",
])
for line in help_lines:
self.send_message(channel, line)
self.send_message(dm_target, line)
# If invoked in a channel, add a brief confirmation to check PMs.
if isinstance(channel, str) and channel.startswith('#'):
self.send_message(channel, f"{nick} > I sent you a PM with commands and examples. Please check your PM window.")
async def handle_globalducks(self, nick, channel, args, user):
"""User: !globalducks [player] — totals across all configured channels.
Non-admins can query themselves only. Admins can query other nicks.
"""
try:
channels = self.get_config('connection.channels', []) or []
if not isinstance(channels, list):
channels = []
target_nick = nick
if args and len(args) >= 1:
requested = sanitize_user_input(
str(args[0]),
max_length=50,
allowed_chars='abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-[]{}^`|\\'
)
if requested:
target_nick = requested
# Anyone can query anyone via !globalducks <player>
totals = self.db.get_global_duck_totals(target_nick, channels)
shot = totals.get('ducks_shot', 0)
bef = totals.get('ducks_befriended', 0)
total = totals.get('total_ducks', shot + bef)
counted = totals.get('channels_counted', 0)
if not channels:
self.send_message(channel, f"{nick} > No configured channels to total.")
return
self.send_message(
channel,
f"{nick} > Global totals for {target_nick} across configured channels: {shot} shot, {bef} befriended ({total} total) [{counted}/{len(channels)} channels have stats]."
)
except Exception as e:
self.logger.error(f"Error in handle_globalducks: {e}")
self.send_message(channel, f"{nick} > Error calculating global totals.")
def _sanitize_channel_name(self, channel_name: str) -> str:
"""Validate/sanitize an IRC channel name."""
if not isinstance(channel_name, str):
return ""
safe = sanitize_user_input(
channel_name.strip(),
max_length=100,
allowed_chars='#&+!abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-[]{}^`|\\'
)
if not safe:
return ""
if not (safe.startswith('#') or safe.startswith('&')):
return ""
return safe
def _config_channels_list(self):
"""Return the mutable in-memory config channel list, creating it if needed."""
if not isinstance(self.config, dict):
self.config = {}
connection = self.config.get('connection')
if not isinstance(connection, dict):
connection = {}
self.config['connection'] = connection
channels = connection.get('channels')
if not isinstance(channels, list):
channels = []
connection['channels'] = channels
return channels
def _persist_config(self) -> bool:
"""Persist current config to disk (best-effort, atomic write)."""
try:
config_dir = os.path.dirname(self.config_file_path)
os.makedirs(config_dir, exist_ok=True)
tmp_path = f"{self.config_file_path}.tmp"
with open(tmp_path, 'w', encoding='utf-8') as f:
# Keep it stable/human-readable.
import json
json.dump(self.config, f, indent=4, ensure_ascii=False)
f.write("\n")
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, self.config_file_path)
return True
except Exception as e:
self.logger.error(f"Failed to persist config to disk: {e}")
return False
async def handle_join_channel(self, nick, channel, args):
"""Admin: !join <#channel> (supports PM and channel invocation)."""
if not args:
self.send_message(channel, f"{nick} > Usage: !join <#channel>")
return
target_channel = self._sanitize_channel_name(args[0])
if not target_channel:
self.send_message(channel, f"{nick} > Invalid channel. Usage: !join <#channel>")
return
if target_channel in self.channels_joined:
self.send_message(channel, f"{nick} > I'm already in {target_channel}.")
return
if not self.send_raw(f"JOIN {target_channel}"):
self.send_message(channel, f"{nick} > Couldn't send JOIN (not connected?).")
return
# Track it immediately (we also reconcile on actual JOIN server message).
self.channels_joined.add(target_channel)
# Update in-memory config so reconnects keep the new channel.
channels = self._config_channels_list()
if target_channel not in channels:
channels.append(target_channel)
# Persist across restarts
if not self._persist_config():
self.send_message(channel, f"{nick} > Joined {target_channel}, but failed to write config.json (check permissions).")
return
self.send_message(channel, f"{nick} > Joining {target_channel}.")
async def handle_leave_channel(self, nick, channel, args):
"""Admin: !leave <#channel> / !part <#channel> (supports PM and channel invocation)."""
if not args:
self.send_message(channel, f"{nick} > Usage: !leave <#channel>")
return
target_channel = self._sanitize_channel_name(args[0])
if not target_channel:
self.send_message(channel, f"{nick} > Invalid channel. Usage: !leave <#channel>")
return
# Cancel any pending rejoin attempts and forget state.
if target_channel in self.rejoin_tasks:
try:
self.rejoin_tasks[target_channel].cancel()
except Exception:
pass
del self.rejoin_tasks[target_channel]
if target_channel in self.rejoin_attempts:
del self.rejoin_attempts[target_channel]
self.channels_joined.discard(target_channel)
# Update in-memory config so reconnects do not rejoin the channel.
channels = self._config_channels_list()
try:
while target_channel in channels:
channels.remove(target_channel)
except Exception:
pass
# Persist across restarts
if not self._persist_config():
self.send_message(channel, f"{nick} > Removed {target_channel} from my channel list, but failed to write config.json (check permissions).")
# Continue attempting PART anyway.
# Send PART even if we don't think we're in it (server will ignore or error).
if not self.send_raw(f"PART {target_channel} :Requested by {nick}"):
self.send_message(channel, f"{nick} > Couldn't send PART (not connected?).")
return
self.send_message(channel, f"{nick} > Leaving {target_channel}.")
async def handle_use(self, nick, channel, player, args):
"""Handle !use command"""
@@ -1212,7 +1428,8 @@ class DuckHuntBot:
# Check if admin wants to rearm all players
if target_nick.lower() == 'all':
rearmed_count = 0
for player_nick, player in self.db.players.items():
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
for player_nick, player in self.db.get_players_for_channel(channel_ctx).items():
if player.get('gun_confiscated', False):
player['gun_confiscated'] = False
self.levels.update_player_magazines(player)
@@ -1251,10 +1468,8 @@ class DuckHuntBot:
return
# Rearm the admin themselves (only in channels)
player = self.db.get_player(nick)
if player is None:
player = self.db.create_player(nick)
self.db.players[nick.lower()] = player
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
player = self.db.get_player(nick, channel_ctx)
player['gun_confiscated'] = False
@@ -1317,10 +1532,8 @@ class DuckHuntBot:
return
target = args[0].lower()
player = self.db.get_player(target)
if player is None:
player = self.db.create_player(target)
self.db.players[target] = player
channel_ctx = channel if isinstance(channel, str) and channel.startswith('#') else None
player = self.db.get_player(target, channel_ctx)
action_func(player)