#!/usr/bin/env python3
"""
TechIRCd Advanced Stress Tester
===============================
Comprehensive IRC server stress testing tool with configurable behavior.
"""

import asyncio
import random
import time
import logging
from typing import List, Dict, Optional
import json

# =============================================================================
# CONFIGURATION - Edit these values to customize your stress test
# =============================================================================

CONFIG = {
    # Server connection settings
    "SERVER": {
        "host": "localhost",
        "port": 6667,
    },

    # Test parameters
    "TEST": {
        "max_clients": 300,        # Number of concurrent clients
        "connect_delay": 0.05,     # Seconds between connections
        "test_duration": 300,      # Test duration in seconds
        "action_interval": 1.0,    # Seconds between random actions
        "stats_interval": 10,      # Seconds between stats reports
    },

    # Client behavior probabilities (0.0 = never, 1.0 = always)
    "BEHAVIOR": {
        "join_channels": True,
        "send_messages": True,
        "use_new_commands": True,
        "change_nicks": True,
        "random_quit": False,

        # Action rates (probability per action cycle)
        "message_rate": 0.4,        # Chance to send a message
        "channel_join_rate": 0.2,   # Chance to join a channel
        "command_rate": 0.15,       # Chance to send a command
        "nick_change_rate": 0.05,   # Chance to change nick
        "quit_rate": 0.01,          # Chance to quit and reconnect
    },

    # IRC channels to use
    "CHANNELS": [
        "#test", "#stress", "#general", "#random", "#chaos",
        "#lobby", "#gaming", "#tech", "#chat", "#help",
        "#dev", "#admin", "#support", "#lounge", "#public"
    ],

    # Random messages to send
    "MESSAGES": [
        "Hello everyone!",
        "This is a stress test message",
        "How is everyone doing today?",
        "Testing server stability under load",
        "Random message from stress test client",
        "IRC is still the best chat protocol!",
        "TechIRCd is handling this load well",
        "Can you see this message?",
        "Stress testing in progress...",
        "Everything working fine here",
        "Anyone else here for the stress test?",
        "Server performance looking good!",
        "Testing new IRC commands",
        "This channel is quite active",
        "Load testing is important for stability"
    ],

    # IRC commands to test (including new ones!)
    "COMMANDS": [
        "MOTD", "RULES", "MAP", "TIME", "VERSION", "LUSERS",
        "WHO #test", "WHOIS testuser", "LIST", "ADMIN", "INFO",
        "KNOCK #test", "SETNAME :New real name from stress test"
    ],

    # Logging configuration
    "LOGGING": {
        "level": "INFO",            # DEBUG, INFO, WARNING, ERROR
        "show_irc_traffic": False,  # Set to True to see all IRC messages
    }
}

# =============================================================================
# STRESS TESTING CODE - Don't modify unless you know what you're doing
# =============================================================================


class StressTestStats:
    """Tracks statistics during stress testing.

    All counters are plain integers mutated directly by the clients; the
    start time is captured when the instance is created.
    """

    def __init__(self):
        self.start_time = time.time()
        self.connected_clients = 0
        self.messages_sent = 0
        self.commands_sent = 0
        self.channels_joined = 0
        self.nick_changes = 0
        self.errors = 0
        self.reconnections = 0

    def runtime(self) -> float:
        """Return the number of seconds elapsed since this object was created."""
        return time.time() - self.start_time

    def print_stats(self):
        """Print every counter, plus per-second rates when runtime is positive."""
        runtime = self.runtime()
        print("\n" + "="*50)
        print("STRESS TEST STATISTICS")
        print("="*50)
        print(f"Runtime: {runtime:.1f}s")
        print(f"Connected Clients: {self.connected_clients}")
        print(f"Messages Sent: {self.messages_sent}")
        print(f"Commands Sent: {self.commands_sent}")
        print(f"Channels Joined: {self.channels_joined}")
        print(f"Nick Changes: {self.nick_changes}")
        print(f"Reconnections: {self.reconnections}")
        print(f"Errors: {self.errors}")

        if runtime > 0:
            print(f"Messages/sec: {self.messages_sent/runtime:.2f}")
            print(f"Commands/sec: {self.commands_sent/runtime:.2f}")
            print(f"Actions/sec: {(self.messages_sent + self.commands_sent)/runtime:.2f}")
        print("="*50)


class IRCStressClient:
    """Individual IRC client for stress testing.

    State flags:
      * ``connected``  - a socket is open and believed usable.
      * ``registered`` - NICK/USER were written successfully; the client is
        counted in ``stats.connected_clients`` while this is True.
      * ``running``    - the client has not been deliberately disconnected.
    """

    def __init__(self, client_id: int, stats: StressTestStats):
        self.client_id = client_id
        self.nick = f"StressUser{client_id}"
        self.user = f"stress{client_id}"
        self.realname = f"Stress Test Client {client_id}"
        self.channels: List[str] = []
        self.stats = stats
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self.connected = False
        self.registered = False
        self.running = True
        # Strong reference to the background reader so it cannot be
        # garbage-collected while still pending.
        self._reader_task: Optional[asyncio.Task] = None

    async def connect(self, host: str, port: int) -> bool:
        """Open a TCP connection to the IRC server.

        Returns True on success. On failure the error is logged, counted in
        ``stats.errors`` and False is returned.
        """
        try:
            self.reader, self.writer = await asyncio.open_connection(host, port)
        except Exception as e:
            logging.error("Client %s connection failed: %s", self.client_id, e)
            self.stats.errors += 1
            return False

        self.connected = True
        # A previous disconnect() clears this flag; a fresh connection must
        # be allowed to run its reader loop again.
        self.running = True
        logging.debug("Client %s connected to %s:%s", self.client_id, host, port)
        return True

    async def register(self) -> bool:
        """Send NICK and USER to the server.

        Returns True only when both lines were actually written; the server's
        reply is not awaited.
        """
        nick_sent = await self.send_command(f"NICK {self.nick}")
        user_sent = nick_sent and await self.send_command(
            f"USER {self.user} 0 * :{self.realname}"
        )
        if not user_sent:
            logging.error("Client %s registration failed", self.client_id)
            return False

        self.registered = True
        self.stats.connected_clients += 1
        logging.debug("Client %s registered as %s", self.client_id, self.nick)
        return True

    async def send_command(self, command: str) -> bool:
        """Write one IRC line (CRLF is appended) to the server.

        Returns True if the line was written and flushed, False if the client
        is not connected or the write failed. Write failures are logged and
        counted in ``stats.errors``; they are never raised.
        """
        if not self.connected or not self.writer:
            return False

        try:
            self.writer.write(f"{command}\r\n".encode())
            await self.writer.drain()
        except Exception as e:
            logging.error("Client %s send error: %s", self.client_id, e)
            self.stats.errors += 1
            return False

        if CONFIG["LOGGING"]["show_irc_traffic"]:
            logging.debug("Client %s >>> %s", self.client_id, command)
        return True

    def start_reader(self) -> "asyncio.Task":
        """Start :meth:`read_messages` as a background task and keep a reference.

        Any previous reader task that is still pending is cancelled first.
        Must be called from within a running event loop.
        """
        if self._reader_task is not None and not self._reader_task.done():
            self._reader_task.cancel()
        self._reader_task = asyncio.create_task(self.read_messages())
        return self._reader_task

    def _mark_connection_lost(self):
        """Record that the server side of the connection went away."""
        self.connected = False
        if self.registered:
            self.registered = False
            self.stats.connected_clients -= 1

    async def read_messages(self):
        """Read lines from the server until EOF, error or disconnect.

        PING lines are answered with the matching PONG; every other line is
        only (optionally) logged.
        """
        # Bind the stream this loop was started for, so a later reconnect
        # (which replaces self.reader) cannot be confused with this one.
        reader = self.reader
        if reader is None:
            return

        while self.running and self.connected:
            try:
                line = await reader.readline()
            except Exception as e:
                logging.error("Client %s read error: %s", self.client_id, e)
                self.stats.errors += 1
                break

            if not line:
                # EOF: the server closed the connection.
                break

            # IRC does not guarantee UTF-8; never let a stray byte kill the loop.
            message = line.decode(errors="replace").strip()
            if not message:
                continue

            if CONFIG["LOGGING"]["show_irc_traffic"]:
                logging.debug("Client %s <<< %s", self.client_id, message)

            # Handle PING
            if message.startswith("PING"):
                await self.send_command(message.replace("PING", "PONG", 1))

        # Leaving the loop while still "running" means the connection was
        # lost rather than deliberately closed; stop scheduling this client.
        if self.running and self.reader is reader:
            self._mark_connection_lost()

    async def join_random_channel(self):
        """Join a randomly chosen configured channel not already joined."""
        if not CONFIG["BEHAVIOR"]["join_channels"]:
            return

        channel = random.choice(CONFIG["CHANNELS"])
        if channel in self.channels:
            return

        if await self.send_command(f"JOIN {channel}"):
            self.channels.append(channel)
            self.stats.channels_joined += 1
            logging.debug("Client %s joined %s", self.client_id, channel)

    async def send_random_message(self):
        """Send a random configured message to one of the joined channels."""
        if not CONFIG["BEHAVIOR"]["send_messages"] or not self.channels:
            return

        channel = random.choice(self.channels)
        message = random.choice(CONFIG["MESSAGES"])
        if await self.send_command(f"PRIVMSG {channel} :{message}"):
            self.stats.messages_sent += 1
            logging.debug("Client %s sent message to %s", self.client_id, channel)

    async def send_random_command(self):
        """Send a random command from ``CONFIG["COMMANDS"]``."""
        if not CONFIG["BEHAVIOR"]["use_new_commands"]:
            return

        command = random.choice(CONFIG["COMMANDS"])
        if await self.send_command(command):
            self.stats.commands_sent += 1
            logging.debug("Client %s sent command: %s", self.client_id, command)

    async def change_nick(self):
        """Request a new random nickname.

        The local nick is updated as soon as the NICK line is written; the
        server's acceptance is not checked.
        """
        if not CONFIG["BEHAVIOR"]["change_nicks"]:
            return

        new_nick = f"User{self.client_id}_{random.randint(1, 9999)}"
        if await self.send_command(f"NICK {new_nick}"):
            self.nick = new_nick
            self.stats.nick_changes += 1
            logging.debug("Client %s changed nick to %s", self.client_id, new_nick)

    async def random_quit_reconnect(self):
        """Quit, wait 1-3 seconds, then reconnect and re-register."""
        if not CONFIG["BEHAVIOR"]["random_quit"]:
            return

        # disconnect() sends the QUIT itself, so exactly one QUIT goes out.
        await self.disconnect(quit_message="Reconnecting...")

        # Wait a moment then reconnect
        await asyncio.sleep(random.uniform(1, 3))

        host = CONFIG["SERVER"]["host"]
        port = CONFIG["SERVER"]["port"]
        if await self.connect(host, port) and await self.register():
            # The previous reader was stopped by disconnect(); without a new
            # one the client would never answer PING on the new connection.
            self.start_reader()
            self.stats.reconnections += 1
            logging.debug("Client %s reconnected", self.client_id)

    async def perform_random_action(self):
        """Perform at most one random action, chosen by the configured rates.

        A single random roll in [0, 1) is compared against the cumulative sum
        of the rates, in the order message, join, command, nick change, quit.
        If the roll exceeds the total, nothing happens this cycle.
        """
        if not self.registered:
            return

        behavior = CONFIG["BEHAVIOR"]
        weighted_actions = (
            ("message_rate", self.send_random_message),
            ("channel_join_rate", self.join_random_channel),
            ("command_rate", self.send_random_command),
            ("nick_change_rate", self.change_nick),
            ("quit_rate", self.random_quit_reconnect),
        )

        roll = random.random()
        threshold = 0
        for rate_key, action in weighted_actions:
            threshold += behavior[rate_key]
            if roll < threshold:
                await action()
                return

    async def disconnect(self, quit_message: Optional[str] = "Stress test completed"):
        """Disconnect from the server and reset per-connection state.

        Args:
            quit_message: Text for the QUIT line sent before closing. Pass
                None to close the socket without sending QUIT.

        Errors while closing are logged at debug level and otherwise ignored
        (best effort); the local state is reset regardless.
        """
        self.running = False

        if self._reader_task is not None:
            self._reader_task.cancel()
            self._reader_task = None

        writer = self.writer
        try:
            if writer:
                # QUIT must go out while `connected` is still True, otherwise
                # send_command() refuses to write it.
                if quit_message is not None:
                    await self.send_command(f"QUIT :{quit_message}")
                self.connected = False
                writer.close()
                await writer.wait_closed()
        except Exception as e:
            logging.debug("Client %s error while closing: %s", self.client_id, e)
        finally:
            self.connected = False
            self.writer = None
            self.reader = None
            # Channel membership does not survive the connection.
            self.channels = []
            if self.registered:
                self.stats.connected_clients -= 1
                self.registered = False


class IRCStressTester:
    """Main stress testing coordinator.

    Creating an instance configures the root logger from
    ``CONFIG["LOGGING"]["level"]``.
    """

    def __init__(self):
        self.clients: List[IRCStressClient] = []
        self.stats = StressTestStats()
        self.running = False

        # Setup logging
        log_level = getattr(logging, CONFIG["LOGGING"]["level"])
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S'
        )

    async def create_client(self, client_id: int) -> Optional[IRCStressClient]:
        """Create, connect and register a client.

        Returns the client with its reader task started, or None if either
        the connection or the registration failed.
        """
        client = IRCStressClient(client_id, self.stats)

        if not await client.connect(CONFIG["SERVER"]["host"], CONFIG["SERVER"]["port"]):
            return None

        if not await client.register():
            # Do not leak the socket of a client that will never be used.
            await client.disconnect(quit_message=None)
            return None

        # Start message reader
        client.start_reader()
        return client

    async def connect_all_clients(self):
        """Connect ``max_clients`` clients sequentially with a delay between them."""
        max_clients = CONFIG["TEST"]["max_clients"]
        connect_delay = CONFIG["TEST"]["connect_delay"]
        print(f"Connecting {max_clients} clients...")

        for i in range(max_clients):
            client = await self.create_client(i)
            if client:
                self.clients.append(client)

                # Join initial channel
                if CONFIG["BEHAVIOR"]["join_channels"]:
                    await client.join_random_channel()

            # Progress reporting
            if (i + 1) % 10 == 0:
                print(f"Connected {i + 1}/{max_clients} clients")

            # Delay between connections
            if connect_delay > 0:
                await asyncio.sleep(connect_delay)

        print(f"All {len(self.clients)} clients connected!")

    async def run_activity_simulation(self):
        """Run action cycles until the test duration elapses or ``running`` is cleared.

        Each cycle picks between 1 and (active // 4 + 1) registered clients at
        random and lets each perform one random action concurrently.
        """
        print("Starting activity simulation...")

        start_time = time.time()
        last_stats = time.time()

        while self.running and (time.time() - start_time) < CONFIG["TEST"]["test_duration"]:
            # Perform random actions
            active_clients = [c for c in self.clients if c.registered]

            if active_clients:
                # Select random clients to perform actions
                num_actions = random.randint(1, len(active_clients) // 4 + 1)
                selected_clients = random.sample(
                    active_clients, min(num_actions, len(active_clients))
                )

                # Perform actions concurrently
                tasks = [client.perform_random_action() for client in selected_clients]
                await asyncio.gather(*tasks, return_exceptions=True)

            # Print stats periodically
            if time.time() - last_stats >= CONFIG["TEST"]["stats_interval"]:
                self.stats.print_stats()
                last_stats = time.time()

            # Wait before next action cycle
            await asyncio.sleep(CONFIG["TEST"]["action_interval"])

    async def shutdown_all_clients(self):
        """Disconnect every client concurrently, ignoring individual failures."""
        print("Shutting down all clients...")

        disconnect_tasks = [client.disconnect() for client in self.clients]

        # Wait for all disconnections
        await asyncio.gather(*disconnect_tasks, return_exceptions=True)
        print("All clients disconnected.")

    async def run_stress_test(self):
        """Run the complete stress test: connect, simulate, shut down, report."""
        print("="*60)
        print("TECHIRCD ADVANCED STRESS TESTER")
        print("="*60)
        print(f"Target: {CONFIG['SERVER']['host']}:{CONFIG['SERVER']['port']}")
        print(f"Clients: {CONFIG['TEST']['max_clients']}")
        print(f"Duration: {CONFIG['TEST']['test_duration']}s")
        print(f"Channels: {len(CONFIG['CHANNELS'])}")
        print(f"Commands: {len(CONFIG['COMMANDS'])}")
        print("="*60)

        self.running = True

        try:
            # Connect all clients
            await self.connect_all_clients()

            # Run activity simulation
            await self.run_activity_simulation()

        except KeyboardInterrupt:
            print("\nTest interrupted by user!")
        except Exception as e:
            print(f"\nTest failed with error: {e}")
        finally:
            self.running = False
            await self.shutdown_all_clients()

            # Final stats
            print("\nFINAL RESULTS:")
            self.stats.print_stats()


async def main():
    """Main entry point"""
    stress_tester = IRCStressTester()
    await stress_tester.run_stress_test()


if __name__ == "__main__":
    print("Starting TechIRCd Advanced Stress Tester...")
    print("Press Ctrl+C to stop the test early.\n")

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nTest terminated by user.")
    except Exception as e:
        print(f"\nFatal error: {e}")