techircd/worker_pool.go


package main

import (
	"log"
	"net"
	"runtime"
	"sync"
	"time"
)

// WorkerPool manages goroutines for handling client connections
type WorkerPool struct {
	workers  int
	jobs     chan net.Conn
	server   *Server
	wg       sync.WaitGroup
	shutdown chan bool
	stats    *PoolStats
}

// PoolStats holds the pool's activity counters, guarded by mu
type PoolStats struct {
	ActiveWorkers int64
	ProcessedJobs int64
	QueuedJobs    int64
	ErrorCount    int64
	mu            sync.RWMutex
}

// NewWorkerPool creates a new worker pool
func NewWorkerPool(workers int, server *Server) *WorkerPool {
	if workers <= 0 {
		workers = runtime.NumCPU() * 4 // Default: 4 workers per CPU core
	}
	return &WorkerPool{
		workers:  workers,
		jobs:     make(chan net.Conn, workers*10), // Buffer for connection queue
		server:   server,
		shutdown: make(chan bool),
		stats:    &PoolStats{},
	}
}

// Start initializes and starts the worker pool
func (wp *WorkerPool) Start() {
	log.Printf("Starting worker pool with %d workers", wp.workers)
	for i := 0; i < wp.workers; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}
	// Start stats reporter
	go wp.statsReporter()
}

// worker processes incoming connections
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	wp.updateActiveWorkers(1)
	defer wp.updateActiveWorkers(-1)
	log.Printf("Worker %d started", id)
	for {
		select {
		case conn, ok := <-wp.jobs:
			// A closed jobs channel yields a nil conn; exit instead of handling it
			if !ok {
				log.Printf("Worker %d shutting down: job queue closed", id)
				return
			}
			wp.handleConnection(conn, id)
			wp.updateProcessedJobs(1)
		case <-wp.shutdown:
			log.Printf("Worker %d shutting down", id)
			return
		}
	}
}

// handleConnection processes a single client connection
func (wp *WorkerPool) handleConnection(conn net.Conn, workerID int) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Worker %d recovered from panic: %v", workerID, r)
			wp.updateErrorCount(1)
			conn.Close()
		}
	}()
	// Set connection timeouts
	conn.SetReadDeadline(time.Now().Add(30 * time.Second))
	conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
	client := NewClient(conn, wp.server)
	wp.server.AddClient(client)
	log.Printf("Worker %d handling client from %s", workerID, conn.RemoteAddr())
	// Handle client in this worker goroutine
	client.Handle()
}

// Submit adds a connection to the worker pool queue
func (wp *WorkerPool) Submit(conn net.Conn) bool {
	wp.updateQueuedJobs(1)
	select {
	case wp.jobs <- conn:
		return true
	default:
		// Queue is full
		wp.updateQueuedJobs(-1)
		wp.updateErrorCount(1)
		log.Printf("Worker pool queue full, rejecting connection from %s", conn.RemoteAddr())
		return false
	}
}

// Shutdown gracefully stops the worker pool. Callers must stop submitting
// connections first: Submit sends on the jobs channel, and a send on a
// closed channel panics.
func (wp *WorkerPool) Shutdown() {
	log.Println("Shutting down worker pool...")
	close(wp.shutdown)
	close(wp.jobs)
	// Wait for all workers to finish
	wp.wg.Wait()
	log.Println("Worker pool shutdown complete")
}

// Stats update methods
func (wp *WorkerPool) updateActiveWorkers(delta int64) {
	wp.stats.mu.Lock()
	wp.stats.ActiveWorkers += delta
	wp.stats.mu.Unlock()
}

func (wp *WorkerPool) updateProcessedJobs(delta int64) {
	wp.stats.mu.Lock()
	wp.stats.ProcessedJobs += delta
	wp.stats.QueuedJobs -= delta
	wp.stats.mu.Unlock()
}

func (wp *WorkerPool) updateQueuedJobs(delta int64) {
	wp.stats.mu.Lock()
	wp.stats.QueuedJobs += delta
	wp.stats.mu.Unlock()
}

func (wp *WorkerPool) updateErrorCount(delta int64) {
	wp.stats.mu.Lock()
	wp.stats.ErrorCount += delta
	wp.stats.mu.Unlock()
}

// GetStats returns a snapshot of current pool statistics; the counters are
// copied individually so the embedded (locked) mutex is never copied
func (wp *WorkerPool) GetStats() PoolStats {
	wp.stats.mu.RLock()
	defer wp.stats.mu.RUnlock()
	return PoolStats{
		ActiveWorkers: wp.stats.ActiveWorkers,
		ProcessedJobs: wp.stats.ProcessedJobs,
		QueuedJobs:    wp.stats.QueuedJobs,
		ErrorCount:    wp.stats.ErrorCount,
	}
}

// statsReporter periodically logs worker pool statistics
func (wp *WorkerPool) statsReporter() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			stats := wp.GetStats()
			log.Printf("Worker Pool Stats - Active: %d, Processed: %d, Queued: %d, Errors: %d",
				stats.ActiveWorkers, stats.ProcessedJobs, stats.QueuedJobs, stats.ErrorCount)
		case <-wp.shutdown:
			return
		}
	}
}
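
A minimal usage sketch, assuming a TCP accept loop feeds the pool. The Server type and NewClient come from elsewhere in techircd; runListener and addr are hypothetical names used only for illustration and are not part of worker_pool.go.

// runListener is a hypothetical accept loop showing how the pool is driven:
// Start once, Submit each accepted connection, Shutdown when the loop ends.
func runListener(addr string, server *Server) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	defer ln.Close()

	pool := NewWorkerPool(0, server) // 0 falls back to 4 workers per CPU core
	pool.Start()
	defer pool.Shutdown()

	for {
		conn, err := ln.Accept()
		if err != nil {
			return err
		}
		if !pool.Submit(conn) {
			// Queue full: drop the connection instead of blocking the accept loop
			conn.Close()
		}
	}
}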