Files
techircd/stress_test.py

454 lines
16 KiB
Python

#!/usr/bin/env python3
"""
TechIRCd Stress Testing Tool
Advanced IRC server stress testing with configurable scenarios
"""
import asyncio
import json
import random
import string
import time
import logging
import argparse
from typing import List, Dict, Any
import socket
import ssl
class IRCClient:
    """One simulated IRC connection used by the stress test.

    The client opens a plain or TLS connection, registers with NICK/USER,
    answers server PINGs, and exposes small helpers for joining channels
    and sending PRIVMSGs.  Incoming traffic is consumed by a background
    task started from ``connect()``.
    """

    def __init__(self, config: Dict[str, Any], client_id: int):
        """Prepare (but do not open) a connection.

        Args:
            config: Server settings.  Reads ``nick_prefix`` here; ``host``,
                ``port``, ``ssl``, ``ssl_port`` and ``auto_join_channels``
                are read later by ``connect()`` / ``handle_message()``.
            client_id: Number used to derive the nick, user and realname.
        """
        self.config = config
        self.client_id = client_id
        self.nick = f"{config['nick_prefix']}{client_id:04d}"
        self.user = f"user{client_id}"
        self.realname = f"Stress Test Client {client_id}"
        self.reader = None
        self.writer = None
        self.connected = False
        self.registered = False
        self.channels = []
        self.message_count = 0
        self.start_time = None
        # Strong reference to the background reader task: the event loop
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._handler_task = None

    async def connect(self):
        """Open the connection and start registration.

        Returns:
            True when the socket was opened and the reader task started,
            False when any step raised (the error is logged).
        """
        try:
            if self.config.get('ssl', False):
                context = ssl.create_default_context()
                self.reader, self.writer = await asyncio.open_connection(
                    self.config['host'],
                    self.config.get('ssl_port', 6697),
                    ssl=context,
                )
            else:
                self.reader, self.writer = await asyncio.open_connection(
                    self.config['host'],
                    self.config['port'],
                )
            self.connected = True
            self.start_time = time.time()
            # Start registration
            await self.send(f"NICK {self.nick}")
            await self.send(f"USER {self.user} 0 * :{self.realname}")
            # Start message handler and keep a reference to it
            self._handler_task = asyncio.create_task(self.message_handler())
            return True
        except Exception as e:
            logging.error(f"Client {self.client_id} connection failed: {e}")
            return False

    async def send(self, message: str):
        """Send one IRC line (CRLF is appended).

        Returns:
            True when the line was written and drained, False when there is
            no usable writer or the write failed (the error is logged).
        """
        if self.writer is None or self.writer.is_closing():
            return False
        try:
            self.writer.write(f"{message}\r\n".encode())
            await self.writer.drain()
        except Exception as e:
            logging.error(f"Client {self.client_id} send error: {e}")
            return False
        logging.debug(f"Client {self.client_id} sent: {message}")
        return True

    async def message_handler(self):
        """Read lines from the server until EOF, error or disconnect."""
        try:
            while self.connected and self.reader:
                line = await self.reader.readline()
                if not line:
                    # Empty read means the peer closed the connection.
                    break
                # IRC has no mandated encoding; never let a stray byte
                # abort the reader loop.
                message = line.decode('utf-8', errors='replace').strip()
                if not message:
                    continue
                logging.debug(f"Client {self.client_id} received: {message}")
                await self.handle_message(message)
        except Exception as e:
            logging.error(f"Client {self.client_id} message handler error: {e}")
        finally:
            # A client whose connection ended is no longer usable as a
            # sender, so drop both flags.
            self.connected = False
            self.registered = False

    async def handle_message(self, message: str):
        """Process one incoming IRC line (PING and RPL_WELCOME only)."""
        parts = message.split()
        if len(parts) < 2:
            return
        # Handle PING: echo the whole token, which may contain spaces
        # when sent as a trailing parameter.
        if parts[0] == "PING":
            token = message.split(None, 1)[1].strip()
            await self.send(f"PONG {token}")
            return
        # Handle numeric responses; isdecimal() guarantees int() succeeds.
        if parts[1].isdecimal():
            numeric = int(parts[1])
            # Welcome message - registration complete
            if numeric == 1:  # RPL_WELCOME
                self.registered = True
                logging.info(f"Client {self.client_id} ({self.nick}) registered successfully")
                # Auto-join channels if configured
                for channel in self.config.get('auto_join_channels', []):
                    await self.join_channel(channel)

    async def join_channel(self, channel: str):
        """Send JOIN and remember the channel if the line was sent."""
        sent = await self.send(f"JOIN {channel}")
        if sent and channel not in self.channels:
            self.channels.append(channel)
        return sent

    async def send_privmsg(self, target: str, message: str):
        """Send a PRIVMSG to a nick or channel.

        ``message_count`` is incremented only when the line was actually
        written, so the counter reflects real traffic.
        """
        sent = await self.send(f"PRIVMSG {target} :{message}")
        if sent:
            self.message_count += 1
        return sent

    async def disconnect(self):
        """Send QUIT, close the socket and stop the reader task."""
        self.connected = False
        self.registered = False
        writer = self.writer
        if writer is not None and not writer.is_closing():
            await self.send("QUIT :Stress test complete")
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as e:
                # The peer may already have reset the connection; closing
                # is best-effort and must not abort the caller's loop.
                logging.debug(f"Client {self.client_id} close error: {e}")
        task = self._handler_task
        self._handler_task = None
        if task is not None and not task.done() and task is not asyncio.current_task():
            task.cancel()
            # Collect the cancellation without re-raising it here.
            await asyncio.gather(task, return_exceptions=True)
class StressTest:
    """Drive groups of ``IRCClient`` connections through configured scenarios.

    The JSON configuration is read for a ``server`` mapping (handed to
    every client) and a ``scenarios`` list.  Optional top-level keys:
    ``log_level`` (default ``INFO``) and ``registration_wait`` (seconds to
    wait after connecting before counting registered clients, default 2).

    ``stats['connected']`` and ``stats['registered']`` are snapshots of the
    most recent count; ``stats['messages_sent']`` and ``stats['errors']``
    accumulate over the whole run.
    """

    def __init__(self, config_file: str):
        """Load the JSON configuration and set up logging.

        Raises:
            FileNotFoundError: if ``config_file`` does not exist.
            json.JSONDecodeError: if the file is not valid JSON.
        """
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = json.load(f)
        self.clients: List["IRCClient"] = []
        self.start_time = None
        self.stats = {
            'connected': 0,
            'registered': 0,
            'messages_sent': 0,
            'errors': 0
        }
        # Setup logging; fall back to INFO instead of crashing on an
        # unknown level name.
        level_name = str(self.config.get('log_level', 'INFO')).upper()
        log_level = getattr(logging, level_name, None)
        level_is_valid = isinstance(log_level, int)
        if not level_is_valid:
            log_level = logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('stress_test.log'),
                logging.StreamHandler()
            ]
        )
        if not level_is_valid:
            logging.warning(f"Unknown log_level '{level_name}', using INFO")

    @staticmethod
    def _count_sent(results) -> int:
        """Count gather() results that are neither an exception nor False."""
        return sum(
            1 for r in results
            if not isinstance(r, BaseException) and r is not False
        )

    async def _wait_for_registration(self, connected: int):
        """Pause for the configured wait, then record registered clients."""
        await asyncio.sleep(self.config.get('registration_wait', 2))
        registered = sum(1 for c in self.clients if c.registered)
        self.stats['registered'] = registered
        logging.info(f"Registered {registered}/{connected} clients")

    async def run_scenario(self, scenario: Dict[str, Any]):
        """Run a specific test scenario.

        Clients are always disconnected at the end, even when connecting
        or an activity raises, so a failed scenario cannot leak live
        connections into the next one.
        """
        scenario_name = scenario['name']
        client_count = scenario['client_count']
        duration = scenario.get('duration', 60)
        logging.info(f"Starting scenario: {scenario_name}")
        logging.info(f"Clients: {client_count}, Duration: {duration}s")
        try:
            # Create and connect clients
            if scenario.get('connect_gradually', False):
                await self.gradual_connect(client_count, scenario.get('connect_delay', 0.1))
            else:
                await self.mass_connect(client_count)
            # Run scenario activities
            await self.run_activities(scenario, duration)
            # Collect stats
            await self.collect_stats()
        finally:
            # Disconnect clients
            if scenario.get('disconnect_gradually', False):
                await self.gradual_disconnect(scenario.get('disconnect_delay', 0.1))
            else:
                await self.mass_disconnect()
        logging.info(f"Scenario {scenario_name} completed")

    async def mass_connect(self, count: int):
        """Connect many clients simultaneously."""
        logging.info(f"Mass connecting {count} clients...")
        tasks = []
        for i in range(count):
            client = IRCClient(self.config['server'], i + 1)
            self.clients.append(client)
            tasks.append(client.connect())
        results = await asyncio.gather(*tasks, return_exceptions=True)
        connected = sum(1 for r in results if r is True)
        self.stats['connected'] = connected
        logging.info(f"Connected {connected}/{count} clients")
        await self._wait_for_registration(connected)

    async def gradual_connect(self, count: int, delay: float):
        """Connect clients one at a time, sleeping ``delay`` between them."""
        logging.info(f"Gradually connecting {count} clients with {delay}s delay...")
        # Count locally: stats['connected'] may still hold the previous
        # scenario's value.
        connected = 0
        for i in range(count):
            client = IRCClient(self.config['server'], i + 1)
            self.clients.append(client)
            if await client.connect():
                connected += 1
            if delay > 0:
                await asyncio.sleep(delay)
        self.stats['connected'] = connected
        await self._wait_for_registration(connected)

    async def run_activities(self, scenario: Dict[str, Any], duration: int):
        """Cycle through the scenario's activities for ``duration`` seconds."""
        activities = scenario.get('activities', [])
        if not activities:
            logging.info(f"No activities specified, waiting {duration} seconds...")
            await asyncio.sleep(duration)
            return
        # Monotonic clock: unaffected by wall-clock adjustments.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            for activity in activities:
                if time.monotonic() >= deadline:
                    break
                await self.run_activity(activity)
                # Delay between activities, never sleeping past the deadline.
                delay = activity.get('delay', 1.0)
                remaining = deadline - time.monotonic()
                await asyncio.sleep(max(0.0, min(delay, remaining)))

    async def run_activity(self, activity: Dict[str, Any]):
        """Dispatch a single activity by its ``type`` key."""
        activity_type = activity['type']
        handlers = {
            'channel_flood': self.channel_flood_activity,
            'private_flood': self.private_flood_activity,
            'join_part_spam': self.join_part_spam_activity,
            'nick_change_spam': self.nick_change_spam_activity,
            'random_commands': self.random_commands_activity,
        }
        handler = handlers.get(activity_type)
        if handler is None:
            logging.warning(f"Unknown activity type: {activity_type}")
            return
        await handler(activity)

    async def channel_flood_activity(self, activity: Dict[str, Any]):
        """Send ``message_count`` channel messages from random clients."""
        message_count = activity.get('message_count', 10)
        channel = activity.get('channel', '#test')
        registered_clients = [c for c in self.clients if c.registered]
        if not registered_clients:
            return
        tasks = [
            random.choice(registered_clients).send_privmsg(
                channel, self.generate_random_message()
            )
            for _ in range(message_count)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        self.stats['messages_sent'] += self._count_sent(results)

    async def private_flood_activity(self, activity: Dict[str, Any]):
        """Send ``message_count`` private messages between distinct clients."""
        message_count = activity.get('message_count', 10)
        registered_clients = [c for c in self.clients if c.registered]
        if len(registered_clients) < 2:
            return
        tasks = []
        for _ in range(message_count):
            # sample() yields two different clients, so no iteration is
            # wasted on a sender that drew itself as the target.
            sender, target = random.sample(registered_clients, 2)
            tasks.append(sender.send_privmsg(target.nick, self.generate_random_message()))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        self.stats['messages_sent'] += self._count_sent(results)

    async def join_part_spam_activity(self, activity: Dict[str, Any]):
        """Have random clients JOIN then PART random channels."""
        iterations = activity.get('iterations', 5)
        channels = activity.get('channels', ['#spam1', '#spam2', '#spam3'])
        registered_clients = [c for c in self.clients if c.registered]
        if not registered_clients:
            return
        for _ in range(iterations):
            client = random.choice(registered_clients)
            channel = random.choice(channels)
            await client.join_channel(channel)
            await asyncio.sleep(0.1)
            await client.send(f"PART {channel} :Spam test")
            # Keep the client's channel list in step with the PART.
            if channel in client.channels:
                client.channels.remove(channel)

    async def nick_change_spam_activity(self, activity: Dict[str, Any]):
        """Send NICK changes from random clients.

        NOTE(review): ``client.nick`` is not updated here, so later private
        messages still address the original nick -- confirm whether the
        server's NICK confirmation should be tracked.
        """
        iterations = activity.get('iterations', 5)
        registered_clients = [c for c in self.clients if c.registered]
        if not registered_clients:
            return
        for _ in range(iterations):
            client = random.choice(registered_clients)
            suffix = ''.join(random.choices(string.ascii_lowercase, k=3))
            await client.send(f"NICK {client.nick}_{suffix}")
            await asyncio.sleep(0.1)

    async def random_commands_activity(self, activity: Dict[str, Any]):
        """Send randomly chosen raw IRC commands from random clients."""
        command_count = activity.get('command_count', 10)
        commands = activity.get('commands', ['WHO #test', 'WHOIS randomnick', 'LIST', 'VERSION'])
        registered_clients = [c for c in self.clients if c.registered]
        if not registered_clients:
            return
        for _ in range(command_count):
            client = random.choice(registered_clients)
            await client.send(random.choice(commands))
            await asyncio.sleep(0.1)

    def generate_random_message(self) -> str:
        """Return 1-8 random words joined by spaces."""
        words = ['hello', 'world', 'test', 'stress', 'irc', 'server', 'message', 'random', 'data']
        return ' '.join(random.choices(words, k=random.randint(1, 8)))

    async def collect_stats(self):
        """Snapshot connection counts and log the current clients' totals.

        ``stats['messages_sent']`` is left alone: the flood activities
        accumulate it, and overwriting it with the current clients' total
        would discard earlier scenarios once their clients are cleared.
        """
        connected = sum(1 for c in self.clients if c.connected)
        registered = sum(1 for c in self.clients if c.registered)
        total_messages = sum(c.message_count for c in self.clients)
        self.stats.update({
            'connected': connected,
            'registered': registered,
        })
        logging.info(f"Stats: {connected} connected, {registered} registered, {total_messages} messages sent")

    async def mass_disconnect(self):
        """Disconnect all clients simultaneously."""
        logging.info("Disconnecting all clients...")
        tasks = [client.disconnect() for client in self.clients]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        failures = sum(1 for r in results if isinstance(r, BaseException))
        if failures:
            logging.warning(f"{failures} clients failed to disconnect cleanly")
        self.clients.clear()

    async def gradual_disconnect(self, delay: float):
        """Disconnect clients one at a time, sleeping ``delay`` between them."""
        logging.info(f"Gradually disconnecting clients with {delay}s delay...")
        for client in list(self.clients):
            try:
                await client.disconnect()
            except Exception as e:
                # One bad socket must not leave the remaining clients
                # connected.
                logging.error(f"Client disconnect failed: {e}")
            if delay > 0:
                await asyncio.sleep(delay)
        self.clients.clear()

    async def run_all_scenarios(self):
        """Run every configured scenario in order and log final stats."""
        self.start_time = time.time()
        scenarios = self.config['scenarios']
        last_index = len(scenarios) - 1
        for index, scenario in enumerate(scenarios):
            try:
                await self.run_scenario(scenario)
                # Delay between scenarios (pointless after the last one)
                scenario_delay = scenario.get('delay_after', 2)
                if scenario_delay > 0 and index < last_index:
                    logging.info(f"Waiting {scenario_delay}s before next scenario...")
                    await asyncio.sleep(scenario_delay)
            except Exception as e:
                # .get(): a scenario missing 'name' must not raise again
                # from inside the error handler.
                logging.error(f"Scenario {scenario.get('name', index)} failed: {e}")
                self.stats['errors'] += 1
        total_time = time.time() - self.start_time
        logging.info(f"All scenarios completed in {total_time:.2f} seconds")
        logging.info(f"Final stats: {self.stats}")
def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which matches the previous behaviour.

    Returns:
        Process exit status: 0 on success, 1 on a configuration or runtime
        error, 130 when interrupted with Ctrl-C.
    """
    import sys  # local import: only needed for stderr output

    parser = argparse.ArgumentParser(description='TechIRCd Stress Testing Tool')
    parser.add_argument('--config', '-c', default='stress_config.json',
                        help='Configuration file (default: stress_config.json)')
    parser.add_argument('--scenario', '-s', help='Run specific scenario only')
    args = parser.parse_args(argv)
    try:
        stress_test = StressTest(args.config)
        if args.scenario:
            # Run specific scenario
            scenario = next(
                (s for s in stress_test.config['scenarios'] if s['name'] == args.scenario),
                None,
            )
            if scenario is None:
                print(f"Scenario '{args.scenario}' not found", file=sys.stderr)
                return 1
            asyncio.run(stress_test.run_scenario(scenario))
        else:
            # Run all scenarios
            asyncio.run(stress_test.run_all_scenarios())
        return 0
    except FileNotFoundError:
        print(f"Configuration file '{args.config}' not found", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        # A stress run is routinely stopped by hand; exit quietly with the
        # conventional SIGINT status instead of a traceback.
        print("Interrupted", file=sys.stderr)
        return 130
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
if __name__ == "__main__":
    # Raise SystemExit directly: the bare exit() helper is injected by the
    # site module and is missing under `python -S` or in frozen builds.
    raise SystemExit(main())