mirror of
https://github.com/fatedier/frp.git
synced 2026-03-20 16:59:18 +08:00
server/group: refactor with shared abstractions and fix concurrency issues (#5222)
* server/group: refactor group package with shared abstractions and fix concurrency issues Extract common patterns into reusable components: - groupRegistry[G]: generic concurrent map for group lifecycle management - baseGroup: shared plumbing for listener-based groups (TCP, HTTPS, TCPMux) - Listener: unified virtual listener replacing 3 identical implementations Fix concurrency issues: - Stale-pointer race: isCurrent check + errGroupStale + controller retry loops - Worker generation safety: pass realLn and acceptCh as params instead of reading mutable fields - Connection leak: close conn on worker panic recovery path - ABBA deadlock in HTTP UnRegister: consistent lock ordering (group.mu -> registry.mu) - Round-robin overflow in HTTPGroup: use unsigned modulo Add unit tests (17 tests) for registry, listener, and baseGroup. Add TCPMux group load balancing e2e test. * server/group: replace tautological assertion with require.NotPanics * server/group: remove blank line between doc comment and type declaration
This commit is contained in:
169
server/group/base_test.go
Normal file
169
server/group/base_test.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// fakeLn is a controllable net.Listener for tests.
|
||||
type fakeLn struct {
|
||||
connCh chan net.Conn
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func newFakeLn() *fakeLn {
|
||||
return &fakeLn{
|
||||
connCh: make(chan net.Conn, 8),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeLn) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case c := <-f.connCh:
|
||||
return c, nil
|
||||
case <-f.closed:
|
||||
return nil, net.ErrClosed
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeLn) Close() error {
|
||||
f.once.Do(func() { close(f.closed) })
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLn) Addr() net.Addr { return fakeAddr("127.0.0.1:9999") }
|
||||
|
||||
func (f *fakeLn) inject(c net.Conn) {
|
||||
select {
|
||||
case f.connCh <- c:
|
||||
case <-f.closed:
|
||||
}
|
||||
}
|
||||
|
||||
func TestBaseGroup_WorkerFanOut(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
bg.initBase("g", "key", fl, func() {})
|
||||
|
||||
go bg.worker(fl, bg.acceptCh)
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
defer c2.Close()
|
||||
fl.inject(c1)
|
||||
|
||||
select {
|
||||
case got := <-bg.acceptCh:
|
||||
assert.Equal(t, c1, got)
|
||||
got.Close()
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for connection on acceptCh")
|
||||
}
|
||||
|
||||
fl.Close()
|
||||
}
|
||||
|
||||
func TestBaseGroup_WorkerStopsOnListenerClose(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
bg.initBase("g", "key", fl, func() {})
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
bg.worker(fl, bg.acceptCh)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
fl.Close()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("worker did not stop after listener close")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBaseGroup_WorkerClosesConnOnClosedChannel(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
bg.initBase("g", "key", fl, func() {})
|
||||
|
||||
// Close acceptCh before worker sends.
|
||||
close(bg.acceptCh)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
bg.worker(fl, bg.acceptCh)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
defer c2.Close()
|
||||
fl.inject(c1)
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("worker did not stop after panic recovery")
|
||||
}
|
||||
|
||||
// c1 should have been closed by worker's panic recovery path.
|
||||
buf := make([]byte, 1)
|
||||
_, err := c1.Read(buf)
|
||||
assert.Error(t, err, "connection should be closed by worker")
|
||||
}
|
||||
|
||||
func TestBaseGroup_CloseLastListenerTriggersCleanup(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
cleanupCalled := 0
|
||||
bg.initBase("g", "key", fl, func() { cleanupCalled++ })
|
||||
|
||||
bg.mu.Lock()
|
||||
ln1 := bg.newListener(fl.Addr())
|
||||
ln2 := bg.newListener(fl.Addr())
|
||||
bg.mu.Unlock()
|
||||
|
||||
go bg.worker(fl, bg.acceptCh)
|
||||
|
||||
ln1.Close()
|
||||
assert.Equal(t, 0, cleanupCalled, "cleanup should not run while listeners remain")
|
||||
|
||||
ln2.Close()
|
||||
assert.Equal(t, 1, cleanupCalled, "cleanup should run after last listener closed")
|
||||
}
|
||||
|
||||
func TestBaseGroup_CloseOneOfTwoListeners(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
cleanupCalled := 0
|
||||
bg.initBase("g", "key", fl, func() { cleanupCalled++ })
|
||||
|
||||
bg.mu.Lock()
|
||||
ln1 := bg.newListener(fl.Addr())
|
||||
ln2 := bg.newListener(fl.Addr())
|
||||
bg.mu.Unlock()
|
||||
|
||||
go bg.worker(fl, bg.acceptCh)
|
||||
|
||||
ln1.Close()
|
||||
assert.Equal(t, 0, cleanupCalled)
|
||||
|
||||
// ln2 should still receive connections.
|
||||
c1, c2 := net.Pipe()
|
||||
defer c2.Close()
|
||||
fl.inject(c1)
|
||||
|
||||
got, err := ln2.Accept()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, c1, got)
|
||||
got.Close()
|
||||
|
||||
ln2.Close()
|
||||
assert.Equal(t, 1, cleanupCalled)
|
||||
}
|
||||
Reference in New Issue
Block a user