Compare commits

...

4 Commits

Author SHA1 Message Date
fatedier
8d1ab7d585 test/e2e: guard Process against double-Start and Stop-before-Start
Add started flag to prevent double-Start panics and allow Stop to
return immediately when the process was never started. Use sync.Once
for closing the done channel as defense-in-depth against double close.
2026-03-09 01:44:54 +08:00
fatedier
f32bec9f4d test/e2e: optimize RunFrps/RunFrpc with process exit detection
Refactor Process to track subprocess lifecycle via a done channel,
replacing direct cmd.Wait() in Stop() to avoid double-Wait races.
RunFrps/RunFrpc now use select on the done channel instead of fixed
sleeps, allowing short-lived processes (verify, startup failures) to
return immediately while preserving existing timeout behavior for
long-running daemons.
2026-03-09 00:41:00 +08:00
fatedier
bcd2424c24 test/e2e: optimize e2e test time by replacing sleeps with TCP readiness checks (#5223)
Replace the fixed 500ms sleep after each frps startup in RunProcesses
with a TCP dial-based readiness check that polls the server bind port.
This reduces the e2e suite wall time from ~97s to ~43s.

Also simplify the RunProcesses API to accept a single server template
string instead of a slice, matching how every call site uses it.
2026-03-08 23:41:33 +08:00
fatedier
c7ac12ea0f server/group: refactor with shared abstractions and fix concurrency issues (#5222)
* server/group: refactor group package with shared abstractions and fix concurrency issues

Extract common patterns into reusable components:
- groupRegistry[G]: generic concurrent map for group lifecycle management
- baseGroup: shared plumbing for listener-based groups (TCP, HTTPS, TCPMux)
- Listener: unified virtual listener replacing 3 identical implementations

Fix concurrency issues:
- Stale-pointer race: isCurrent check + errGroupStale + controller retry loops
- Worker generation safety: pass realLn and acceptCh as params instead of reading mutable fields
- Connection leak: close conn on worker panic recovery path
- ABBA deadlock in HTTP UnRegister: consistent lock ordering (group.mu -> registry.mu)
- Round-robin overflow in HTTPGroup: use unsigned modulo

Add unit tests (17 tests) for registry, listener, and baseGroup.
Add TCPMux group load balancing e2e test.

* server/group: replace tautological assertion with require.NotPanics

* server/group: remove blank line between doc comment and type declaration
2026-03-08 18:57:21 +08:00
48 changed files with 985 additions and 629 deletions

77
server/group/base.go Normal file
View File

@@ -0,0 +1,77 @@
package group
import (
"net"
"sync"
gerr "github.com/fatedier/golib/errors"
)
// baseGroup contains the shared plumbing for listener-based groups
// (TCP, HTTPS, TCPMux). Each concrete group embeds this and provides
// its own Listen method with protocol-specific validation.
type baseGroup struct {
group string
groupKey string
acceptCh chan net.Conn
realLn net.Listener
lns []*Listener
mu sync.Mutex
cleanupFn func()
}
// initBase resets the baseGroup for a fresh listen cycle.
// Must be called under mu when len(lns) == 0.
func (bg *baseGroup) initBase(group, groupKey string, realLn net.Listener, cleanupFn func()) {
bg.group = group
bg.groupKey = groupKey
bg.realLn = realLn
bg.acceptCh = make(chan net.Conn)
bg.cleanupFn = cleanupFn
}
// worker reads from the real listener and fans out to acceptCh.
// The parameters are captured at creation time so that the worker is
// bound to a specific listen cycle and cannot observe a later initBase.
func (bg *baseGroup) worker(realLn net.Listener, acceptCh chan<- net.Conn) {
for {
c, err := realLn.Accept()
if err != nil {
return
}
err = gerr.PanicToError(func() {
acceptCh <- c
})
if err != nil {
c.Close()
return
}
}
}
// newListener creates a new Listener wired to this baseGroup.
// Must be called under mu.
func (bg *baseGroup) newListener(addr net.Addr) *Listener {
ln := newListener(bg.acceptCh, addr, bg.closeListener)
bg.lns = append(bg.lns, ln)
return ln
}
// closeListener removes ln from the list. When the last listener is removed,
// it closes acceptCh, closes the real listener, and calls cleanupFn.
func (bg *baseGroup) closeListener(ln *Listener) {
bg.mu.Lock()
defer bg.mu.Unlock()
for i, l := range bg.lns {
if l == ln {
bg.lns = append(bg.lns[:i], bg.lns[i+1:]...)
break
}
}
if len(bg.lns) == 0 {
close(bg.acceptCh)
bg.realLn.Close()
bg.cleanupFn()
}
}

169
server/group/base_test.go Normal file
View File

@@ -0,0 +1,169 @@
package group
import (
"net"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// fakeLn is a controllable net.Listener for tests.
type fakeLn struct {
connCh chan net.Conn
closed chan struct{}
once sync.Once
}
func newFakeLn() *fakeLn {
return &fakeLn{
connCh: make(chan net.Conn, 8),
closed: make(chan struct{}),
}
}
func (f *fakeLn) Accept() (net.Conn, error) {
select {
case c := <-f.connCh:
return c, nil
case <-f.closed:
return nil, net.ErrClosed
}
}
func (f *fakeLn) Close() error {
f.once.Do(func() { close(f.closed) })
return nil
}
func (f *fakeLn) Addr() net.Addr { return fakeAddr("127.0.0.1:9999") }
func (f *fakeLn) inject(c net.Conn) {
select {
case f.connCh <- c:
case <-f.closed:
}
}
func TestBaseGroup_WorkerFanOut(t *testing.T) {
fl := newFakeLn()
var bg baseGroup
bg.initBase("g", "key", fl, func() {})
go bg.worker(fl, bg.acceptCh)
c1, c2 := net.Pipe()
defer c2.Close()
fl.inject(c1)
select {
case got := <-bg.acceptCh:
assert.Equal(t, c1, got)
got.Close()
case <-time.After(time.Second):
t.Fatal("timed out waiting for connection on acceptCh")
}
fl.Close()
}
func TestBaseGroup_WorkerStopsOnListenerClose(t *testing.T) {
fl := newFakeLn()
var bg baseGroup
bg.initBase("g", "key", fl, func() {})
done := make(chan struct{})
go func() {
bg.worker(fl, bg.acceptCh)
close(done)
}()
fl.Close()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("worker did not stop after listener close")
}
}
func TestBaseGroup_WorkerClosesConnOnClosedChannel(t *testing.T) {
fl := newFakeLn()
var bg baseGroup
bg.initBase("g", "key", fl, func() {})
// Close acceptCh before worker sends.
close(bg.acceptCh)
done := make(chan struct{})
go func() {
bg.worker(fl, bg.acceptCh)
close(done)
}()
c1, c2 := net.Pipe()
defer c2.Close()
fl.inject(c1)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("worker did not stop after panic recovery")
}
// c1 should have been closed by worker's panic recovery path.
buf := make([]byte, 1)
_, err := c1.Read(buf)
assert.Error(t, err, "connection should be closed by worker")
}
func TestBaseGroup_CloseLastListenerTriggersCleanup(t *testing.T) {
fl := newFakeLn()
var bg baseGroup
cleanupCalled := 0
bg.initBase("g", "key", fl, func() { cleanupCalled++ })
bg.mu.Lock()
ln1 := bg.newListener(fl.Addr())
ln2 := bg.newListener(fl.Addr())
bg.mu.Unlock()
go bg.worker(fl, bg.acceptCh)
ln1.Close()
assert.Equal(t, 0, cleanupCalled, "cleanup should not run while listeners remain")
ln2.Close()
assert.Equal(t, 1, cleanupCalled, "cleanup should run after last listener closed")
}
func TestBaseGroup_CloseOneOfTwoListeners(t *testing.T) {
fl := newFakeLn()
var bg baseGroup
cleanupCalled := 0
bg.initBase("g", "key", fl, func() { cleanupCalled++ })
bg.mu.Lock()
ln1 := bg.newListener(fl.Addr())
ln2 := bg.newListener(fl.Addr())
bg.mu.Unlock()
go bg.worker(fl, bg.acceptCh)
ln1.Close()
assert.Equal(t, 0, cleanupCalled)
// ln2 should still receive connections.
c1, c2 := net.Pipe()
defer c2.Close()
fl.inject(c1)
got, err := ln2.Accept()
require.NoError(t, err)
assert.Equal(t, c1, got)
got.Close()
ln2.Close()
assert.Equal(t, 1, cleanupCalled)
}

View File

@@ -24,4 +24,6 @@ var (
ErrListenerClosed = errors.New("group listener closed")
ErrGroupDifferentPort = errors.New("group should have same remote port")
ErrProxyRepeated = errors.New("group proxy repeated")
errGroupStale = errors.New("stale group reference")
)

View File

@@ -9,53 +9,42 @@ import (
"github.com/fatedier/frp/pkg/util/vhost"
)
// HTTPGroupController manages HTTP groups that use round-robin
// callback routing (fundamentally different from listener-based groups).
type HTTPGroupController struct {
// groups indexed by group name
groups map[string]*HTTPGroup
// register createConn for each group to vhostRouter.
// createConn will get a connection from one proxy of the group
groupRegistry[*HTTPGroup]
vhostRouter *vhost.Routers
mu sync.Mutex
}
func NewHTTPGroupController(vhostRouter *vhost.Routers) *HTTPGroupController {
return &HTTPGroupController{
groups: make(map[string]*HTTPGroup),
vhostRouter: vhostRouter,
groupRegistry: newGroupRegistry[*HTTPGroup](),
vhostRouter: vhostRouter,
}
}
func (ctl *HTTPGroupController) Register(
proxyName, group, groupKey string,
routeConfig vhost.RouteConfig,
) (err error) {
indexKey := group
ctl.mu.Lock()
g, ok := ctl.groups[indexKey]
if !ok {
g = NewHTTPGroup(ctl)
ctl.groups[indexKey] = g
) error {
for {
g := ctl.getOrCreate(group, func() *HTTPGroup {
return NewHTTPGroup(ctl)
})
err := g.Register(proxyName, group, groupKey, routeConfig)
if err == errGroupStale {
continue
}
return err
}
ctl.mu.Unlock()
return g.Register(proxyName, group, groupKey, routeConfig)
}
func (ctl *HTTPGroupController) UnRegister(proxyName, group string, _ vhost.RouteConfig) {
indexKey := group
ctl.mu.Lock()
defer ctl.mu.Unlock()
g, ok := ctl.groups[indexKey]
g, ok := ctl.get(group)
if !ok {
return
}
isEmpty := g.UnRegister(proxyName)
if isEmpty {
delete(ctl.groups, indexKey)
}
g.UnRegister(proxyName)
}
type HTTPGroup struct {
@@ -87,6 +76,9 @@ func (g *HTTPGroup) Register(
) (err error) {
g.mu.Lock()
defer g.mu.Unlock()
if !g.ctl.isCurrent(group, func(cur *HTTPGroup) bool { return cur == g }) {
return errGroupStale
}
if len(g.createFuncs) == 0 {
// the first proxy in this group
tmp := routeConfig // copy object
@@ -123,7 +115,7 @@ func (g *HTTPGroup) Register(
return nil
}
func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) {
func (g *HTTPGroup) UnRegister(proxyName string) {
g.mu.Lock()
defer g.mu.Unlock()
delete(g.createFuncs, proxyName)
@@ -135,10 +127,11 @@ func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) {
}
if len(g.createFuncs) == 0 {
isEmpty = true
g.ctl.vhostRouter.Del(g.domain, g.location, g.routeByHTTPUser)
g.ctl.removeIf(g.group, func(cur *HTTPGroup) bool {
return cur == g
})
}
return
}
func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
@@ -151,7 +144,7 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
location := g.location
routeByHTTPUser := g.routeByHTTPUser
if len(g.pxyNames) > 0 {
name := g.pxyNames[int(newIndex)%len(g.pxyNames)]
name := g.pxyNames[newIndex%uint64(len(g.pxyNames))]
f = g.createFuncs[name]
}
g.mu.RUnlock()
@@ -174,7 +167,7 @@ func (g *HTTPGroup) chooseEndpoint() (string, error) {
location := g.location
routeByHTTPUser := g.routeByHTTPUser
if len(g.pxyNames) > 0 {
name = g.pxyNames[int(newIndex)%len(g.pxyNames)]
name = g.pxyNames[newIndex%uint64(len(g.pxyNames))]
}
g.mu.RUnlock()

View File

@@ -17,25 +17,19 @@ package group
import (
"context"
"net"
"sync"
gerr "github.com/fatedier/golib/errors"
"github.com/fatedier/frp/pkg/util/vhost"
)
type HTTPSGroupController struct {
groups map[string]*HTTPSGroup
groupRegistry[*HTTPSGroup]
httpsMuxer *vhost.HTTPSMuxer
mu sync.Mutex
}
func NewHTTPSGroupController(httpsMuxer *vhost.HTTPSMuxer) *HTTPSGroupController {
return &HTTPSGroupController{
groups: make(map[string]*HTTPSGroup),
httpsMuxer: httpsMuxer,
groupRegistry: newGroupRegistry[*HTTPSGroup](),
httpsMuxer: httpsMuxer,
}
}
@@ -44,41 +38,28 @@ func (ctl *HTTPSGroupController) Listen(
group, groupKey string,
routeConfig vhost.RouteConfig,
) (l net.Listener, err error) {
indexKey := group
ctl.mu.Lock()
g, ok := ctl.groups[indexKey]
if !ok {
g = NewHTTPSGroup(ctl)
ctl.groups[indexKey] = g
for {
g := ctl.getOrCreate(group, func() *HTTPSGroup {
return NewHTTPSGroup(ctl)
})
l, err = g.Listen(ctx, group, groupKey, routeConfig)
if err == errGroupStale {
continue
}
return
}
ctl.mu.Unlock()
return g.Listen(ctx, group, groupKey, routeConfig)
}
func (ctl *HTTPSGroupController) RemoveGroup(group string) {
ctl.mu.Lock()
defer ctl.mu.Unlock()
delete(ctl.groups, group)
}
type HTTPSGroup struct {
group string
groupKey string
domain string
baseGroup
acceptCh chan net.Conn
httpsLn *vhost.Listener
lns []*HTTPSGroupListener
ctl *HTTPSGroupController
mu sync.Mutex
domain string
ctl *HTTPSGroupController
}
func NewHTTPSGroup(ctl *HTTPSGroupController) *HTTPSGroup {
return &HTTPSGroup{
lns: make([]*HTTPSGroupListener, 0),
ctl: ctl,
acceptCh: make(chan net.Conn),
ctl: ctl,
}
}
@@ -86,23 +67,27 @@ func (g *HTTPSGroup) Listen(
ctx context.Context,
group, groupKey string,
routeConfig vhost.RouteConfig,
) (ln *HTTPSGroupListener, err error) {
) (ln *Listener, err error) {
g.mu.Lock()
defer g.mu.Unlock()
if !g.ctl.isCurrent(group, func(cur *HTTPSGroup) bool { return cur == g }) {
return nil, errGroupStale
}
if len(g.lns) == 0 {
// the first listener, listen on the real address
httpsLn, errRet := g.ctl.httpsMuxer.Listen(ctx, &routeConfig)
if errRet != nil {
return nil, errRet
}
ln = newHTTPSGroupListener(group, g, httpsLn.Addr())
g.group = group
g.groupKey = groupKey
g.domain = routeConfig.Domain
g.httpsLn = httpsLn
g.lns = append(g.lns, ln)
go g.worker()
g.initBase(group, groupKey, httpsLn, func() {
g.ctl.removeIf(g.group, func(cur *HTTPSGroup) bool {
return cur == g
})
})
ln = g.newListener(httpsLn.Addr())
go g.worker(httpsLn, g.acceptCh)
} else {
// route config in the same group must be equal
if g.group != group || g.domain != routeConfig.Domain {
@@ -111,87 +96,7 @@ func (g *HTTPSGroup) Listen(
if g.groupKey != groupKey {
return nil, ErrGroupAuthFailed
}
ln = newHTTPSGroupListener(group, g, g.lns[0].Addr())
g.lns = append(g.lns, ln)
ln = g.newListener(g.lns[0].Addr())
}
return
}
func (g *HTTPSGroup) worker() {
for {
c, err := g.httpsLn.Accept()
if err != nil {
return
}
err = gerr.PanicToError(func() {
g.acceptCh <- c
})
if err != nil {
return
}
}
}
func (g *HTTPSGroup) Accept() <-chan net.Conn {
return g.acceptCh
}
func (g *HTTPSGroup) CloseListener(ln *HTTPSGroupListener) {
g.mu.Lock()
defer g.mu.Unlock()
for i, tmpLn := range g.lns {
if tmpLn == ln {
g.lns = append(g.lns[:i], g.lns[i+1:]...)
break
}
}
if len(g.lns) == 0 {
close(g.acceptCh)
if g.httpsLn != nil {
g.httpsLn.Close()
}
g.ctl.RemoveGroup(g.group)
}
}
type HTTPSGroupListener struct {
groupName string
group *HTTPSGroup
addr net.Addr
closeCh chan struct{}
}
func newHTTPSGroupListener(name string, group *HTTPSGroup, addr net.Addr) *HTTPSGroupListener {
return &HTTPSGroupListener{
groupName: name,
group: group,
addr: addr,
closeCh: make(chan struct{}),
}
}
func (ln *HTTPSGroupListener) Accept() (c net.Conn, err error) {
var ok bool
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok = <-ln.group.Accept():
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
}
func (ln *HTTPSGroupListener) Addr() net.Addr {
return ln.addr
}
func (ln *HTTPSGroupListener) Close() (err error) {
close(ln.closeCh)
// remove self from HTTPSGroup
ln.group.CloseListener(ln)
return
}

49
server/group/listener.go Normal file
View File

@@ -0,0 +1,49 @@
package group
import (
"net"
"sync"
)
// Listener is a per-proxy virtual listener that receives connections
// from a shared group. It implements net.Listener.
type Listener struct {
acceptCh <-chan net.Conn
addr net.Addr
closeCh chan struct{}
onClose func(*Listener)
once sync.Once
}
func newListener(acceptCh <-chan net.Conn, addr net.Addr, onClose func(*Listener)) *Listener {
return &Listener{
acceptCh: acceptCh,
addr: addr,
closeCh: make(chan struct{}),
onClose: onClose,
}
}
func (ln *Listener) Accept() (net.Conn, error) {
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok := <-ln.acceptCh:
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
}
func (ln *Listener) Addr() net.Addr {
return ln.addr
}
func (ln *Listener) Close() error {
ln.once.Do(func() {
close(ln.closeCh)
ln.onClose(ln)
})
return nil
}

View File

@@ -0,0 +1,68 @@
package group
import (
"net"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestListener_Accept(t *testing.T) {
acceptCh := make(chan net.Conn, 1)
ln := newListener(acceptCh, fakeAddr("127.0.0.1:1234"), func(*Listener) {})
c1, c2 := net.Pipe()
defer c1.Close()
defer c2.Close()
acceptCh <- c1
got, err := ln.Accept()
require.NoError(t, err)
assert.Equal(t, c1, got)
}
func TestListener_AcceptAfterChannelClose(t *testing.T) {
acceptCh := make(chan net.Conn)
ln := newListener(acceptCh, fakeAddr("127.0.0.1:1234"), func(*Listener) {})
close(acceptCh)
_, err := ln.Accept()
assert.ErrorIs(t, err, ErrListenerClosed)
}
func TestListener_AcceptAfterListenerClose(t *testing.T) {
acceptCh := make(chan net.Conn) // open, not closed
ln := newListener(acceptCh, fakeAddr("127.0.0.1:1234"), func(*Listener) {})
ln.Close()
_, err := ln.Accept()
assert.ErrorIs(t, err, ErrListenerClosed)
}
func TestListener_DoubleClose(t *testing.T) {
closeCalls := 0
ln := newListener(
make(chan net.Conn),
fakeAddr("127.0.0.1:1234"),
func(*Listener) { closeCalls++ },
)
assert.NotPanics(t, func() {
ln.Close()
ln.Close()
})
assert.Equal(t, 1, closeCalls, "onClose should be called exactly once")
}
func TestListener_Addr(t *testing.T) {
addr := fakeAddr("10.0.0.1:5555")
ln := newListener(make(chan net.Conn), addr, func(*Listener) {})
assert.Equal(t, addr, ln.Addr())
}
// fakeAddr implements net.Addr for testing.
type fakeAddr string
func (a fakeAddr) Network() string { return "tcp" }
func (a fakeAddr) String() string { return string(a) }

59
server/group/registry.go Normal file
View File

@@ -0,0 +1,59 @@
package group
import (
"sync"
)
// groupRegistry is a concurrent map of named groups with
// automatic creation on first access.
type groupRegistry[G any] struct {
groups map[string]G
mu sync.Mutex
}
func newGroupRegistry[G any]() groupRegistry[G] {
return groupRegistry[G]{
groups: make(map[string]G),
}
}
func (r *groupRegistry[G]) getOrCreate(key string, newFn func() G) G {
r.mu.Lock()
defer r.mu.Unlock()
g, ok := r.groups[key]
if !ok {
g = newFn()
r.groups[key] = g
}
return g
}
func (r *groupRegistry[G]) get(key string) (G, bool) {
r.mu.Lock()
defer r.mu.Unlock()
g, ok := r.groups[key]
return g, ok
}
// isCurrent returns true if key exists in the registry and matchFn
// returns true for the stored value.
func (r *groupRegistry[G]) isCurrent(key string, matchFn func(G) bool) bool {
r.mu.Lock()
defer r.mu.Unlock()
g, ok := r.groups[key]
return ok && matchFn(g)
}
// removeIf atomically looks up the group for key, calls fn on it,
// and removes the entry if fn returns true.
func (r *groupRegistry[G]) removeIf(key string, fn func(G) bool) {
r.mu.Lock()
defer r.mu.Unlock()
g, ok := r.groups[key]
if !ok {
return
}
if fn(g) {
delete(r.groups, key)
}
}

View File

@@ -0,0 +1,102 @@
package group
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetOrCreate_New(t *testing.T) {
r := newGroupRegistry[*int]()
called := 0
v := 42
got := r.getOrCreate("k", func() *int { called++; return &v })
assert.Equal(t, 1, called)
assert.Equal(t, &v, got)
}
func TestGetOrCreate_Existing(t *testing.T) {
r := newGroupRegistry[*int]()
v := 42
r.getOrCreate("k", func() *int { return &v })
called := 0
got := r.getOrCreate("k", func() *int { called++; return nil })
assert.Equal(t, 0, called)
assert.Equal(t, &v, got)
}
func TestGet_ExistingAndMissing(t *testing.T) {
r := newGroupRegistry[*int]()
v := 1
r.getOrCreate("k", func() *int { return &v })
got, ok := r.get("k")
assert.True(t, ok)
assert.Equal(t, &v, got)
_, ok = r.get("missing")
assert.False(t, ok)
}
func TestIsCurrent(t *testing.T) {
r := newGroupRegistry[*int]()
v1 := 1
v2 := 2
r.getOrCreate("k", func() *int { return &v1 })
assert.True(t, r.isCurrent("k", func(g *int) bool { return g == &v1 }))
assert.False(t, r.isCurrent("k", func(g *int) bool { return g == &v2 }))
assert.False(t, r.isCurrent("missing", func(g *int) bool { return true }))
}
func TestRemoveIf(t *testing.T) {
t.Run("removes when fn returns true", func(t *testing.T) {
r := newGroupRegistry[*int]()
v := 1
r.getOrCreate("k", func() *int { return &v })
r.removeIf("k", func(g *int) bool { return g == &v })
_, ok := r.get("k")
assert.False(t, ok)
})
t.Run("keeps when fn returns false", func(t *testing.T) {
r := newGroupRegistry[*int]()
v := 1
r.getOrCreate("k", func() *int { return &v })
r.removeIf("k", func(g *int) bool { return false })
_, ok := r.get("k")
assert.True(t, ok)
})
t.Run("noop on missing key", func(t *testing.T) {
r := newGroupRegistry[*int]()
r.removeIf("missing", func(g *int) bool { return true }) // should not panic
})
}
func TestConcurrentGetOrCreateAndRemoveIf(t *testing.T) {
r := newGroupRegistry[*int]()
const n = 100
var wg sync.WaitGroup
wg.Add(n * 2)
for i := range n {
v := i
go func() {
defer wg.Done()
r.getOrCreate("k", func() *int { return &v })
}()
go func() {
defer wg.Done()
r.removeIf("k", func(*int) bool { return true })
}()
}
wg.Wait()
// After all goroutines finish, accessing the key must not panic.
require.NotPanics(t, func() {
_, _ = r.get("k")
})
}

View File

@@ -17,83 +17,67 @@ package group
import (
"net"
"strconv"
"sync"
gerr "github.com/fatedier/golib/errors"
"github.com/fatedier/frp/server/ports"
)
// TCPGroupCtl manage all TCPGroups
// TCPGroupCtl manages all TCPGroups.
type TCPGroupCtl struct {
groups map[string]*TCPGroup
// portManager is used to manage port
groupRegistry[*TCPGroup]
portManager *ports.Manager
mu sync.Mutex
}
// NewTCPGroupCtl return a new TcpGroupCtl
// NewTCPGroupCtl returns a new TCPGroupCtl.
func NewTCPGroupCtl(portManager *ports.Manager) *TCPGroupCtl {
return &TCPGroupCtl{
groups: make(map[string]*TCPGroup),
portManager: portManager,
groupRegistry: newGroupRegistry[*TCPGroup](),
portManager: portManager,
}
}
// Listen is the wrapper for TCPGroup's Listen
// If there are no group, we will create one here
// Listen is the wrapper for TCPGroup's Listen.
// If there is no group, one will be created.
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string,
addr string, port int,
) (l net.Listener, realPort int, err error) {
tgc.mu.Lock()
tcpGroup, ok := tgc.groups[group]
if !ok {
tcpGroup = NewTCPGroup(tgc)
tgc.groups[group] = tcpGroup
for {
tcpGroup := tgc.getOrCreate(group, func() *TCPGroup {
return NewTCPGroup(tgc)
})
l, realPort, err = tcpGroup.Listen(proxyName, group, groupKey, addr, port)
if err == errGroupStale {
continue
}
return
}
tgc.mu.Unlock()
return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}
// RemoveGroup remove TCPGroup from controller
func (tgc *TCPGroupCtl) RemoveGroup(group string) {
tgc.mu.Lock()
defer tgc.mu.Unlock()
delete(tgc.groups, group)
}
// TCPGroup route connections to different proxies
// TCPGroup routes connections to different proxies.
type TCPGroup struct {
group string
groupKey string
baseGroup
addr string
port int
realPort int
acceptCh chan net.Conn
tcpLn net.Listener
lns []*TCPGroupListener
ctl *TCPGroupCtl
mu sync.Mutex
}
// NewTCPGroup return a new TCPGroup
// NewTCPGroup returns a new TCPGroup.
func NewTCPGroup(ctl *TCPGroupCtl) *TCPGroup {
return &TCPGroup{
lns: make([]*TCPGroupListener, 0),
ctl: ctl,
acceptCh: make(chan net.Conn),
ctl: ctl,
}
}
// Listen will return a new TCPGroupListener
// if TCPGroup already has a listener, just add a new TCPGroupListener to the queues
// otherwise, listen on the real address
func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr string, port int) (ln *TCPGroupListener, realPort int, err error) {
// Listen will return a new Listener.
// If TCPGroup already has a listener, just add a new Listener to the queues,
// otherwise listen on the real address.
func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr string, port int) (ln *Listener, realPort int, err error) {
tg.mu.Lock()
defer tg.mu.Unlock()
if !tg.ctl.isCurrent(group, func(cur *TCPGroup) bool { return cur == tg }) {
return nil, 0, errGroupStale
}
if len(tg.lns) == 0 {
// the first listener, listen on the real address
realPort, err = tg.ctl.portManager.Acquire(proxyName, port)
@@ -106,19 +90,18 @@ func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr
err = errRet
return
}
ln = newTCPGroupListener(group, tg, tcpLn.Addr())
tg.group = group
tg.groupKey = groupKey
tg.addr = addr
tg.port = port
tg.realPort = realPort
tg.tcpLn = tcpLn
tg.lns = append(tg.lns, ln)
if tg.acceptCh == nil {
tg.acceptCh = make(chan net.Conn)
}
go tg.worker()
tg.initBase(group, groupKey, tcpLn, func() {
tg.ctl.portManager.Release(tg.realPort)
tg.ctl.removeIf(tg.group, func(cur *TCPGroup) bool {
return cur == tg
})
})
ln = tg.newListener(tcpLn.Addr())
go tg.worker(tcpLn, tg.acceptCh)
} else {
// address and port in the same group must be equal
if tg.group != group || tg.addr != addr {
@@ -133,92 +116,8 @@ func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr
err = ErrGroupAuthFailed
return
}
ln = newTCPGroupListener(group, tg, tg.lns[0].Addr())
ln = tg.newListener(tg.lns[0].Addr())
realPort = tg.realPort
tg.lns = append(tg.lns, ln)
}
return
}
// worker is called when the real tcp listener has been created
func (tg *TCPGroup) worker() {
for {
c, err := tg.tcpLn.Accept()
if err != nil {
return
}
err = gerr.PanicToError(func() {
tg.acceptCh <- c
})
if err != nil {
return
}
}
}
func (tg *TCPGroup) Accept() <-chan net.Conn {
return tg.acceptCh
}
// CloseListener remove the TCPGroupListener from the TCPGroup
func (tg *TCPGroup) CloseListener(ln *TCPGroupListener) {
tg.mu.Lock()
defer tg.mu.Unlock()
for i, tmpLn := range tg.lns {
if tmpLn == ln {
tg.lns = append(tg.lns[:i], tg.lns[i+1:]...)
break
}
}
if len(tg.lns) == 0 {
close(tg.acceptCh)
tg.tcpLn.Close()
tg.ctl.portManager.Release(tg.realPort)
tg.ctl.RemoveGroup(tg.group)
}
}
// TCPGroupListener
type TCPGroupListener struct {
groupName string
group *TCPGroup
addr net.Addr
closeCh chan struct{}
}
func newTCPGroupListener(name string, group *TCPGroup, addr net.Addr) *TCPGroupListener {
return &TCPGroupListener{
groupName: name,
group: group,
addr: addr,
closeCh: make(chan struct{}),
}
}
// Accept will accept connections from TCPGroup
func (ln *TCPGroupListener) Accept() (c net.Conn, err error) {
var ok bool
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok = <-ln.group.Accept():
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
}
func (ln *TCPGroupListener) Addr() net.Addr {
return ln.addr
}
// Close close the listener
func (ln *TCPGroupListener) Close() (err error) {
close(ln.closeCh)
// remove self from TcpGroup
ln.group.CloseListener(ln)
return
}

View File

@@ -18,118 +18,100 @@ import (
"context"
"fmt"
"net"
"sync"
gerr "github.com/fatedier/golib/errors"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/util/tcpmux"
"github.com/fatedier/frp/pkg/util/vhost"
)
// TCPMuxGroupCtl manage all TCPMuxGroups
// TCPMuxGroupCtl manages all TCPMuxGroups.
type TCPMuxGroupCtl struct {
groups map[string]*TCPMuxGroup
// portManager is used to manage port
groupRegistry[*TCPMuxGroup]
tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer
mu sync.Mutex
}
// NewTCPMuxGroupCtl return a new TCPMuxGroupCtl
// NewTCPMuxGroupCtl returns a new TCPMuxGroupCtl.
func NewTCPMuxGroupCtl(tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer) *TCPMuxGroupCtl {
return &TCPMuxGroupCtl{
groups: make(map[string]*TCPMuxGroup),
groupRegistry: newGroupRegistry[*TCPMuxGroup](),
tcpMuxHTTPConnectMuxer: tcpMuxHTTPConnectMuxer,
}
}
// Listen is the wrapper for TCPMuxGroup's Listen
// If there are no group, we will create one here
// Listen is the wrapper for TCPMuxGroup's Listen.
// If there is no group, one will be created.
func (tmgc *TCPMuxGroupCtl) Listen(
ctx context.Context,
multiplexer, group, groupKey string,
routeConfig vhost.RouteConfig,
) (l net.Listener, err error) {
tmgc.mu.Lock()
tcpMuxGroup, ok := tmgc.groups[group]
if !ok {
tcpMuxGroup = NewTCPMuxGroup(tmgc)
tmgc.groups[group] = tcpMuxGroup
}
tmgc.mu.Unlock()
for {
tcpMuxGroup := tmgc.getOrCreate(group, func() *TCPMuxGroup {
return NewTCPMuxGroup(tmgc)
})
switch v1.TCPMultiplexerType(multiplexer) {
case v1.TCPMultiplexerHTTPConnect:
return tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, routeConfig)
default:
err = fmt.Errorf("unknown multiplexer [%s]", multiplexer)
return
switch v1.TCPMultiplexerType(multiplexer) {
case v1.TCPMultiplexerHTTPConnect:
l, err = tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, routeConfig)
if err == errGroupStale {
continue
}
return
default:
return nil, fmt.Errorf("unknown multiplexer [%s]", multiplexer)
}
}
}
// RemoveGroup remove TCPMuxGroup from controller
func (tmgc *TCPMuxGroupCtl) RemoveGroup(group string) {
tmgc.mu.Lock()
defer tmgc.mu.Unlock()
delete(tmgc.groups, group)
}
// TCPMuxGroup route connections to different proxies
// TCPMuxGroup routes connections to different proxies.
type TCPMuxGroup struct {
group string
groupKey string
baseGroup
domain string
routeByHTTPUser string
username string
password string
acceptCh chan net.Conn
tcpMuxLn net.Listener
lns []*TCPMuxGroupListener
ctl *TCPMuxGroupCtl
mu sync.Mutex
ctl *TCPMuxGroupCtl
}
// NewTCPMuxGroup return a new TCPMuxGroup
// NewTCPMuxGroup returns a new TCPMuxGroup.
func NewTCPMuxGroup(ctl *TCPMuxGroupCtl) *TCPMuxGroup {
return &TCPMuxGroup{
lns: make([]*TCPMuxGroupListener, 0),
ctl: ctl,
acceptCh: make(chan net.Conn),
ctl: ctl,
}
}
// Listen will return a new TCPMuxGroupListener
// if TCPMuxGroup already has a listener, just add a new TCPMuxGroupListener to the queues
// otherwise, listen on the real address
// HTTPConnectListen will return a new Listener.
// If TCPMuxGroup already has a listener, just add a new Listener to the queues,
// otherwise listen on the real address.
func (tmg *TCPMuxGroup) HTTPConnectListen(
ctx context.Context,
group, groupKey string,
routeConfig vhost.RouteConfig,
) (ln *TCPMuxGroupListener, err error) {
) (ln *Listener, err error) {
tmg.mu.Lock()
defer tmg.mu.Unlock()
if !tmg.ctl.isCurrent(group, func(cur *TCPMuxGroup) bool { return cur == tmg }) {
return nil, errGroupStale
}
if len(tmg.lns) == 0 {
// the first listener, listen on the real address
tcpMuxLn, errRet := tmg.ctl.tcpMuxHTTPConnectMuxer.Listen(ctx, &routeConfig)
if errRet != nil {
return nil, errRet
}
ln = newTCPMuxGroupListener(group, tmg, tcpMuxLn.Addr())
tmg.group = group
tmg.groupKey = groupKey
tmg.domain = routeConfig.Domain
tmg.routeByHTTPUser = routeConfig.RouteByHTTPUser
tmg.username = routeConfig.Username
tmg.password = routeConfig.Password
tmg.tcpMuxLn = tcpMuxLn
tmg.lns = append(tmg.lns, ln)
if tmg.acceptCh == nil {
tmg.acceptCh = make(chan net.Conn)
}
go tmg.worker()
tmg.initBase(group, groupKey, tcpMuxLn, func() {
tmg.ctl.removeIf(tmg.group, func(cur *TCPMuxGroup) bool {
return cur == tmg
})
})
ln = tmg.newListener(tcpMuxLn.Addr())
go tmg.worker(tcpMuxLn, tmg.acceptCh)
} else {
// route config in the same group must be equal
if tmg.group != group || tmg.domain != routeConfig.Domain ||
@@ -141,90 +123,7 @@ func (tmg *TCPMuxGroup) HTTPConnectListen(
if tmg.groupKey != groupKey {
return nil, ErrGroupAuthFailed
}
ln = newTCPMuxGroupListener(group, tmg, tmg.lns[0].Addr())
tmg.lns = append(tmg.lns, ln)
ln = tmg.newListener(tmg.lns[0].Addr())
}
return
}
// worker is called when the real TCP listener has been created
func (tmg *TCPMuxGroup) worker() {
for {
c, err := tmg.tcpMuxLn.Accept()
if err != nil {
return
}
err = gerr.PanicToError(func() {
tmg.acceptCh <- c
})
if err != nil {
return
}
}
}
func (tmg *TCPMuxGroup) Accept() <-chan net.Conn {
return tmg.acceptCh
}
// CloseListener remove the TCPMuxGroupListener from the TCPMuxGroup
func (tmg *TCPMuxGroup) CloseListener(ln *TCPMuxGroupListener) {
tmg.mu.Lock()
defer tmg.mu.Unlock()
for i, tmpLn := range tmg.lns {
if tmpLn == ln {
tmg.lns = append(tmg.lns[:i], tmg.lns[i+1:]...)
break
}
}
if len(tmg.lns) == 0 {
close(tmg.acceptCh)
tmg.tcpMuxLn.Close()
tmg.ctl.RemoveGroup(tmg.group)
}
}
// TCPMuxGroupListener
type TCPMuxGroupListener struct {
groupName string
group *TCPMuxGroup
addr net.Addr
closeCh chan struct{}
}
func newTCPMuxGroupListener(name string, group *TCPMuxGroup, addr net.Addr) *TCPMuxGroupListener {
return &TCPMuxGroupListener{
groupName: name,
group: group,
addr: addr,
closeCh: make(chan struct{}),
}
}
// Accept will accept connections from TCPMuxGroup
func (ln *TCPMuxGroupListener) Accept() (c net.Conn, err error) {
var ok bool
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok = <-ln.group.Accept():
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
}
func (ln *TCPMuxGroupListener) Addr() net.Addr {
return ln.addr
}
// Close close the listener
func (ln *TCPMuxGroupListener) Close() (err error) {
close(ln.closeCh)
// remove self from TcpMuxGroup
ln.group.CloseListener(ln)
return
}

View File

@@ -26,7 +26,7 @@ var _ = ginkgo.Describe("[Feature: Example]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})

View File

@@ -3,67 +3,70 @@ package framework
import (
"fmt"
"maps"
"net"
"os"
"path/filepath"
"slices"
"strconv"
"time"
flog "github.com/fatedier/frp/pkg/util/log"
"github.com/fatedier/frp/test/e2e/framework/consts"
"github.com/fatedier/frp/test/e2e/pkg/process"
)
// RunProcesses run multiple processes from templates.
// The first template should always be frps.
func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []string) ([]*process.Process, []*process.Process) {
templates := slices.Concat(serverTemplates, clientTemplates)
// RunProcesses starts one frps and zero or more frpc processes from templates.
func (f *Framework) RunProcesses(serverTemplate string, clientTemplates []string) (*process.Process, []*process.Process) {
templates := append([]string{serverTemplate}, clientTemplates...)
outs, ports, err := f.RenderTemplates(templates)
ExpectNoError(err)
ExpectTrue(len(templates) > 0)
maps.Copy(f.usedPorts, ports)
currentServerProcesses := make([]*process.Process, 0, len(serverTemplates))
for i := range serverTemplates {
path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-server-%d", i))
err = os.WriteFile(path, []byte(outs[i]), 0o600)
ExpectNoError(err)
// Start frps.
serverPath := filepath.Join(f.TempDirectory, "frp-e2e-server-0")
err = os.WriteFile(serverPath, []byte(outs[0]), 0o600)
ExpectNoError(err)
if TestContext.Debug {
flog.Debugf("[%s] %s", path, outs[i])
}
p := process.NewWithEnvs(TestContext.FRPServerPath, []string{"-c", path}, f.osEnvs)
f.serverConfPaths = append(f.serverConfPaths, path)
f.serverProcesses = append(f.serverProcesses, p)
currentServerProcesses = append(currentServerProcesses, p)
err = p.Start()
ExpectNoError(err)
time.Sleep(500 * time.Millisecond)
if TestContext.Debug {
flog.Debugf("[%s] %s", serverPath, outs[0])
}
time.Sleep(2 * time.Second)
currentClientProcesses := make([]*process.Process, 0, len(clientTemplates))
serverProcess := process.NewWithEnvs(TestContext.FRPServerPath, []string{"-c", serverPath}, f.osEnvs)
f.serverConfPaths = append(f.serverConfPaths, serverPath)
f.serverProcesses = append(f.serverProcesses, serverProcess)
err = serverProcess.Start()
ExpectNoError(err)
if port, ok := ports[consts.PortServerName]; ok {
ExpectNoError(WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(port)), 5*time.Second))
} else {
time.Sleep(2 * time.Second)
}
// Start frpc(s).
clientProcesses := make([]*process.Process, 0, len(clientTemplates))
for i := range clientTemplates {
index := i + len(serverTemplates)
path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-client-%d", i))
err = os.WriteFile(path, []byte(outs[index]), 0o600)
err = os.WriteFile(path, []byte(outs[1+i]), 0o600)
ExpectNoError(err)
if TestContext.Debug {
flog.Debugf("[%s] %s", path, outs[index])
flog.Debugf("[%s] %s", path, outs[1+i])
}
p := process.NewWithEnvs(TestContext.FRPClientPath, []string{"-c", path}, f.osEnvs)
f.clientConfPaths = append(f.clientConfPaths, path)
f.clientProcesses = append(f.clientProcesses, p)
currentClientProcesses = append(currentClientProcesses, p)
clientProcesses = append(clientProcesses, p)
err = p.Start()
ExpectNoError(err)
time.Sleep(500 * time.Millisecond)
}
time.Sleep(3 * time.Second)
// frpc needs time to connect and register proxies with frps.
if len(clientProcesses) > 0 {
time.Sleep(1500 * time.Millisecond)
}
return currentServerProcesses, currentClientProcesses
return serverProcess, clientProcesses
}
func (f *Framework) RunFrps(args ...string) (*process.Process, string, error) {
@@ -71,11 +74,13 @@ func (f *Framework) RunFrps(args ...string) (*process.Process, string, error) {
f.serverProcesses = append(f.serverProcesses, p)
err := p.Start()
if err != nil {
return p, p.StdOutput(), err
return p, p.Output(), err
}
// Give frps extra time to finish binding ports before proceeding.
time.Sleep(4 * time.Second)
return p, p.StdOutput(), nil
select {
case <-p.Done():
case <-time.After(2 * time.Second):
}
return p, p.Output(), nil
}
func (f *Framework) RunFrpc(args ...string) (*process.Process, string, error) {
@@ -83,10 +88,13 @@ func (f *Framework) RunFrpc(args ...string) (*process.Process, string, error) {
f.clientProcesses = append(f.clientProcesses, p)
err := p.Start()
if err != nil {
return p, p.StdOutput(), err
return p, p.Output(), err
}
time.Sleep(2 * time.Second)
return p, p.StdOutput(), nil
select {
case <-p.Done():
case <-time.After(1500 * time.Millisecond):
}
return p, p.Output(), nil
}
func (f *Framework) GenerateConfigFile(content string) string {
@@ -96,3 +104,25 @@ func (f *Framework) GenerateConfigFile(content string) string {
ExpectNoError(err)
return path
}
// WaitForTCPReady polls a TCP address until a connection succeeds or timeout.
func WaitForTCPReady(addr string, timeout time.Duration) error {
if timeout <= 0 {
return fmt.Errorf("invalid timeout for TCP readiness on %s: timeout must be positive", addr)
}
deadline := time.Now().Add(timeout)
var lastErr error
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
if err == nil {
conn.Close()
return nil
}
lastErr = err
time.Sleep(50 * time.Millisecond)
}
if lastErr == nil {
return fmt.Errorf("timeout waiting for TCP readiness on %s before any dial attempt", addr)
}
return fmt.Errorf("timeout waiting for TCP readiness on %s: %w", addr, lastErr)
}

View File

@@ -82,7 +82,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
for _, test := range tests {
framework.NewRequestExpect(f).
@@ -152,7 +152,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
for _, test := range tests {
for domain := range strings.SplitSeq(test.customDomains, ",") {
@@ -235,7 +235,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)
@@ -419,7 +419,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
}
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
f.RunProcesses(serverConf, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
for _, test := range tests {
timeout := time.Second
@@ -497,7 +497,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
// Request without HTTP connect should get error
framework.NewRequestExpect(f).

View File

@@ -48,7 +48,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
framework.TCPEchoServerPort, p2Port,
framework.TCPEchoServerPort, p3Port)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(p1Port).Ensure()
framework.NewRequestExpect(f).Port(p2Port).Ensure()
@@ -90,7 +90,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
admin_pwd = admin
`, dashboardPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
r.HTTP().HTTPPath("/healthz")
@@ -116,7 +116,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
remote_port = %d
`, adminPort, framework.TCPEchoServerPort, testPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(testPort).Ensure()

View File

@@ -76,7 +76,7 @@ func runClientServerTest(f *framework.Framework, configures *generalTestConfigur
clientConfs = append(clientConfs, client2Conf)
}
f.RunProcesses([]string{serverConf}, clientConfs)
f.RunProcesses(serverConf, clientConfs)
if configures.testDelay > 0 {
time.Sleep(configures.testDelay)

View File

@@ -33,7 +33,7 @@ var _ = ginkgo.Describe("[Feature: Config]", func() {
`, "`", "`", framework.TCPEchoServerPort, portName)
f.SetEnvs([]string{"FRP_TOKEN=123"})
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).PortName(portName).Ensure()
})

View File

@@ -56,7 +56,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
locations = /bar
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tests := []struct {
path string
@@ -111,7 +111,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
custom_domains = normal.example.com
`, fooPort, barPort, otherPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// user1
framework.NewRequestExpect(f).Explain("user1").Port(vhostHTTPPort).
@@ -152,7 +152,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
http_pwd = test
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not set auth header
framework.NewRequestExpect(f).Port(vhostHTTPPort).
@@ -188,7 +188,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
custom_domains = *.example.com
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not match host
framework.NewRequestExpect(f).Port(vhostHTTPPort).
@@ -238,7 +238,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
subdomain = bar
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// foo
framework.NewRequestExpect(f).Explain("foo subdomain").Port(vhostHTTPPort).
@@ -279,7 +279,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
header_X-From-Where = frp
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not set auth header
framework.NewRequestExpect(f).Port(vhostHTTPPort).
@@ -312,7 +312,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
host_header_rewrite = rewrite.example.com
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -360,7 +360,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
custom_domains = 127.0.0.1
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
u := url.URL{Scheme: "ws", Host: "127.0.0.1:" + strconv.Itoa(vhostHTTPPort)}
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)

View File

@@ -58,7 +58,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
remote_port = 11003
`, framework.UDPEchoServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// TCP
// Allowed in range
@@ -97,7 +97,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
local_port = {{ .%s }}
`, adminPort, framework.TCPEchoServerPort, framework.UDPEchoServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
client := f.APIClientForFrpc(adminPort)
@@ -138,7 +138,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
custom_domains = example.com
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
r.HTTP().HTTPHost("example.com")
@@ -165,7 +165,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
custom_domains = example.com
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
r.HTTP().HTTPPath("/healthz")

View File

@@ -76,7 +76,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
custom_domains = normal.example.com
`, fooPort, barPort, otherPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// user1
framework.NewRequestExpect(f).Explain("user1").
@@ -121,7 +121,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
http_pwd = test
`, fooPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not set auth header
framework.NewRequestExpect(f).Explain("no auth").
@@ -204,7 +204,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
custom_domains = normal.example.com
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).
RequestModify(func(r *request.Request) {

View File

@@ -41,7 +41,7 @@ var _ = ginkgo.Describe("[Feature: XTCP]", func() {
fallback_timeout_ms = 200
`, framework.TCPEchoServerPort, bindPortName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).
RequestModify(func(r *request.Request) {
r.Timeout(time.Second)

View File

@@ -35,7 +35,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
bandwidth_limit = 10KB
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
content := strings.Repeat("a", 50*1024) // 5KB
start := time.Now()
@@ -89,7 +89,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
remote_port = %d
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
content := strings.Repeat("a", 50*1024) // 5KB
start := time.Now()

View File

@@ -88,7 +88,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
group_key = 123
`, fooPort, remotePort, barPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
fooCount := 0
barCount := 0
@@ -144,7 +144,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
health_check_interval_s = 1
`, fooPort, remotePort, barPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// check foo and bar is ok
results := []string{}
@@ -213,7 +213,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
health_check_url = /healthz
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// send first HTTP request
var contents []string

View File

@@ -38,7 +38,7 @@ var _ = ginkgo.Describe("[Feature: Heartbeat]", func() {
`, serverPort, f.PortByName(framework.TCPEchoServerPort), remotePort)
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Protocol("tcp").Port(remotePort).Ensure()

View File

@@ -33,7 +33,7 @@ var _ = ginkgo.Describe("[Feature: Monitor]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
time.Sleep(500 * time.Millisecond)

View File

@@ -44,7 +44,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
custom_domains = normal.example.com
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -90,7 +90,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
proxy_protocol_version = v2
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool {
log.Tracef("proxy protocol get SourceAddr: %s", string(resp.Content))
@@ -136,7 +136,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
proxy_protocol_version = v2
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).RequestModify(func(r *request.Request) {
r.HTTP().HTTPHost("normal.example.com")

View File

@@ -70,7 +70,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
for _, test := range tests {
framework.NewRequestExpect(f).Port(f.PortByName(test.portName)).Ensure()
@@ -92,7 +92,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
plugin_http_passwd = 123
`, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// http proxy, no auth info
framework.NewRequestExpect(f).PortName(framework.HTTPSimpleServerPort).RequestModify(func(r *request.Request) {
@@ -124,7 +124,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
plugin_passwd = 123
`, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// http proxy, no auth info
framework.NewRequestExpect(f).PortName(framework.TCPEchoServerPort).RequestModify(func(r *request.Request) {
@@ -168,7 +168,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
plugin_http_passwd = 123
`, remotePort, f.TempDirectory, f.TempDirectory, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// from tcp proxy
framework.NewRequestExpect(f).Request(
@@ -202,7 +202,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
plugin_local_addr = 127.0.0.1:%d
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)
@@ -246,7 +246,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
plugin_key_path = %s
`, localPort, crtPath, keyPath)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
localServer := httpserver.New(
httpserver.WithBindPort(localPort),
@@ -290,7 +290,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
plugin_key_path = %s
`, localPort, crtPath, keyPath)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)

View File

@@ -71,7 +71,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort2)
f.RunProcesses([]string{serverConf}, []string{clientConf, invalidTokenClientConf})
f.RunProcesses(serverConf, []string{clientConf, invalidTokenClientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
framework.NewRequestExpect(f).Port(remotePort2).ExpectError(true).Ensure()
@@ -119,7 +119,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
@@ -153,7 +153,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = 0
`, framework.TCPEchoServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
@@ -195,7 +195,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
_, clients := f.RunProcesses([]string{serverConf}, []string{clientConf})
_, clients := f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -250,7 +250,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -297,7 +297,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -342,7 +342,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -389,7 +389,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()

View File

@@ -3,7 +3,9 @@ package process
import (
"bytes"
"context"
"errors"
"os/exec"
"sync"
)
type Process struct {
@@ -12,6 +14,11 @@ type Process struct {
errorOutput *bytes.Buffer
stdOutput *bytes.Buffer
done chan struct{}
closeOne sync.Once
waitErr error
started bool
beforeStopHandler func()
stopped bool
}
@@ -27,6 +34,7 @@ func NewWithEnvs(path string, params []string, envs []string) *Process {
p := &Process{
cmd: cmd,
cancel: cancel,
done: make(chan struct{}),
}
p.errorOutput = bytes.NewBufferString("")
p.stdOutput = bytes.NewBufferString("")
@@ -36,11 +44,35 @@ func NewWithEnvs(path string, params []string, envs []string) *Process {
}
func (p *Process) Start() error {
return p.cmd.Start()
if p.started {
return errors.New("process already started")
}
p.started = true
err := p.cmd.Start()
if err != nil {
p.waitErr = err
p.closeDone()
return err
}
go func() {
p.waitErr = p.cmd.Wait()
p.closeDone()
}()
return nil
}
func (p *Process) closeDone() {
p.closeOne.Do(func() { close(p.done) })
}
// Done returns a channel that is closed when the process exits.
func (p *Process) Done() <-chan struct{} {
return p.done
}
func (p *Process) Stop() error {
if p.stopped {
if p.stopped || !p.started {
return nil
}
defer func() {
@@ -50,7 +82,8 @@ func (p *Process) Stop() error {
p.beforeStopHandler()
}
p.cancel()
return p.cmd.Wait()
<-p.done
return p.waitErr
}
func (p *Process) ErrorOutput() string {
@@ -61,6 +94,10 @@ func (p *Process) StdOutput() string {
return p.stdOutput.String()
}
func (p *Process) Output() string {
return p.stdOutput.String() + p.errorOutput.String()
}
func (p *Process) SetBeforeStopHandler(fn func()) {
p.beforeStopHandler = fn
}

View File

@@ -35,7 +35,7 @@ var _ = ginkgo.Describe("[Feature: Annotations]", func() {
"frp.e2e.test/bar" = "value2"
`, framework.TCPEchoServerPort, p1Port)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(p1Port).Ensure()

View File

@@ -83,7 +83,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
for _, test := range tests {
framework.NewRequestExpect(f).
@@ -154,7 +154,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
for _, test := range tests {
for domain := range strings.SplitSeq(test.customDomains, ",") {
@@ -240,7 +240,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)
@@ -426,7 +426,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
}
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
f.RunProcesses(serverConf, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
for _, test := range tests {
timeout := time.Second
@@ -505,7 +505,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
// Request without HTTP connect should get error
framework.NewRequestExpect(f).

View File

@@ -51,7 +51,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
framework.TCPEchoServerPort, p2Port,
framework.TCPEchoServerPort, p3Port)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(p1Port).Ensure()
framework.NewRequestExpect(f).Port(p2Port).Ensure()
@@ -93,7 +93,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
webServer.password = "admin"
`, dashboardPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
r.HTTP().HTTPPath("/healthz")
@@ -120,7 +120,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
remotePort = %d
`, adminPort, framework.TCPEchoServerPort, testPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(testPort).Ensure()

View File

@@ -78,7 +78,7 @@ func runClientServerTest(f *framework.Framework, configures *generalTestConfigur
clientConfs = append(clientConfs, client2Conf)
}
f.RunProcesses([]string{serverConf}, clientConfs)
f.RunProcesses(serverConf, clientConfs)
if configures.testDelay > 0 {
time.Sleep(configures.testDelay)

View File

@@ -35,7 +35,7 @@ var _ = ginkgo.Describe("[Feature: Config]", func() {
`, "`", "`", framework.TCPEchoServerPort, portName)
f.SetEnvs([]string{"FRP_TOKEN=123"})
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).PortName(portName).Ensure()
})
@@ -69,7 +69,7 @@ var _ = ginkgo.Describe("[Feature: Config]", func() {
escapeTemplate("{{- end }}"),
)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
client := f.APIClientForFrpc(adminPort)
checkProxyFn := func(name string, localPort, remotePort int) {
@@ -149,7 +149,7 @@ proxies:
remotePort: %d
`, port.GenName("Server"), framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
@@ -161,7 +161,7 @@ proxies:
"proxies": [{"name": "tcp", "type": "tcp", "localPort": {{ .%s }}, "remotePort": %d}]}`,
port.GenName("Server"), framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
})

View File

@@ -59,7 +59,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
locations = ["/bar"]
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tests := []struct {
path string
@@ -117,7 +117,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
customDomains = ["normal.example.com"]
`, fooPort, barPort, otherPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// user1
framework.NewRequestExpect(f).Explain("user1").Port(vhostHTTPPort).
@@ -159,7 +159,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
httpPassword = "test"
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not set auth header
framework.NewRequestExpect(f).Port(vhostHTTPPort).
@@ -196,7 +196,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
customDomains = ["*.example.com"]
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not match host
framework.NewRequestExpect(f).Port(vhostHTTPPort).
@@ -248,7 +248,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
subdomain = "bar"
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// foo
framework.NewRequestExpect(f).Explain("foo subdomain").Port(vhostHTTPPort).
@@ -290,7 +290,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
requestHeaders.set.x-from-where = "frp"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -323,7 +323,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
responseHeaders.set.x-from-where = "frp"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -357,7 +357,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
hostHeaderRewrite = "rewrite.example.com"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -406,7 +406,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
customDomains = ["127.0.0.1"]
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
u := url.URL{Scheme: "ws", Host: "127.0.0.1:" + strconv.Itoa(vhostHTTPPort)}
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
@@ -447,7 +447,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
customDomains = ["normal.example.com"]
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {

View File

@@ -67,7 +67,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
remotePort = 11003
`, framework.UDPEchoServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// TCP
// Allowed in range
@@ -108,7 +108,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
localPort = {{ .%s }}
`, adminPort, framework.TCPEchoServerPort, framework.UDPEchoServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
client := f.APIClientForFrpc(adminPort)
@@ -150,7 +150,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
customDomains = ["example.com"]
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
r.HTTP().HTTPHost("example.com")
@@ -178,7 +178,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
customDomains = ["example.com"]
`, framework.HTTPSimpleServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
r.HTTP().HTTPPath("/healthz")

View File

@@ -79,7 +79,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
customDomains = ["normal.example.com"]
`, fooPort, barPort, otherPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// user1
framework.NewRequestExpect(f).Explain("user1").
@@ -125,7 +125,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
httpPassword = "test"
`, fooPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// not set auth header
framework.NewRequestExpect(f).Explain("no auth").
@@ -209,7 +209,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
customDomains = ["normal.example.com"]
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).
RequestModify(func(r *request.Request) {

View File

@@ -16,8 +16,11 @@ package basic
import (
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"time"
"github.com/onsi/ginkgo/v2"
@@ -73,7 +76,7 @@ localPort = {{ .%s }}
remotePort = {{ .%s }}
`, tokenContent, framework.TCPEchoServerPort, portName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).PortName(portName).Ensure()
})
@@ -109,7 +112,7 @@ localPort = {{ .%s }}
remotePort = {{ .%s }}
`, tokenFile, framework.TCPEchoServerPort, portName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).PortName(portName).Ensure()
})
@@ -150,7 +153,7 @@ localPort = {{ .%s }}
remotePort = {{ .%s }}
`, clientTokenFile, framework.TCPEchoServerPort, portName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).PortName(portName).Ensure()
})
@@ -190,7 +193,7 @@ localPort = {{ .%s }}
remotePort = {{ .%s }}
`, clientTokenFile, framework.TCPEchoServerPort, portName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// This should fail due to token mismatch - the client should not be able to connect
// We expect the request to fail because the proxy tunnel is not established
@@ -198,32 +201,27 @@ remotePort = {{ .%s }}
})
ginkgo.It("should fail with non-existent token file", func() {
// This test verifies that server fails to start when tokenSource points to non-existent file
// We'll verify this by checking that the configuration loading itself fails
// Create a config that references a non-existent file
tmpDir := f.TempDirectory
nonExistentFile := filepath.Join(tmpDir, "non_existent_token")
serverConf := consts.DefaultServerConfig
// Server config with non-existent tokenSource file
serverConf += fmt.Sprintf(`
serverPort := f.AllocPort()
serverConf := fmt.Sprintf(`
bindAddr = "0.0.0.0"
bindPort = %d
auth.tokenSource.type = "file"
auth.tokenSource.file.path = "%s"
`, nonExistentFile)
`, serverPort, nonExistentFile)
// The test expectation is that this will fail during the RunProcesses call
// because the server cannot load the configuration due to missing token file
defer func() {
if r := recover(); r != nil {
// Expected: server should fail to start due to missing file
ginkgo.By(fmt.Sprintf("Server correctly failed to start: %v", r))
}
}()
serverConfigPath := f.GenerateConfigFile(serverConf)
// This should cause a panic or error during server startup
f.RunProcesses([]string{serverConf}, []string{})
_, _, _ = f.RunFrps("-c", serverConfigPath)
// Server should have failed to start, so the port should not be listening.
conn, err := net.DialTimeout("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(serverPort)), 1*time.Second)
if err == nil {
conn.Close()
}
framework.ExpectTrue(err != nil, "server should not be listening on port %d", serverPort)
})
})

View File

@@ -42,7 +42,7 @@ var _ = ginkgo.Describe("[Feature: XTCP]", func() {
fallbackTimeoutMs = 200
`, framework.TCPEchoServerPort, bindPortName)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).
RequestModify(func(r *request.Request) {
r.Timeout(time.Second)

View File

@@ -36,7 +36,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
transport.bandwidthLimit = "10KB"
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
content := strings.Repeat("a", 50*1024) // 5KB
start := time.Now()
@@ -92,7 +92,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
remotePort = %d
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
content := strings.Repeat("a", 50*1024) // 5KB
start := time.Now()

View File

@@ -92,7 +92,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
loadBalancer.groupKey = "123"
`, fooPort, remotePort, barPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
fooCount := 0
barCount := 0
@@ -157,7 +157,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
loadBalancer.groupKey = "123"
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
fooCount := 0
barCount := 0
@@ -186,6 +186,68 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
})
ginkgo.It("TCPMux httpconnect", func() {
vhostPort := f.AllocPort()
serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
tcpmuxHTTPConnectPort = %d
`, vhostPort)
clientConf := consts.DefaultClientConfig
fooPort := f.AllocPort()
fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
f.RunServer("", fooServer)
barPort := f.AllocPort()
barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
f.RunServer("", barServer)
clientConf += fmt.Sprintf(`
[[proxies]]
name = "foo"
type = "tcpmux"
multiplexer = "httpconnect"
localPort = %d
customDomains = ["tcpmux-group.example.com"]
loadBalancer.group = "test"
loadBalancer.groupKey = "123"
[[proxies]]
name = "bar"
type = "tcpmux"
multiplexer = "httpconnect"
localPort = %d
customDomains = ["tcpmux-group.example.com"]
loadBalancer.group = "test"
loadBalancer.groupKey = "123"
`, fooPort, barPort)
f.RunProcesses(serverConf, []string{clientConf})
proxyURL := fmt.Sprintf("http://127.0.0.1:%d", vhostPort)
fooCount := 0
barCount := 0
for i := range 10 {
framework.NewRequestExpect(f).
Explain("times " + strconv.Itoa(i)).
RequestModify(func(r *request.Request) {
r.Addr("tcpmux-group.example.com").Proxy(proxyURL)
}).
Ensure(func(resp *request.Response) bool {
switch string(resp.Content) {
case "foo":
fooCount++
case "bar":
barCount++
default:
return false
}
return true
})
}
framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
})
})
ginkgo.Describe("Health Check", func() {
@@ -224,7 +286,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
healthCheck.intervalSeconds = 1
`, fooPort, remotePort, barPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// check foo and bar is ok
results := []string{}
@@ -295,7 +357,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
healthCheck.path = "/healthz"
`, fooPort, barPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// send first HTTP request
var contents []string

View File

@@ -37,7 +37,7 @@ var _ = ginkgo.Describe("[Feature: Heartbeat]", func() {
`, serverPort, f.PortByName(framework.TCPEchoServerPort), remotePort)
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Protocol("tcp").Port(remotePort).Ensure()

View File

@@ -34,7 +34,7 @@ var _ = ginkgo.Describe("[Feature: Monitor]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
time.Sleep(500 * time.Millisecond)

View File

@@ -48,7 +48,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
customDomains = ["normal.example.com"]
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -82,7 +82,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
customDomains = ["normal.example.com"]
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).
RequestModify(func(r *request.Request) {
@@ -112,7 +112,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
localAddr = "127.0.0.1:%d"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)
@@ -154,7 +154,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
localAddr = "127.0.0.1:%d"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
localServer := httpserver.New(
httpserver.WithBindPort(localPort),
@@ -212,7 +212,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
transport.proxyProtocolVersion = "v2"
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool {
log.Tracef("proxy protocol get SourceAddr: %s", string(resp.Content))
@@ -262,7 +262,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
transport.proxyProtocolVersion = "v2"
`, localPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Protocol("udp").Port(remotePort).Ensure(func(resp *request.Response) bool {
log.Tracef("udp proxy protocol get SourceAddr: %s", string(resp.Content))
@@ -309,7 +309,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
transport.proxyProtocolVersion = "v2"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(vhostHTTPPort).RequestModify(func(r *request.Request) {
r.HTTP().HTTPHost("normal.example.com")

View File

@@ -3,6 +3,8 @@ package features
import (
"crypto/tls"
"fmt"
"net"
"strconv"
"time"
"github.com/onsi/ginkgo/v2"
@@ -25,7 +27,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
sshTunnelGateway.bindPort = %d
`, sshPort)
f.RunProcesses([]string{serverConf}, nil)
f.RunProcesses(serverConf, nil)
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
localPort := f.PortByName(framework.TCPEchoServerPort)
remotePort := f.AllocPort()
@@ -49,7 +52,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
sshTunnelGateway.bindPort = %d
`, vhostPort, sshPort)
f.RunProcesses([]string{serverConf}, nil)
f.RunProcesses(serverConf, nil)
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
localPort := f.PortByName(framework.HTTPSimpleServerPort)
tc := ssh.NewTunnelClient(
@@ -76,7 +80,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
sshTunnelGateway.bindPort = %d
`, vhostPort, sshPort)
f.RunProcesses([]string{serverConf}, nil)
f.RunProcesses(serverConf, nil)
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
localPort := f.AllocPort()
testDomain := "test.example.com"
@@ -118,7 +123,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
sshTunnelGateway.bindPort = %d
`, tcpmuxPort, sshPort)
f.RunProcesses([]string{serverConf}, nil)
f.RunProcesses(serverConf, nil)
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
localPort := f.AllocPort()
testDomain := "test.example.com"
@@ -173,7 +179,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
bindPort = %d
`, bindPort)
f.RunProcesses([]string{serverConf}, []string{visitorConf})
f.RunProcesses(serverConf, []string{visitorConf})
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
localPort := f.PortByName(framework.TCPEchoServerPort)
tc := ssh.NewTunnelClient(

View File

@@ -30,7 +30,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
path = "%s/store.json"
`, adminPort, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
proxyConfig := map[string]any{
@@ -71,7 +71,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
path = "%s/store.json"
`, adminPort, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
proxyConfig := map[string]any{
@@ -125,7 +125,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
path = "%s/store.json"
`, adminPort, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
proxyConfig := map[string]any{
@@ -173,7 +173,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
path = "%s/store.json"
`, adminPort, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
proxyConfig := map[string]any{
@@ -225,7 +225,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
webServer.port = %d
`, adminPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
@@ -247,7 +247,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
path = "%s/store.json"
`, adminPort, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
invalidBody, _ := json.Marshal(map[string]any{
@@ -280,7 +280,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
path = "%s/store.json"
`, adminPort, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
time.Sleep(500 * time.Millisecond)
createBody, _ := json.Marshal(map[string]any{

View File

@@ -74,7 +74,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
}
// run frps and frpc
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
f.RunProcesses(serverConf, []string{clientConf.String()})
for _, test := range tests {
framework.NewRequestExpect(f).Port(f.PortByName(test.portName)).Ensure()
@@ -98,7 +98,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
httpPassword = "123"
`, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// http proxy, no auth info
framework.NewRequestExpect(f).PortName(framework.HTTPSimpleServerPort).RequestModify(func(r *request.Request) {
@@ -132,7 +132,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
password = "123"
`, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// http proxy, no auth info
framework.NewRequestExpect(f).PortName(framework.TCPEchoServerPort).RequestModify(func(r *request.Request) {
@@ -182,7 +182,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
httpPassword = "123"
`, remotePort, f.TempDirectory, f.TempDirectory, f.TempDirectory)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
// from tcp proxy
framework.NewRequestExpect(f).Request(
@@ -218,7 +218,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
localAddr = "127.0.0.1:%d"
`, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)
@@ -264,7 +264,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
keyPath = "%s"
`, localPort, crtPath, keyPath)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
localServer := httpserver.New(
httpserver.WithBindPort(localPort),
@@ -310,7 +310,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
keyPath = "%s"
`, localPort, crtPath, keyPath)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
framework.ExpectNoError(err)
@@ -350,7 +350,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
hostHeaderRewrite = "rewrite.test.com"
`, remotePort, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
localServer := httpserver.New(
httpserver.WithBindPort(localPort),
@@ -385,7 +385,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
requestHeaders.set.x-from-where = "frp"
`, remotePort, localPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
localServer := httpserver.New(
httpserver.WithBindPort(localPort),
@@ -431,7 +431,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
keyPath = "%s"
`, localPort, crtPath, keyPath)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
localServer := httpserver.New(
httpserver.WithBindPort(localPort),

View File

@@ -74,7 +74,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort2)
f.RunProcesses([]string{serverConf}, []string{clientConf, invalidTokenClientConf})
f.RunProcesses(serverConf, []string{clientConf, invalidTokenClientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
framework.NewRequestExpect(f).Port(remotePort2).ExpectError(true).Ensure()
@@ -124,7 +124,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
@@ -160,7 +160,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = 0
`, framework.TCPEchoServerPort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
})
@@ -204,7 +204,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
_, clients := f.RunProcesses([]string{serverConf}, []string{clientConf})
_, clients := f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -261,7 +261,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -310,7 +310,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -357,7 +357,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()
@@ -406,7 +406,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)
f.RunProcesses([]string{serverConf}, []string{clientConf})
f.RunProcesses(serverConf, []string{clientConf})
framework.NewRequestExpect(f).Port(remotePort).Ensure()