package main import ( "log" "net" "runtime" "sync" "time" ) // WorkerPool manages goroutines for handling client connections type WorkerPool struct { workers int jobs chan net.Conn server *Server wg sync.WaitGroup shutdown chan bool stats *PoolStats } type PoolStats struct { ActiveWorkers int64 ProcessedJobs int64 QueuedJobs int64 ErrorCount int64 mu sync.RWMutex } // NewWorkerPool creates a new worker pool func NewWorkerPool(workers int, server *Server) *WorkerPool { if workers <= 0 { workers = runtime.NumCPU() * 4 // Default: 4 workers per CPU core } return &WorkerPool{ workers: workers, jobs: make(chan net.Conn, workers*10), // Buffer for connection queue server: server, shutdown: make(chan bool), stats: &PoolStats{}, } } // Start initializes and starts the worker pool func (wp *WorkerPool) Start() { log.Printf("Starting worker pool with %d workers", wp.workers) for i := 0; i < wp.workers; i++ { wp.wg.Add(1) go wp.worker(i) } // Start stats reporter go wp.statsReporter() } // worker processes incoming connections func (wp *WorkerPool) worker(id int) { defer wp.wg.Done() wp.updateActiveWorkers(1) defer wp.updateActiveWorkers(-1) log.Printf("Worker %d started", id) for { select { case conn := <-wp.jobs: wp.handleConnection(conn, id) wp.updateProcessedJobs(1) case <-wp.shutdown: log.Printf("Worker %d shutting down", id) return } } } // handleConnection processes a single client connection func (wp *WorkerPool) handleConnection(conn net.Conn, workerID int) { defer func() { if r := recover(); r != nil { log.Printf("Worker %d recovered from panic: %v", workerID, r) wp.updateErrorCount(1) conn.Close() } }() // Set connection timeouts conn.SetReadDeadline(time.Now().Add(30 * time.Second)) conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) client := NewClient(conn, wp.server) wp.server.AddClient(client) log.Printf("Worker %d handling client from %s", workerID, conn.RemoteAddr()) // Handle client in this worker goroutine client.Handle() } // Submit adds a connection to the worker pool queue func (wp *WorkerPool) Submit(conn net.Conn) bool { wp.updateQueuedJobs(1) select { case wp.jobs <- conn: return true default: // Queue is full wp.updateQueuedJobs(-1) wp.updateErrorCount(1) log.Printf("Worker pool queue full, rejecting connection from %s", conn.RemoteAddr()) return false } } // Shutdown gracefully stops the worker pool func (wp *WorkerPool) Shutdown() { log.Println("Shutting down worker pool...") close(wp.shutdown) close(wp.jobs) // Wait for all workers to finish wp.wg.Wait() log.Println("Worker pool shutdown complete") } // Stats update methods func (wp *WorkerPool) updateActiveWorkers(delta int64) { wp.stats.mu.Lock() wp.stats.ActiveWorkers += delta wp.stats.mu.Unlock() } func (wp *WorkerPool) updateProcessedJobs(delta int64) { wp.stats.mu.Lock() wp.stats.ProcessedJobs += delta wp.stats.QueuedJobs -= delta wp.stats.mu.Unlock() } func (wp *WorkerPool) updateQueuedJobs(delta int64) { wp.stats.mu.Lock() wp.stats.QueuedJobs += delta wp.stats.mu.Unlock() } func (wp *WorkerPool) updateErrorCount(delta int64) { wp.stats.mu.Lock() wp.stats.ErrorCount += delta wp.stats.mu.Unlock() } // GetStats returns current pool statistics func (wp *WorkerPool) GetStats() PoolStats { wp.stats.mu.RLock() defer wp.stats.mu.RUnlock() return *wp.stats } // statsReporter periodically logs worker pool statistics func (wp *WorkerPool) statsReporter() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: stats := wp.GetStats() log.Printf("Worker Pool Stats - Active: %d, Processed: %d, Queued: %d, Errors: %d", stats.ActiveWorkers, stats.ProcessedJobs, stats.QueuedJobs, stats.ErrorCount) case <-wp.shutdown: return } } }