#!/usr/bin/env python3 """ EXTREME TechIRCd Stress Testing Tool BRUTAL IRC server stress testing designed to break things """ import asyncio import random import string import time import logging import socket import sys from typing import List import threading class ExtremeIRCClient: def __init__(self, client_id: int): self.client_id = client_id self.nick = f"EXTREME{client_id:05d}" self.reader = None self.writer = None self.connected = False self.message_count = 0 async def connect(self): try: self.reader, self.writer = await asyncio.wait_for( asyncio.open_connection('localhost', 6667), timeout=5.0 ) self.connected = True return True except Exception as e: return False async def register(self): if not self.connected or self.writer is None: return False try: self.writer.write(f"NICK {self.nick}\r\n".encode()) self.writer.write(f"USER {self.nick} 0 * :Extreme Client {self.client_id}\r\n".encode()) await self.writer.drain() # Don't wait for response - BRUTAL mode return True except: return False async def spam_messages(self, count: int, delay: float = 0.001): """Send messages as fast as possible""" if not self.connected or self.writer is None: return messages = [ f"PRIVMSG #extreme :SPAM MESSAGE {i} FROM {self.nick} - " + "X" * 100 for i in range(count) ] try: for msg in messages: self.writer.write(f"{msg}\r\n".encode()) self.message_count += 1 if delay > 0: await asyncio.sleep(delay) await self.writer.drain() except: pass async def chaos_commands(self): """Send random commands rapidly""" commands = [ "JOIN #extreme", "PART #extreme", "JOIN #chaos1,#chaos2,#chaos3,#chaos4,#chaos5", "LIST", "WHO #extreme", "VERSION", "TIME", "ADMIN", "INFO", "LUSERS", f"NICK {self.nick}_CHAOS_{random.randint(1000,9999)}", "PING :test", ] try: if self.writer is not None: for _ in range(50): # 50 rapid commands cmd = random.choice(commands) self.writer.write(f"{cmd}\r\n".encode()) await asyncio.sleep(0.01) # 10ms between commands await self.writer.drain() except: pass async def disconnect(self): if self.writer: try: self.writer.write(b"QUIT :EXTREME TEST COMPLETE\r\n") await self.writer.drain() self.writer.close() await self.writer.wait_closed() except: pass self.connected = False class ExtremeStressTester: def __init__(self): self.clients = [] self.total_connections = 0 self.successful_connections = 0 self.total_messages = 0 self.start_time = None async def extreme_connection_bomb(self, count: int): """Connect as many clients as possible simultaneously""" print(f"๐Ÿ”ฅ EXTREME: Launching {count} connections simultaneously...") tasks = [] for i in range(count): client = ExtremeIRCClient(i) self.clients.append(client) tasks.append(self.connect_and_register(client)) # Launch ALL connections at once - BRUTAL results = await asyncio.gather(*tasks, return_exceptions=True) self.successful_connections = sum(1 for r in results if r is True) print(f"๐Ÿ’€ Connection bomb result: {self.successful_connections}/{count} successful") async def connect_and_register(self, client): try: if await client.connect(): await client.register() return True except: pass return False async def extreme_message_hurricane(self, messages_per_client: int = 200): """Send massive amounts of messages from all clients""" print(f"๐ŸŒช๏ธ EXTREME: Message hurricane - {messages_per_client} msgs per client...") connected_clients = [c for c in self.clients if c.connected] print(f"๐Ÿ’€ Using {len(connected_clients)} connected clients") tasks = [] for client in connected_clients: tasks.append(client.spam_messages(messages_per_client, delay=0.001)) # 1ms delay await asyncio.gather(*tasks, return_exceptions=True) total_msgs = sum(c.message_count for c in connected_clients) print(f"๐Ÿ’€ Hurricane complete: {total_msgs} total messages sent") async def extreme_chaos_mode(self): """Every client sends random commands rapidly""" print(f"๐Ÿ”ฅ EXTREME: CHAOS MODE ACTIVATED...") connected_clients = [c for c in self.clients if c.connected] tasks = [] for client in connected_clients: tasks.append(client.chaos_commands()) await asyncio.gather(*tasks, return_exceptions=True) print(f"๐Ÿ’€ Chaos mode complete") async def rapid_fire_connections(self, total: int, batch_size: int = 50): """Connect clients in rapid batches""" print(f"โšก EXTREME: Rapid fire - {total} clients in batches of {batch_size}") for batch_start in range(0, total, batch_size): batch_end = min(batch_start + batch_size, total) batch_clients = [] # Create batch for i in range(batch_start, batch_end): client = ExtremeIRCClient(i + 10000) # Offset IDs batch_clients.append(client) self.clients.append(client) # Connect batch simultaneously tasks = [self.connect_and_register(c) for c in batch_clients] results = await asyncio.gather(*tasks, return_exceptions=True) successful = sum(1 for r in results if r is True) print(f"๐Ÿ’€ Batch {batch_start//batch_size + 1}: {successful}/{len(batch_clients)} connected") # Tiny delay between batches await asyncio.sleep(0.1) async def cleanup(self): """Disconnect all clients""" print("๐Ÿงน Cleaning up connections...") tasks = [] for client in self.clients: if client.connected: tasks.append(client.disconnect()) if tasks: await asyncio.gather(*tasks, return_exceptions=True) print(f"๐Ÿ’€ Cleanup complete. Total clients created: {len(self.clients)}") async def run_extreme_test(): """Run the most brutal stress test possible""" print("๐Ÿ’€๐Ÿ’€๐Ÿ’€ EXTREME STRESS TEST STARTING ๐Ÿ’€๐Ÿ’€๐Ÿ’€") print("โš ๏ธ WARNING: This test is designed to break things!") print() tester = ExtremeStressTester() start_time = time.time() try: # Phase 1: Connection bomb await tester.extreme_connection_bomb(150) await asyncio.sleep(2) # Phase 2: Message hurricane await tester.extreme_message_hurricane(100) await asyncio.sleep(1) # Phase 3: Chaos mode await tester.extreme_chaos_mode() await asyncio.sleep(1) # Phase 4: More connections while under load await tester.rapid_fire_connections(100, 25) await asyncio.sleep(2) # Phase 5: Final message barrage await tester.extreme_message_hurricane(50) except KeyboardInterrupt: print("\n๐Ÿ›‘ Test interrupted by user") except Exception as e: print(f"\n๐Ÿ’ฅ Test crashed: {e}") finally: # Always cleanup await tester.cleanup() end_time = time.time() duration = end_time - start_time print(f"\n๐Ÿ’€ EXTREME STRESS TEST COMPLETE ๐Ÿ’€") print(f"Duration: {duration:.2f} seconds") print(f"Total clients: {len(tester.clients)}") print(f"Successful connections: {tester.successful_connections}") total_messages = sum(c.message_count for c in tester.clients) print(f"Total messages sent: {total_messages}") if duration > 0: print(f"Messages per second: {total_messages/duration:.2f}") if __name__ == "__main__": print("๐Ÿ’€ EXTREME STRESS TESTER ๐Ÿ’€") print("This will attempt to overwhelm the IRC server") print("Make sure TechIRCd is running!") print() try: asyncio.run(run_extreme_test()) except KeyboardInterrupt: print("\n๐Ÿ›‘ Aborted by user")