181 lines
3.8 KiB
Go
181 lines
3.8 KiB
Go
package main
|
|
|
|
import (
|
|
"log"
|
|
"net"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// WorkerPool manages goroutines for handling client connections
|
|
type WorkerPool struct {
|
|
workers int
|
|
jobs chan net.Conn
|
|
server *Server
|
|
wg sync.WaitGroup
|
|
shutdown chan bool
|
|
stats *PoolStats
|
|
}
|
|
|
|
type PoolStats struct {
|
|
ActiveWorkers int64
|
|
ProcessedJobs int64
|
|
QueuedJobs int64
|
|
ErrorCount int64
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewWorkerPool creates a new worker pool
|
|
func NewWorkerPool(workers int, server *Server) *WorkerPool {
|
|
if workers <= 0 {
|
|
workers = runtime.NumCPU() * 4 // Default: 4 workers per CPU core
|
|
}
|
|
|
|
return &WorkerPool{
|
|
workers: workers,
|
|
jobs: make(chan net.Conn, workers*10), // Buffer for connection queue
|
|
server: server,
|
|
shutdown: make(chan bool),
|
|
stats: &PoolStats{},
|
|
}
|
|
}
|
|
|
|
// Start initializes and starts the worker pool
|
|
func (wp *WorkerPool) Start() {
|
|
log.Printf("Starting worker pool with %d workers", wp.workers)
|
|
|
|
for i := 0; i < wp.workers; i++ {
|
|
wp.wg.Add(1)
|
|
go wp.worker(i)
|
|
}
|
|
|
|
// Start stats reporter
|
|
go wp.statsReporter()
|
|
}
|
|
|
|
// worker processes incoming connections
|
|
func (wp *WorkerPool) worker(id int) {
|
|
defer wp.wg.Done()
|
|
|
|
wp.updateActiveWorkers(1)
|
|
defer wp.updateActiveWorkers(-1)
|
|
|
|
log.Printf("Worker %d started", id)
|
|
|
|
for {
|
|
select {
|
|
case conn := <-wp.jobs:
|
|
wp.handleConnection(conn, id)
|
|
wp.updateProcessedJobs(1)
|
|
|
|
case <-wp.shutdown:
|
|
log.Printf("Worker %d shutting down", id)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleConnection processes a single client connection
|
|
func (wp *WorkerPool) handleConnection(conn net.Conn, workerID int) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("Worker %d recovered from panic: %v", workerID, r)
|
|
wp.updateErrorCount(1)
|
|
conn.Close()
|
|
}
|
|
}()
|
|
|
|
// Set connection timeouts
|
|
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
|
|
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
|
|
|
|
client := NewClient(conn, wp.server)
|
|
wp.server.AddClient(client)
|
|
|
|
log.Printf("Worker %d handling client from %s", workerID, conn.RemoteAddr())
|
|
|
|
// Handle client in this worker goroutine
|
|
client.Handle()
|
|
}
|
|
|
|
// Submit adds a connection to the worker pool queue
|
|
func (wp *WorkerPool) Submit(conn net.Conn) bool {
|
|
wp.updateQueuedJobs(1)
|
|
|
|
select {
|
|
case wp.jobs <- conn:
|
|
return true
|
|
default:
|
|
// Queue is full
|
|
wp.updateQueuedJobs(-1)
|
|
wp.updateErrorCount(1)
|
|
log.Printf("Worker pool queue full, rejecting connection from %s", conn.RemoteAddr())
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Shutdown gracefully stops the worker pool
|
|
func (wp *WorkerPool) Shutdown() {
|
|
log.Println("Shutting down worker pool...")
|
|
|
|
close(wp.shutdown)
|
|
close(wp.jobs)
|
|
|
|
// Wait for all workers to finish
|
|
wp.wg.Wait()
|
|
|
|
log.Println("Worker pool shutdown complete")
|
|
}
|
|
|
|
// Stats update methods
|
|
func (wp *WorkerPool) updateActiveWorkers(delta int64) {
|
|
wp.stats.mu.Lock()
|
|
wp.stats.ActiveWorkers += delta
|
|
wp.stats.mu.Unlock()
|
|
}
|
|
|
|
func (wp *WorkerPool) updateProcessedJobs(delta int64) {
|
|
wp.stats.mu.Lock()
|
|
wp.stats.ProcessedJobs += delta
|
|
wp.stats.QueuedJobs -= delta
|
|
wp.stats.mu.Unlock()
|
|
}
|
|
|
|
func (wp *WorkerPool) updateQueuedJobs(delta int64) {
|
|
wp.stats.mu.Lock()
|
|
wp.stats.QueuedJobs += delta
|
|
wp.stats.mu.Unlock()
|
|
}
|
|
|
|
func (wp *WorkerPool) updateErrorCount(delta int64) {
|
|
wp.stats.mu.Lock()
|
|
wp.stats.ErrorCount += delta
|
|
wp.stats.mu.Unlock()
|
|
}
|
|
|
|
// GetStats returns current pool statistics
|
|
func (wp *WorkerPool) GetStats() PoolStats {
|
|
wp.stats.mu.RLock()
|
|
defer wp.stats.mu.RUnlock()
|
|
return *wp.stats
|
|
}
|
|
|
|
// statsReporter periodically logs worker pool statistics
|
|
func (wp *WorkerPool) statsReporter() {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
stats := wp.GetStats()
|
|
log.Printf("Worker Pool Stats - Active: %d, Processed: %d, Queued: %d, Errors: %d",
|
|
stats.ActiveWorkers, stats.ProcessedJobs, stats.QueuedJobs, stats.ErrorCount)
|
|
|
|
case <-wp.shutdown:
|
|
return
|
|
}
|
|
}
|
|
}
|