mirror of
https://github.com/fatedier/frp.git
synced 2026-03-11 12:29:14 +08:00
* server/group: refactor group package with shared abstractions and fix concurrency issues Extract common patterns into reusable components: - groupRegistry[G]: generic concurrent map for group lifecycle management - baseGroup: shared plumbing for listener-based groups (TCP, HTTPS, TCPMux) - Listener: unified virtual listener replacing 3 identical implementations Fix concurrency issues: - Stale-pointer race: isCurrent check + errGroupStale + controller retry loops - Worker generation safety: pass realLn and acceptCh as params instead of reading mutable fields - Connection leak: close conn on worker panic recovery path - ABBA deadlock in HTTP UnRegister: consistent lock ordering (group.mu -> registry.mu) - Round-robin overflow in HTTPGroup: use unsigned modulo Add unit tests (17 tests) for registry, listener, and baseGroup. Add TCPMux group load balancing e2e test. * server/group: replace tautological assertion with require.NotPanics * server/group: remove blank line between doc comment and type declaration
78 lines
1.9 KiB
Go
78 lines
1.9 KiB
Go
package group
|
|
|
|
import (
|
|
"net"
|
|
"sync"
|
|
|
|
gerr "github.com/fatedier/golib/errors"
|
|
)
|
|
|
|
// baseGroup contains the shared plumbing for listener-based groups
|
|
// (TCP, HTTPS, TCPMux). Each concrete group embeds this and provides
|
|
// its own Listen method with protocol-specific validation.
|
|
type baseGroup struct {
|
|
group string
|
|
groupKey string
|
|
|
|
acceptCh chan net.Conn
|
|
realLn net.Listener
|
|
lns []*Listener
|
|
mu sync.Mutex
|
|
cleanupFn func()
|
|
}
|
|
|
|
// initBase resets the baseGroup for a fresh listen cycle.
|
|
// Must be called under mu when len(lns) == 0.
|
|
func (bg *baseGroup) initBase(group, groupKey string, realLn net.Listener, cleanupFn func()) {
|
|
bg.group = group
|
|
bg.groupKey = groupKey
|
|
bg.realLn = realLn
|
|
bg.acceptCh = make(chan net.Conn)
|
|
bg.cleanupFn = cleanupFn
|
|
}
|
|
|
|
// worker reads from the real listener and fans out to acceptCh.
|
|
// The parameters are captured at creation time so that the worker is
|
|
// bound to a specific listen cycle and cannot observe a later initBase.
|
|
func (bg *baseGroup) worker(realLn net.Listener, acceptCh chan<- net.Conn) {
|
|
for {
|
|
c, err := realLn.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = gerr.PanicToError(func() {
|
|
acceptCh <- c
|
|
})
|
|
if err != nil {
|
|
c.Close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// newListener creates a new Listener wired to this baseGroup.
|
|
// Must be called under mu.
|
|
func (bg *baseGroup) newListener(addr net.Addr) *Listener {
|
|
ln := newListener(bg.acceptCh, addr, bg.closeListener)
|
|
bg.lns = append(bg.lns, ln)
|
|
return ln
|
|
}
|
|
|
|
// closeListener removes ln from the list. When the last listener is removed,
|
|
// it closes acceptCh, closes the real listener, and calls cleanupFn.
|
|
func (bg *baseGroup) closeListener(ln *Listener) {
|
|
bg.mu.Lock()
|
|
defer bg.mu.Unlock()
|
|
for i, l := range bg.lns {
|
|
if l == ln {
|
|
bg.lns = append(bg.lns[:i], bg.lns[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
if len(bg.lns) == 0 {
|
|
close(bg.acceptCh)
|
|
bg.realLn.Close()
|
|
bg.cleanupFn()
|
|
}
|
|
}
|