155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import socket
|
|
import time
|
|
import threading
|
|
|
|
def connect_client(nickname, host='localhost', port=6667, settle_time=0.5):
    """Open a TCP connection to an IRC server and register ``nickname``.

    Sends a ``NICK`` line followed by a ``USER`` line (the nickname is also
    used as the user name), then sleeps for ``settle_time`` seconds before
    handing the socket back to the caller.

    Args:
        nickname: Nick to register; also used as the USER name.
        host: Server host name or IPv4 address (defaults to ``localhost``).
        port: Server TCP port (defaults to 6667).
        settle_time: Seconds to wait after registering, giving the server
            time to send its welcome messages before the caller reads.

    Returns:
        The connected socket, or ``None`` if anything failed. Failures are
        printed rather than raised.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))

        # Send initial IRC commands. sendall() keeps writing until the whole
        # line is out; plain send() may transmit only part of it.
        sock.sendall(f"NICK {nickname}\r\n".encode())
        sock.sendall(f"USER {nickname} 0 * :Test User\r\n".encode())

        # Wait for welcome messages
        time.sleep(settle_time)

        return sock
    except Exception as e:
        # Best-effort helper: report the failure and map it to None, but do
        # not leave a half-open socket behind.
        print(f"Error connecting {nickname}: {e}")
        if sock is not None:
            sock.close()
        return None
|
|
|
|
def read_responses(sock, name, duration=2):
    """Collect and echo whatever arrives on ``sock`` during a time window.

    Polls the socket with a 0.1 second receive timeout until ``duration``
    seconds have elapsed, the peer closes the connection, or a socket error
    occurs. Each received chunk is decoded, stripped, printed as
    ``[name] text`` and appended to the result.

    Note: the socket's timeout is left at 0.1 seconds when this returns.

    Args:
        sock: Connected socket to read from.
        name: Label used as the prefix of every printed line.
        duration: Maximum number of seconds to keep reading.

    Returns:
        List of stripped text chunks in arrival order. An empty list is
        returned if an unexpected error occurs (the error is printed).
    """
    responses = []
    try:
        sock.settimeout(0.1)
        # A monotonic clock is immune to wall-clock adjustments.
        deadline = time.monotonic() + duration

        while time.monotonic() < deadline:
            try:
                data = sock.recv(4096)
            except socket.timeout:
                # Nothing arrived within this poll interval; keep waiting.
                continue
            except OSError:
                # Connection reset or socket closed: stop reading.
                break

            if not data:
                # Empty read means the peer closed the connection; without
                # this check the loop would spin until the deadline.
                break

            # IRC traffic is not guaranteed to be valid UTF-8; substitute
            # undecodable bytes instead of discarding the whole chunk.
            text = data.decode(errors="replace").strip()
            responses.append(text)
            print(f"[{name}] {text}")
    except Exception as e:
        print(f"Error reading from {name}: {e}")
        return []

    return responses
|
|
|
|
def test_gline_enforcement():
    """Walk through a GLINE scenario and print the server's replies.

    This is a manual, print-based check: it makes no assertions. Steps:

    1. Connect ``AdminUser`` and send ``OPER``.
    2. Connect ``TestUser`` before any ban exists.
    3. Have the admin issue ``GLINE TestUser``.
    4. Let the already-connected ``TestUser`` send a PRIVMSG.
    5. Connect a fresh ``TestUser2`` and show what the server answers.

    Returns early (after printing a message) if the admin cannot connect.
    A failed ``TestUser`` connection is tolerated and its steps are skipped.
    """
    print("\n=== TESTING GLINE ENFORCEMENT ===")

    # Connect admin client
    admin = connect_client("AdminUser")
    if not admin:
        print("Failed to connect admin")
        return

    test_user = None
    new_test_user = None
    try:
        read_responses(admin, "Admin", 1)

        # Give admin operator privileges (OPER)
        admin.sendall("OPER admin adminpass\r\n".encode())
        read_responses(admin, "Admin", 1)

        # Connect a test user first
        print("\n1. Connecting test user before GLINE...")
        test_user = connect_client("TestUser")
        if test_user:
            read_responses(test_user, "TestUser", 1)
            print(" ✓ TestUser connected successfully")

        # Issue GLINE command
        print("\n2. Issuing GLINE for TestUser...")
        admin.sendall("GLINE TestUser 1h Testing GLINE enforcement\r\n".encode())
        read_responses(admin, "Admin", 1)

        # Try to send a message as the existing user
        print("\n3. TestUser trying to send message after GLINE...")
        if test_user:
            test_user.sendall("PRIVMSG #test :This should work, GLINE only affects new connections\r\n".encode())
            read_responses(test_user, "TestUser", 1)

        # Try to connect a new user with same nick pattern
        print("\n4. Trying to connect new TestUser after GLINE...")
        new_test_user = connect_client("TestUser2")
        if new_test_user:
            read_responses(new_test_user, "TestUser2", 2)
    finally:
        # Cleanup runs even if a send raised part-way through, so no socket
        # is left open. Order matches the normal path: newest first.
        for conn in (new_test_user, test_user, admin):
            if conn is not None:
                conn.close()
|
|
|
|
def test_shun_enforcement():
    """Walk through a SHUN scenario and print the server's replies.

    This is a manual, print-based check: it makes no assertions. Steps:

    1. Connect ``AdminUser`` and send ``OPER``.
    2. Connect ``ShunUser`` and send a PRIVMSG before any restriction.
    3. Have the admin issue ``SHUN ShunUser``.
    4. Let ``ShunUser`` send PRIVMSG, JOIN and NOTICE, echoing any replies.

    Returns early (after printing a message) if either client cannot
    connect.
    """
    print("\n\n=== TESTING SHUN ENFORCEMENT ===")

    # Connect admin client
    admin = connect_client("AdminUser")
    if not admin:
        print("Failed to connect admin")
        return

    test_user = None
    try:
        read_responses(admin, "Admin", 1)

        # Give admin operator privileges
        admin.sendall("OPER admin adminpass\r\n".encode())
        read_responses(admin, "Admin", 1)

        # Connect test user
        print("\n1. Connecting test user...")
        test_user = connect_client("ShunUser")
        if not test_user:
            print("Failed to connect test user")
            return  # the finally clause closes the admin socket

        read_responses(test_user, "ShunUser", 1)

        # Test user sends a message before SHUN
        print("\n2. ShunUser sending message before SHUN...")
        test_user.sendall("PRIVMSG #test :This message should work\r\n".encode())
        read_responses(test_user, "ShunUser", 1)

        # Issue SHUN command
        print("\n3. Issuing SHUN for ShunUser...")
        admin.sendall("SHUN ShunUser 1h Testing SHUN enforcement\r\n".encode())
        read_responses(admin, "Admin", 1)

        # Test user tries to send message after SHUN
        print("\n4. ShunUser trying to send message after SHUN (should be ignored)...")
        test_user.sendall("PRIVMSG #test :This message should be silently ignored\r\n".encode())
        read_responses(test_user, "ShunUser", 2)

        # Test user tries other commands
        print("\n5. ShunUser trying other commands after SHUN...")
        test_user.sendall("JOIN #testchan\r\n".encode())
        read_responses(test_user, "ShunUser", 1)

        test_user.sendall("NOTICE #test :This notice should also be ignored\r\n".encode())
        read_responses(test_user, "ShunUser", 1)
    finally:
        # Cleanup runs even if a send raised part-way through, so no socket
        # is left open.
        for conn in (test_user, admin):
            if conn is not None:
                conn.close()
|
|
|
|
def main():
    """Print the banner, pause briefly, then run both ban scenarios."""
    banner = (
        "Testing TechIRCd Ban Enforcement",
        "Make sure TechIRCd server is running on localhost:6667",
    )
    for line in banner:
        print(line)

    # Short pause before the first connection attempt.
    time.sleep(1)

    # GLINE scenario first, then SHUN.
    test_gline_enforcement()
    test_shun_enforcement()

    print("\n=== TEST COMPLETE ===")


if __name__ == "__main__":
    main()
|