mirror of
https://github.com/fatedier/frp.git
synced 2026-04-05 08:39:17 +08:00
Compare commits
4 Commits
9eafcc8a95
...
8d1ab7d585
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d1ab7d585 | ||
|
|
f32bec9f4d | ||
|
|
bcd2424c24 | ||
|
|
c7ac12ea0f |
77
server/group/base.go
Normal file
77
server/group/base.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
gerr "github.com/fatedier/golib/errors"
|
||||
)
|
||||
|
||||
// baseGroup contains the shared plumbing for listener-based groups
|
||||
// (TCP, HTTPS, TCPMux). Each concrete group embeds this and provides
|
||||
// its own Listen method with protocol-specific validation.
|
||||
type baseGroup struct {
|
||||
group string
|
||||
groupKey string
|
||||
|
||||
acceptCh chan net.Conn
|
||||
realLn net.Listener
|
||||
lns []*Listener
|
||||
mu sync.Mutex
|
||||
cleanupFn func()
|
||||
}
|
||||
|
||||
// initBase resets the baseGroup for a fresh listen cycle.
|
||||
// Must be called under mu when len(lns) == 0.
|
||||
func (bg *baseGroup) initBase(group, groupKey string, realLn net.Listener, cleanupFn func()) {
|
||||
bg.group = group
|
||||
bg.groupKey = groupKey
|
||||
bg.realLn = realLn
|
||||
bg.acceptCh = make(chan net.Conn)
|
||||
bg.cleanupFn = cleanupFn
|
||||
}
|
||||
|
||||
// worker reads from the real listener and fans out to acceptCh.
|
||||
// The parameters are captured at creation time so that the worker is
|
||||
// bound to a specific listen cycle and cannot observe a later initBase.
|
||||
func (bg *baseGroup) worker(realLn net.Listener, acceptCh chan<- net.Conn) {
|
||||
for {
|
||||
c, err := realLn.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = gerr.PanicToError(func() {
|
||||
acceptCh <- c
|
||||
})
|
||||
if err != nil {
|
||||
c.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// newListener creates a new Listener wired to this baseGroup.
|
||||
// Must be called under mu.
|
||||
func (bg *baseGroup) newListener(addr net.Addr) *Listener {
|
||||
ln := newListener(bg.acceptCh, addr, bg.closeListener)
|
||||
bg.lns = append(bg.lns, ln)
|
||||
return ln
|
||||
}
|
||||
|
||||
// closeListener removes ln from the list. When the last listener is removed,
|
||||
// it closes acceptCh, closes the real listener, and calls cleanupFn.
|
||||
func (bg *baseGroup) closeListener(ln *Listener) {
|
||||
bg.mu.Lock()
|
||||
defer bg.mu.Unlock()
|
||||
for i, l := range bg.lns {
|
||||
if l == ln {
|
||||
bg.lns = append(bg.lns[:i], bg.lns[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(bg.lns) == 0 {
|
||||
close(bg.acceptCh)
|
||||
bg.realLn.Close()
|
||||
bg.cleanupFn()
|
||||
}
|
||||
}
|
||||
169
server/group/base_test.go
Normal file
169
server/group/base_test.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// fakeLn is a controllable net.Listener for tests.
|
||||
type fakeLn struct {
|
||||
connCh chan net.Conn
|
||||
closed chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func newFakeLn() *fakeLn {
|
||||
return &fakeLn{
|
||||
connCh: make(chan net.Conn, 8),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeLn) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case c := <-f.connCh:
|
||||
return c, nil
|
||||
case <-f.closed:
|
||||
return nil, net.ErrClosed
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeLn) Close() error {
|
||||
f.once.Do(func() { close(f.closed) })
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeLn) Addr() net.Addr { return fakeAddr("127.0.0.1:9999") }
|
||||
|
||||
func (f *fakeLn) inject(c net.Conn) {
|
||||
select {
|
||||
case f.connCh <- c:
|
||||
case <-f.closed:
|
||||
}
|
||||
}
|
||||
|
||||
func TestBaseGroup_WorkerFanOut(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
bg.initBase("g", "key", fl, func() {})
|
||||
|
||||
go bg.worker(fl, bg.acceptCh)
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
defer c2.Close()
|
||||
fl.inject(c1)
|
||||
|
||||
select {
|
||||
case got := <-bg.acceptCh:
|
||||
assert.Equal(t, c1, got)
|
||||
got.Close()
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for connection on acceptCh")
|
||||
}
|
||||
|
||||
fl.Close()
|
||||
}
|
||||
|
||||
func TestBaseGroup_WorkerStopsOnListenerClose(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
bg.initBase("g", "key", fl, func() {})
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
bg.worker(fl, bg.acceptCh)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
fl.Close()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("worker did not stop after listener close")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBaseGroup_WorkerClosesConnOnClosedChannel(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
bg.initBase("g", "key", fl, func() {})
|
||||
|
||||
// Close acceptCh before worker sends.
|
||||
close(bg.acceptCh)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
bg.worker(fl, bg.acceptCh)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
defer c2.Close()
|
||||
fl.inject(c1)
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("worker did not stop after panic recovery")
|
||||
}
|
||||
|
||||
// c1 should have been closed by worker's panic recovery path.
|
||||
buf := make([]byte, 1)
|
||||
_, err := c1.Read(buf)
|
||||
assert.Error(t, err, "connection should be closed by worker")
|
||||
}
|
||||
|
||||
func TestBaseGroup_CloseLastListenerTriggersCleanup(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
cleanupCalled := 0
|
||||
bg.initBase("g", "key", fl, func() { cleanupCalled++ })
|
||||
|
||||
bg.mu.Lock()
|
||||
ln1 := bg.newListener(fl.Addr())
|
||||
ln2 := bg.newListener(fl.Addr())
|
||||
bg.mu.Unlock()
|
||||
|
||||
go bg.worker(fl, bg.acceptCh)
|
||||
|
||||
ln1.Close()
|
||||
assert.Equal(t, 0, cleanupCalled, "cleanup should not run while listeners remain")
|
||||
|
||||
ln2.Close()
|
||||
assert.Equal(t, 1, cleanupCalled, "cleanup should run after last listener closed")
|
||||
}
|
||||
|
||||
func TestBaseGroup_CloseOneOfTwoListeners(t *testing.T) {
|
||||
fl := newFakeLn()
|
||||
var bg baseGroup
|
||||
cleanupCalled := 0
|
||||
bg.initBase("g", "key", fl, func() { cleanupCalled++ })
|
||||
|
||||
bg.mu.Lock()
|
||||
ln1 := bg.newListener(fl.Addr())
|
||||
ln2 := bg.newListener(fl.Addr())
|
||||
bg.mu.Unlock()
|
||||
|
||||
go bg.worker(fl, bg.acceptCh)
|
||||
|
||||
ln1.Close()
|
||||
assert.Equal(t, 0, cleanupCalled)
|
||||
|
||||
// ln2 should still receive connections.
|
||||
c1, c2 := net.Pipe()
|
||||
defer c2.Close()
|
||||
fl.inject(c1)
|
||||
|
||||
got, err := ln2.Accept()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, c1, got)
|
||||
got.Close()
|
||||
|
||||
ln2.Close()
|
||||
assert.Equal(t, 1, cleanupCalled)
|
||||
}
|
||||
@@ -24,4 +24,6 @@ var (
|
||||
ErrListenerClosed = errors.New("group listener closed")
|
||||
ErrGroupDifferentPort = errors.New("group should have same remote port")
|
||||
ErrProxyRepeated = errors.New("group proxy repeated")
|
||||
|
||||
errGroupStale = errors.New("stale group reference")
|
||||
)
|
||||
|
||||
@@ -9,53 +9,42 @@ import (
|
||||
"github.com/fatedier/frp/pkg/util/vhost"
|
||||
)
|
||||
|
||||
// HTTPGroupController manages HTTP groups that use round-robin
|
||||
// callback routing (fundamentally different from listener-based groups).
|
||||
type HTTPGroupController struct {
|
||||
// groups indexed by group name
|
||||
groups map[string]*HTTPGroup
|
||||
|
||||
// register createConn for each group to vhostRouter.
|
||||
// createConn will get a connection from one proxy of the group
|
||||
groupRegistry[*HTTPGroup]
|
||||
vhostRouter *vhost.Routers
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewHTTPGroupController(vhostRouter *vhost.Routers) *HTTPGroupController {
|
||||
return &HTTPGroupController{
|
||||
groups: make(map[string]*HTTPGroup),
|
||||
vhostRouter: vhostRouter,
|
||||
groupRegistry: newGroupRegistry[*HTTPGroup](),
|
||||
vhostRouter: vhostRouter,
|
||||
}
|
||||
}
|
||||
|
||||
func (ctl *HTTPGroupController) Register(
|
||||
proxyName, group, groupKey string,
|
||||
routeConfig vhost.RouteConfig,
|
||||
) (err error) {
|
||||
indexKey := group
|
||||
ctl.mu.Lock()
|
||||
g, ok := ctl.groups[indexKey]
|
||||
if !ok {
|
||||
g = NewHTTPGroup(ctl)
|
||||
ctl.groups[indexKey] = g
|
||||
) error {
|
||||
for {
|
||||
g := ctl.getOrCreate(group, func() *HTTPGroup {
|
||||
return NewHTTPGroup(ctl)
|
||||
})
|
||||
err := g.Register(proxyName, group, groupKey, routeConfig)
|
||||
if err == errGroupStale {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
ctl.mu.Unlock()
|
||||
|
||||
return g.Register(proxyName, group, groupKey, routeConfig)
|
||||
}
|
||||
|
||||
func (ctl *HTTPGroupController) UnRegister(proxyName, group string, _ vhost.RouteConfig) {
|
||||
indexKey := group
|
||||
ctl.mu.Lock()
|
||||
defer ctl.mu.Unlock()
|
||||
g, ok := ctl.groups[indexKey]
|
||||
g, ok := ctl.get(group)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
isEmpty := g.UnRegister(proxyName)
|
||||
if isEmpty {
|
||||
delete(ctl.groups, indexKey)
|
||||
}
|
||||
g.UnRegister(proxyName)
|
||||
}
|
||||
|
||||
type HTTPGroup struct {
|
||||
@@ -87,6 +76,9 @@ func (g *HTTPGroup) Register(
|
||||
) (err error) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
if !g.ctl.isCurrent(group, func(cur *HTTPGroup) bool { return cur == g }) {
|
||||
return errGroupStale
|
||||
}
|
||||
if len(g.createFuncs) == 0 {
|
||||
// the first proxy in this group
|
||||
tmp := routeConfig // copy object
|
||||
@@ -123,7 +115,7 @@ func (g *HTTPGroup) Register(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) {
|
||||
func (g *HTTPGroup) UnRegister(proxyName string) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
delete(g.createFuncs, proxyName)
|
||||
@@ -135,10 +127,11 @@ func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) {
|
||||
}
|
||||
|
||||
if len(g.createFuncs) == 0 {
|
||||
isEmpty = true
|
||||
g.ctl.vhostRouter.Del(g.domain, g.location, g.routeByHTTPUser)
|
||||
g.ctl.removeIf(g.group, func(cur *HTTPGroup) bool {
|
||||
return cur == g
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
|
||||
@@ -151,7 +144,7 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
|
||||
location := g.location
|
||||
routeByHTTPUser := g.routeByHTTPUser
|
||||
if len(g.pxyNames) > 0 {
|
||||
name := g.pxyNames[int(newIndex)%len(g.pxyNames)]
|
||||
name := g.pxyNames[newIndex%uint64(len(g.pxyNames))]
|
||||
f = g.createFuncs[name]
|
||||
}
|
||||
g.mu.RUnlock()
|
||||
@@ -174,7 +167,7 @@ func (g *HTTPGroup) chooseEndpoint() (string, error) {
|
||||
location := g.location
|
||||
routeByHTTPUser := g.routeByHTTPUser
|
||||
if len(g.pxyNames) > 0 {
|
||||
name = g.pxyNames[int(newIndex)%len(g.pxyNames)]
|
||||
name = g.pxyNames[newIndex%uint64(len(g.pxyNames))]
|
||||
}
|
||||
g.mu.RUnlock()
|
||||
|
||||
|
||||
@@ -17,25 +17,19 @@ package group
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
gerr "github.com/fatedier/golib/errors"
|
||||
|
||||
"github.com/fatedier/frp/pkg/util/vhost"
|
||||
)
|
||||
|
||||
type HTTPSGroupController struct {
|
||||
groups map[string]*HTTPSGroup
|
||||
|
||||
groupRegistry[*HTTPSGroup]
|
||||
httpsMuxer *vhost.HTTPSMuxer
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewHTTPSGroupController(httpsMuxer *vhost.HTTPSMuxer) *HTTPSGroupController {
|
||||
return &HTTPSGroupController{
|
||||
groups: make(map[string]*HTTPSGroup),
|
||||
httpsMuxer: httpsMuxer,
|
||||
groupRegistry: newGroupRegistry[*HTTPSGroup](),
|
||||
httpsMuxer: httpsMuxer,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,41 +38,28 @@ func (ctl *HTTPSGroupController) Listen(
|
||||
group, groupKey string,
|
||||
routeConfig vhost.RouteConfig,
|
||||
) (l net.Listener, err error) {
|
||||
indexKey := group
|
||||
ctl.mu.Lock()
|
||||
g, ok := ctl.groups[indexKey]
|
||||
if !ok {
|
||||
g = NewHTTPSGroup(ctl)
|
||||
ctl.groups[indexKey] = g
|
||||
for {
|
||||
g := ctl.getOrCreate(group, func() *HTTPSGroup {
|
||||
return NewHTTPSGroup(ctl)
|
||||
})
|
||||
l, err = g.Listen(ctx, group, groupKey, routeConfig)
|
||||
if err == errGroupStale {
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
ctl.mu.Unlock()
|
||||
|
||||
return g.Listen(ctx, group, groupKey, routeConfig)
|
||||
}
|
||||
|
||||
func (ctl *HTTPSGroupController) RemoveGroup(group string) {
|
||||
ctl.mu.Lock()
|
||||
defer ctl.mu.Unlock()
|
||||
delete(ctl.groups, group)
|
||||
}
|
||||
|
||||
type HTTPSGroup struct {
|
||||
group string
|
||||
groupKey string
|
||||
domain string
|
||||
baseGroup
|
||||
|
||||
acceptCh chan net.Conn
|
||||
httpsLn *vhost.Listener
|
||||
lns []*HTTPSGroupListener
|
||||
ctl *HTTPSGroupController
|
||||
mu sync.Mutex
|
||||
domain string
|
||||
ctl *HTTPSGroupController
|
||||
}
|
||||
|
||||
func NewHTTPSGroup(ctl *HTTPSGroupController) *HTTPSGroup {
|
||||
return &HTTPSGroup{
|
||||
lns: make([]*HTTPSGroupListener, 0),
|
||||
ctl: ctl,
|
||||
acceptCh: make(chan net.Conn),
|
||||
ctl: ctl,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,23 +67,27 @@ func (g *HTTPSGroup) Listen(
|
||||
ctx context.Context,
|
||||
group, groupKey string,
|
||||
routeConfig vhost.RouteConfig,
|
||||
) (ln *HTTPSGroupListener, err error) {
|
||||
) (ln *Listener, err error) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
if !g.ctl.isCurrent(group, func(cur *HTTPSGroup) bool { return cur == g }) {
|
||||
return nil, errGroupStale
|
||||
}
|
||||
if len(g.lns) == 0 {
|
||||
// the first listener, listen on the real address
|
||||
httpsLn, errRet := g.ctl.httpsMuxer.Listen(ctx, &routeConfig)
|
||||
if errRet != nil {
|
||||
return nil, errRet
|
||||
}
|
||||
ln = newHTTPSGroupListener(group, g, httpsLn.Addr())
|
||||
|
||||
g.group = group
|
||||
g.groupKey = groupKey
|
||||
g.domain = routeConfig.Domain
|
||||
g.httpsLn = httpsLn
|
||||
g.lns = append(g.lns, ln)
|
||||
go g.worker()
|
||||
g.initBase(group, groupKey, httpsLn, func() {
|
||||
g.ctl.removeIf(g.group, func(cur *HTTPSGroup) bool {
|
||||
return cur == g
|
||||
})
|
||||
})
|
||||
ln = g.newListener(httpsLn.Addr())
|
||||
go g.worker(httpsLn, g.acceptCh)
|
||||
} else {
|
||||
// route config in the same group must be equal
|
||||
if g.group != group || g.domain != routeConfig.Domain {
|
||||
@@ -111,87 +96,7 @@ func (g *HTTPSGroup) Listen(
|
||||
if g.groupKey != groupKey {
|
||||
return nil, ErrGroupAuthFailed
|
||||
}
|
||||
ln = newHTTPSGroupListener(group, g, g.lns[0].Addr())
|
||||
g.lns = append(g.lns, ln)
|
||||
ln = g.newListener(g.lns[0].Addr())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (g *HTTPSGroup) worker() {
|
||||
for {
|
||||
c, err := g.httpsLn.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = gerr.PanicToError(func() {
|
||||
g.acceptCh <- c
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (g *HTTPSGroup) Accept() <-chan net.Conn {
|
||||
return g.acceptCh
|
||||
}
|
||||
|
||||
func (g *HTTPSGroup) CloseListener(ln *HTTPSGroupListener) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
for i, tmpLn := range g.lns {
|
||||
if tmpLn == ln {
|
||||
g.lns = append(g.lns[:i], g.lns[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(g.lns) == 0 {
|
||||
close(g.acceptCh)
|
||||
if g.httpsLn != nil {
|
||||
g.httpsLn.Close()
|
||||
}
|
||||
g.ctl.RemoveGroup(g.group)
|
||||
}
|
||||
}
|
||||
|
||||
type HTTPSGroupListener struct {
|
||||
groupName string
|
||||
group *HTTPSGroup
|
||||
|
||||
addr net.Addr
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
func newHTTPSGroupListener(name string, group *HTTPSGroup, addr net.Addr) *HTTPSGroupListener {
|
||||
return &HTTPSGroupListener{
|
||||
groupName: name,
|
||||
group: group,
|
||||
addr: addr,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *HTTPSGroupListener) Accept() (c net.Conn, err error) {
|
||||
var ok bool
|
||||
select {
|
||||
case <-ln.closeCh:
|
||||
return nil, ErrListenerClosed
|
||||
case c, ok = <-ln.group.Accept():
|
||||
if !ok {
|
||||
return nil, ErrListenerClosed
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *HTTPSGroupListener) Addr() net.Addr {
|
||||
return ln.addr
|
||||
}
|
||||
|
||||
func (ln *HTTPSGroupListener) Close() (err error) {
|
||||
close(ln.closeCh)
|
||||
|
||||
// remove self from HTTPSGroup
|
||||
ln.group.CloseListener(ln)
|
||||
return
|
||||
}
|
||||
|
||||
49
server/group/listener.go
Normal file
49
server/group/listener.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Listener is a per-proxy virtual listener that receives connections
|
||||
// from a shared group. It implements net.Listener.
|
||||
type Listener struct {
|
||||
acceptCh <-chan net.Conn
|
||||
addr net.Addr
|
||||
closeCh chan struct{}
|
||||
onClose func(*Listener)
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func newListener(acceptCh <-chan net.Conn, addr net.Addr, onClose func(*Listener)) *Listener {
|
||||
return &Listener{
|
||||
acceptCh: acceptCh,
|
||||
addr: addr,
|
||||
closeCh: make(chan struct{}),
|
||||
onClose: onClose,
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *Listener) Accept() (net.Conn, error) {
|
||||
select {
|
||||
case <-ln.closeCh:
|
||||
return nil, ErrListenerClosed
|
||||
case c, ok := <-ln.acceptCh:
|
||||
if !ok {
|
||||
return nil, ErrListenerClosed
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *Listener) Addr() net.Addr {
|
||||
return ln.addr
|
||||
}
|
||||
|
||||
func (ln *Listener) Close() error {
|
||||
ln.once.Do(func() {
|
||||
close(ln.closeCh)
|
||||
ln.onClose(ln)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
68
server/group/listener_test.go
Normal file
68
server/group/listener_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestListener_Accept(t *testing.T) {
|
||||
acceptCh := make(chan net.Conn, 1)
|
||||
ln := newListener(acceptCh, fakeAddr("127.0.0.1:1234"), func(*Listener) {})
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
defer c1.Close()
|
||||
defer c2.Close()
|
||||
|
||||
acceptCh <- c1
|
||||
got, err := ln.Accept()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, c1, got)
|
||||
}
|
||||
|
||||
func TestListener_AcceptAfterChannelClose(t *testing.T) {
|
||||
acceptCh := make(chan net.Conn)
|
||||
ln := newListener(acceptCh, fakeAddr("127.0.0.1:1234"), func(*Listener) {})
|
||||
|
||||
close(acceptCh)
|
||||
_, err := ln.Accept()
|
||||
assert.ErrorIs(t, err, ErrListenerClosed)
|
||||
}
|
||||
|
||||
func TestListener_AcceptAfterListenerClose(t *testing.T) {
|
||||
acceptCh := make(chan net.Conn) // open, not closed
|
||||
ln := newListener(acceptCh, fakeAddr("127.0.0.1:1234"), func(*Listener) {})
|
||||
|
||||
ln.Close()
|
||||
_, err := ln.Accept()
|
||||
assert.ErrorIs(t, err, ErrListenerClosed)
|
||||
}
|
||||
|
||||
func TestListener_DoubleClose(t *testing.T) {
|
||||
closeCalls := 0
|
||||
ln := newListener(
|
||||
make(chan net.Conn),
|
||||
fakeAddr("127.0.0.1:1234"),
|
||||
func(*Listener) { closeCalls++ },
|
||||
)
|
||||
|
||||
assert.NotPanics(t, func() {
|
||||
ln.Close()
|
||||
ln.Close()
|
||||
})
|
||||
assert.Equal(t, 1, closeCalls, "onClose should be called exactly once")
|
||||
}
|
||||
|
||||
func TestListener_Addr(t *testing.T) {
|
||||
addr := fakeAddr("10.0.0.1:5555")
|
||||
ln := newListener(make(chan net.Conn), addr, func(*Listener) {})
|
||||
assert.Equal(t, addr, ln.Addr())
|
||||
}
|
||||
|
||||
// fakeAddr implements net.Addr for testing.
|
||||
type fakeAddr string
|
||||
|
||||
func (a fakeAddr) Network() string { return "tcp" }
|
||||
func (a fakeAddr) String() string { return string(a) }
|
||||
59
server/group/registry.go
Normal file
59
server/group/registry.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// groupRegistry is a concurrent map of named groups with
|
||||
// automatic creation on first access.
|
||||
type groupRegistry[G any] struct {
|
||||
groups map[string]G
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newGroupRegistry[G any]() groupRegistry[G] {
|
||||
return groupRegistry[G]{
|
||||
groups: make(map[string]G),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *groupRegistry[G]) getOrCreate(key string, newFn func() G) G {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
g, ok := r.groups[key]
|
||||
if !ok {
|
||||
g = newFn()
|
||||
r.groups[key] = g
|
||||
}
|
||||
return g
|
||||
}
|
||||
|
||||
func (r *groupRegistry[G]) get(key string) (G, bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
g, ok := r.groups[key]
|
||||
return g, ok
|
||||
}
|
||||
|
||||
// isCurrent returns true if key exists in the registry and matchFn
|
||||
// returns true for the stored value.
|
||||
func (r *groupRegistry[G]) isCurrent(key string, matchFn func(G) bool) bool {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
g, ok := r.groups[key]
|
||||
return ok && matchFn(g)
|
||||
}
|
||||
|
||||
// removeIf atomically looks up the group for key, calls fn on it,
|
||||
// and removes the entry if fn returns true.
|
||||
func (r *groupRegistry[G]) removeIf(key string, fn func(G) bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
g, ok := r.groups[key]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if fn(g) {
|
||||
delete(r.groups, key)
|
||||
}
|
||||
}
|
||||
102
server/group/registry_test.go
Normal file
102
server/group/registry_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package group
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetOrCreate_New(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
called := 0
|
||||
v := 42
|
||||
got := r.getOrCreate("k", func() *int { called++; return &v })
|
||||
assert.Equal(t, 1, called)
|
||||
assert.Equal(t, &v, got)
|
||||
}
|
||||
|
||||
func TestGetOrCreate_Existing(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
v := 42
|
||||
r.getOrCreate("k", func() *int { return &v })
|
||||
|
||||
called := 0
|
||||
got := r.getOrCreate("k", func() *int { called++; return nil })
|
||||
assert.Equal(t, 0, called)
|
||||
assert.Equal(t, &v, got)
|
||||
}
|
||||
|
||||
func TestGet_ExistingAndMissing(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
v := 1
|
||||
r.getOrCreate("k", func() *int { return &v })
|
||||
|
||||
got, ok := r.get("k")
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, &v, got)
|
||||
|
||||
_, ok = r.get("missing")
|
||||
assert.False(t, ok)
|
||||
}
|
||||
|
||||
func TestIsCurrent(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
v1 := 1
|
||||
v2 := 2
|
||||
r.getOrCreate("k", func() *int { return &v1 })
|
||||
|
||||
assert.True(t, r.isCurrent("k", func(g *int) bool { return g == &v1 }))
|
||||
assert.False(t, r.isCurrent("k", func(g *int) bool { return g == &v2 }))
|
||||
assert.False(t, r.isCurrent("missing", func(g *int) bool { return true }))
|
||||
}
|
||||
|
||||
func TestRemoveIf(t *testing.T) {
|
||||
t.Run("removes when fn returns true", func(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
v := 1
|
||||
r.getOrCreate("k", func() *int { return &v })
|
||||
r.removeIf("k", func(g *int) bool { return g == &v })
|
||||
_, ok := r.get("k")
|
||||
assert.False(t, ok)
|
||||
})
|
||||
|
||||
t.Run("keeps when fn returns false", func(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
v := 1
|
||||
r.getOrCreate("k", func() *int { return &v })
|
||||
r.removeIf("k", func(g *int) bool { return false })
|
||||
_, ok := r.get("k")
|
||||
assert.True(t, ok)
|
||||
})
|
||||
|
||||
t.Run("noop on missing key", func(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
r.removeIf("missing", func(g *int) bool { return true }) // should not panic
|
||||
})
|
||||
}
|
||||
|
||||
func TestConcurrentGetOrCreateAndRemoveIf(t *testing.T) {
|
||||
r := newGroupRegistry[*int]()
|
||||
const n = 100
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n * 2)
|
||||
for i := range n {
|
||||
v := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
r.getOrCreate("k", func() *int { return &v })
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
r.removeIf("k", func(*int) bool { return true })
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// After all goroutines finish, accessing the key must not panic.
|
||||
require.NotPanics(t, func() {
|
||||
_, _ = r.get("k")
|
||||
})
|
||||
}
|
||||
@@ -17,83 +17,67 @@ package group
|
||||
import (
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
gerr "github.com/fatedier/golib/errors"
|
||||
|
||||
"github.com/fatedier/frp/server/ports"
|
||||
)
|
||||
|
||||
// TCPGroupCtl manage all TCPGroups
|
||||
// TCPGroupCtl manages all TCPGroups.
|
||||
type TCPGroupCtl struct {
|
||||
groups map[string]*TCPGroup
|
||||
|
||||
// portManager is used to manage port
|
||||
groupRegistry[*TCPGroup]
|
||||
portManager *ports.Manager
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewTCPGroupCtl return a new TcpGroupCtl
|
||||
// NewTCPGroupCtl returns a new TCPGroupCtl.
|
||||
func NewTCPGroupCtl(portManager *ports.Manager) *TCPGroupCtl {
|
||||
return &TCPGroupCtl{
|
||||
groups: make(map[string]*TCPGroup),
|
||||
portManager: portManager,
|
||||
groupRegistry: newGroupRegistry[*TCPGroup](),
|
||||
portManager: portManager,
|
||||
}
|
||||
}
|
||||
|
||||
// Listen is the wrapper for TCPGroup's Listen
|
||||
// If there are no group, we will create one here
|
||||
// Listen is the wrapper for TCPGroup's Listen.
|
||||
// If there is no group, one will be created.
|
||||
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string,
|
||||
addr string, port int,
|
||||
) (l net.Listener, realPort int, err error) {
|
||||
tgc.mu.Lock()
|
||||
tcpGroup, ok := tgc.groups[group]
|
||||
if !ok {
|
||||
tcpGroup = NewTCPGroup(tgc)
|
||||
tgc.groups[group] = tcpGroup
|
||||
for {
|
||||
tcpGroup := tgc.getOrCreate(group, func() *TCPGroup {
|
||||
return NewTCPGroup(tgc)
|
||||
})
|
||||
l, realPort, err = tcpGroup.Listen(proxyName, group, groupKey, addr, port)
|
||||
if err == errGroupStale {
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
tgc.mu.Unlock()
|
||||
|
||||
return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
|
||||
}
|
||||
|
||||
// RemoveGroup remove TCPGroup from controller
|
||||
func (tgc *TCPGroupCtl) RemoveGroup(group string) {
|
||||
tgc.mu.Lock()
|
||||
defer tgc.mu.Unlock()
|
||||
delete(tgc.groups, group)
|
||||
}
|
||||
|
||||
// TCPGroup route connections to different proxies
|
||||
// TCPGroup routes connections to different proxies.
|
||||
type TCPGroup struct {
|
||||
group string
|
||||
groupKey string
|
||||
baseGroup
|
||||
|
||||
addr string
|
||||
port int
|
||||
realPort int
|
||||
|
||||
acceptCh chan net.Conn
|
||||
tcpLn net.Listener
|
||||
lns []*TCPGroupListener
|
||||
ctl *TCPGroupCtl
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewTCPGroup return a new TCPGroup
|
||||
// NewTCPGroup returns a new TCPGroup.
|
||||
func NewTCPGroup(ctl *TCPGroupCtl) *TCPGroup {
|
||||
return &TCPGroup{
|
||||
lns: make([]*TCPGroupListener, 0),
|
||||
ctl: ctl,
|
||||
acceptCh: make(chan net.Conn),
|
||||
ctl: ctl,
|
||||
}
|
||||
}
|
||||
|
||||
// Listen will return a new TCPGroupListener
|
||||
// if TCPGroup already has a listener, just add a new TCPGroupListener to the queues
|
||||
// otherwise, listen on the real address
|
||||
func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr string, port int) (ln *TCPGroupListener, realPort int, err error) {
|
||||
// Listen will return a new Listener.
|
||||
// If TCPGroup already has a listener, just add a new Listener to the queues,
|
||||
// otherwise listen on the real address.
|
||||
func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr string, port int) (ln *Listener, realPort int, err error) {
|
||||
tg.mu.Lock()
|
||||
defer tg.mu.Unlock()
|
||||
if !tg.ctl.isCurrent(group, func(cur *TCPGroup) bool { return cur == tg }) {
|
||||
return nil, 0, errGroupStale
|
||||
}
|
||||
if len(tg.lns) == 0 {
|
||||
// the first listener, listen on the real address
|
||||
realPort, err = tg.ctl.portManager.Acquire(proxyName, port)
|
||||
@@ -106,19 +90,18 @@ func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr
|
||||
err = errRet
|
||||
return
|
||||
}
|
||||
ln = newTCPGroupListener(group, tg, tcpLn.Addr())
|
||||
|
||||
tg.group = group
|
||||
tg.groupKey = groupKey
|
||||
tg.addr = addr
|
||||
tg.port = port
|
||||
tg.realPort = realPort
|
||||
tg.tcpLn = tcpLn
|
||||
tg.lns = append(tg.lns, ln)
|
||||
if tg.acceptCh == nil {
|
||||
tg.acceptCh = make(chan net.Conn)
|
||||
}
|
||||
go tg.worker()
|
||||
tg.initBase(group, groupKey, tcpLn, func() {
|
||||
tg.ctl.portManager.Release(tg.realPort)
|
||||
tg.ctl.removeIf(tg.group, func(cur *TCPGroup) bool {
|
||||
return cur == tg
|
||||
})
|
||||
})
|
||||
ln = tg.newListener(tcpLn.Addr())
|
||||
go tg.worker(tcpLn, tg.acceptCh)
|
||||
} else {
|
||||
// address and port in the same group must be equal
|
||||
if tg.group != group || tg.addr != addr {
|
||||
@@ -133,92 +116,8 @@ func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr
|
||||
err = ErrGroupAuthFailed
|
||||
return
|
||||
}
|
||||
ln = newTCPGroupListener(group, tg, tg.lns[0].Addr())
|
||||
ln = tg.newListener(tg.lns[0].Addr())
|
||||
realPort = tg.realPort
|
||||
tg.lns = append(tg.lns, ln)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// worker is called when the real tcp listener has been created
|
||||
func (tg *TCPGroup) worker() {
|
||||
for {
|
||||
c, err := tg.tcpLn.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = gerr.PanicToError(func() {
|
||||
tg.acceptCh <- c
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tg *TCPGroup) Accept() <-chan net.Conn {
|
||||
return tg.acceptCh
|
||||
}
|
||||
|
||||
// CloseListener remove the TCPGroupListener from the TCPGroup
|
||||
func (tg *TCPGroup) CloseListener(ln *TCPGroupListener) {
|
||||
tg.mu.Lock()
|
||||
defer tg.mu.Unlock()
|
||||
for i, tmpLn := range tg.lns {
|
||||
if tmpLn == ln {
|
||||
tg.lns = append(tg.lns[:i], tg.lns[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(tg.lns) == 0 {
|
||||
close(tg.acceptCh)
|
||||
tg.tcpLn.Close()
|
||||
tg.ctl.portManager.Release(tg.realPort)
|
||||
tg.ctl.RemoveGroup(tg.group)
|
||||
}
|
||||
}
|
||||
|
||||
// TCPGroupListener
|
||||
type TCPGroupListener struct {
|
||||
groupName string
|
||||
group *TCPGroup
|
||||
|
||||
addr net.Addr
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
func newTCPGroupListener(name string, group *TCPGroup, addr net.Addr) *TCPGroupListener {
|
||||
return &TCPGroupListener{
|
||||
groupName: name,
|
||||
group: group,
|
||||
addr: addr,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Accept will accept connections from TCPGroup
|
||||
func (ln *TCPGroupListener) Accept() (c net.Conn, err error) {
|
||||
var ok bool
|
||||
select {
|
||||
case <-ln.closeCh:
|
||||
return nil, ErrListenerClosed
|
||||
case c, ok = <-ln.group.Accept():
|
||||
if !ok {
|
||||
return nil, ErrListenerClosed
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *TCPGroupListener) Addr() net.Addr {
|
||||
return ln.addr
|
||||
}
|
||||
|
||||
// Close close the listener
|
||||
func (ln *TCPGroupListener) Close() (err error) {
|
||||
close(ln.closeCh)
|
||||
|
||||
// remove self from TcpGroup
|
||||
ln.group.CloseListener(ln)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -18,118 +18,100 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
gerr "github.com/fatedier/golib/errors"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/util/tcpmux"
|
||||
"github.com/fatedier/frp/pkg/util/vhost"
|
||||
)
|
||||
|
||||
// TCPMuxGroupCtl manage all TCPMuxGroups
|
||||
// TCPMuxGroupCtl manages all TCPMuxGroups.
|
||||
type TCPMuxGroupCtl struct {
|
||||
groups map[string]*TCPMuxGroup
|
||||
|
||||
// portManager is used to manage port
|
||||
groupRegistry[*TCPMuxGroup]
|
||||
tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewTCPMuxGroupCtl return a new TCPMuxGroupCtl
|
||||
// NewTCPMuxGroupCtl returns a new TCPMuxGroupCtl.
|
||||
func NewTCPMuxGroupCtl(tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer) *TCPMuxGroupCtl {
|
||||
return &TCPMuxGroupCtl{
|
||||
groups: make(map[string]*TCPMuxGroup),
|
||||
groupRegistry: newGroupRegistry[*TCPMuxGroup](),
|
||||
tcpMuxHTTPConnectMuxer: tcpMuxHTTPConnectMuxer,
|
||||
}
|
||||
}
|
||||
|
||||
// Listen is the wrapper for TCPMuxGroup's Listen
|
||||
// If there are no group, we will create one here
|
||||
// Listen is the wrapper for TCPMuxGroup's Listen.
|
||||
// If there is no group, one will be created.
|
||||
func (tmgc *TCPMuxGroupCtl) Listen(
|
||||
ctx context.Context,
|
||||
multiplexer, group, groupKey string,
|
||||
routeConfig vhost.RouteConfig,
|
||||
) (l net.Listener, err error) {
|
||||
tmgc.mu.Lock()
|
||||
tcpMuxGroup, ok := tmgc.groups[group]
|
||||
if !ok {
|
||||
tcpMuxGroup = NewTCPMuxGroup(tmgc)
|
||||
tmgc.groups[group] = tcpMuxGroup
|
||||
}
|
||||
tmgc.mu.Unlock()
|
||||
for {
|
||||
tcpMuxGroup := tmgc.getOrCreate(group, func() *TCPMuxGroup {
|
||||
return NewTCPMuxGroup(tmgc)
|
||||
})
|
||||
|
||||
switch v1.TCPMultiplexerType(multiplexer) {
|
||||
case v1.TCPMultiplexerHTTPConnect:
|
||||
return tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, routeConfig)
|
||||
default:
|
||||
err = fmt.Errorf("unknown multiplexer [%s]", multiplexer)
|
||||
return
|
||||
switch v1.TCPMultiplexerType(multiplexer) {
|
||||
case v1.TCPMultiplexerHTTPConnect:
|
||||
l, err = tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, routeConfig)
|
||||
if err == errGroupStale {
|
||||
continue
|
||||
}
|
||||
return
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown multiplexer [%s]", multiplexer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveGroup remove TCPMuxGroup from controller
|
||||
func (tmgc *TCPMuxGroupCtl) RemoveGroup(group string) {
|
||||
tmgc.mu.Lock()
|
||||
defer tmgc.mu.Unlock()
|
||||
delete(tmgc.groups, group)
|
||||
}
|
||||
|
||||
// TCPMuxGroup route connections to different proxies
|
||||
// TCPMuxGroup routes connections to different proxies.
|
||||
type TCPMuxGroup struct {
|
||||
group string
|
||||
groupKey string
|
||||
baseGroup
|
||||
|
||||
domain string
|
||||
routeByHTTPUser string
|
||||
username string
|
||||
password string
|
||||
|
||||
acceptCh chan net.Conn
|
||||
tcpMuxLn net.Listener
|
||||
lns []*TCPMuxGroupListener
|
||||
ctl *TCPMuxGroupCtl
|
||||
mu sync.Mutex
|
||||
ctl *TCPMuxGroupCtl
|
||||
}
|
||||
|
||||
// NewTCPMuxGroup return a new TCPMuxGroup
|
||||
// NewTCPMuxGroup returns a new TCPMuxGroup.
|
||||
func NewTCPMuxGroup(ctl *TCPMuxGroupCtl) *TCPMuxGroup {
|
||||
return &TCPMuxGroup{
|
||||
lns: make([]*TCPMuxGroupListener, 0),
|
||||
ctl: ctl,
|
||||
acceptCh: make(chan net.Conn),
|
||||
ctl: ctl,
|
||||
}
|
||||
}
|
||||
|
||||
// Listen will return a new TCPMuxGroupListener
|
||||
// if TCPMuxGroup already has a listener, just add a new TCPMuxGroupListener to the queues
|
||||
// otherwise, listen on the real address
|
||||
// HTTPConnectListen will return a new Listener.
|
||||
// If TCPMuxGroup already has a listener, just add a new Listener to the queues,
|
||||
// otherwise listen on the real address.
|
||||
func (tmg *TCPMuxGroup) HTTPConnectListen(
|
||||
ctx context.Context,
|
||||
group, groupKey string,
|
||||
routeConfig vhost.RouteConfig,
|
||||
) (ln *TCPMuxGroupListener, err error) {
|
||||
) (ln *Listener, err error) {
|
||||
tmg.mu.Lock()
|
||||
defer tmg.mu.Unlock()
|
||||
if !tmg.ctl.isCurrent(group, func(cur *TCPMuxGroup) bool { return cur == tmg }) {
|
||||
return nil, errGroupStale
|
||||
}
|
||||
if len(tmg.lns) == 0 {
|
||||
// the first listener, listen on the real address
|
||||
tcpMuxLn, errRet := tmg.ctl.tcpMuxHTTPConnectMuxer.Listen(ctx, &routeConfig)
|
||||
if errRet != nil {
|
||||
return nil, errRet
|
||||
}
|
||||
ln = newTCPMuxGroupListener(group, tmg, tcpMuxLn.Addr())
|
||||
|
||||
tmg.group = group
|
||||
tmg.groupKey = groupKey
|
||||
tmg.domain = routeConfig.Domain
|
||||
tmg.routeByHTTPUser = routeConfig.RouteByHTTPUser
|
||||
tmg.username = routeConfig.Username
|
||||
tmg.password = routeConfig.Password
|
||||
tmg.tcpMuxLn = tcpMuxLn
|
||||
tmg.lns = append(tmg.lns, ln)
|
||||
if tmg.acceptCh == nil {
|
||||
tmg.acceptCh = make(chan net.Conn)
|
||||
}
|
||||
go tmg.worker()
|
||||
tmg.initBase(group, groupKey, tcpMuxLn, func() {
|
||||
tmg.ctl.removeIf(tmg.group, func(cur *TCPMuxGroup) bool {
|
||||
return cur == tmg
|
||||
})
|
||||
})
|
||||
ln = tmg.newListener(tcpMuxLn.Addr())
|
||||
go tmg.worker(tcpMuxLn, tmg.acceptCh)
|
||||
} else {
|
||||
// route config in the same group must be equal
|
||||
if tmg.group != group || tmg.domain != routeConfig.Domain ||
|
||||
@@ -141,90 +123,7 @@ func (tmg *TCPMuxGroup) HTTPConnectListen(
|
||||
if tmg.groupKey != groupKey {
|
||||
return nil, ErrGroupAuthFailed
|
||||
}
|
||||
ln = newTCPMuxGroupListener(group, tmg, tmg.lns[0].Addr())
|
||||
tmg.lns = append(tmg.lns, ln)
|
||||
ln = tmg.newListener(tmg.lns[0].Addr())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// worker is called when the real TCP listener has been created
|
||||
func (tmg *TCPMuxGroup) worker() {
|
||||
for {
|
||||
c, err := tmg.tcpMuxLn.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = gerr.PanicToError(func() {
|
||||
tmg.acceptCh <- c
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tmg *TCPMuxGroup) Accept() <-chan net.Conn {
|
||||
return tmg.acceptCh
|
||||
}
|
||||
|
||||
// CloseListener remove the TCPMuxGroupListener from the TCPMuxGroup
|
||||
func (tmg *TCPMuxGroup) CloseListener(ln *TCPMuxGroupListener) {
|
||||
tmg.mu.Lock()
|
||||
defer tmg.mu.Unlock()
|
||||
for i, tmpLn := range tmg.lns {
|
||||
if tmpLn == ln {
|
||||
tmg.lns = append(tmg.lns[:i], tmg.lns[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(tmg.lns) == 0 {
|
||||
close(tmg.acceptCh)
|
||||
tmg.tcpMuxLn.Close()
|
||||
tmg.ctl.RemoveGroup(tmg.group)
|
||||
}
|
||||
}
|
||||
|
||||
// TCPMuxGroupListener
|
||||
type TCPMuxGroupListener struct {
|
||||
groupName string
|
||||
group *TCPMuxGroup
|
||||
|
||||
addr net.Addr
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
func newTCPMuxGroupListener(name string, group *TCPMuxGroup, addr net.Addr) *TCPMuxGroupListener {
|
||||
return &TCPMuxGroupListener{
|
||||
groupName: name,
|
||||
group: group,
|
||||
addr: addr,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Accept will accept connections from TCPMuxGroup
|
||||
func (ln *TCPMuxGroupListener) Accept() (c net.Conn, err error) {
|
||||
var ok bool
|
||||
select {
|
||||
case <-ln.closeCh:
|
||||
return nil, ErrListenerClosed
|
||||
case c, ok = <-ln.group.Accept():
|
||||
if !ok {
|
||||
return nil, ErrListenerClosed
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ln *TCPMuxGroupListener) Addr() net.Addr {
|
||||
return ln.addr
|
||||
}
|
||||
|
||||
// Close close the listener
|
||||
func (ln *TCPMuxGroupListener) Close() (err error) {
|
||||
close(ln.closeCh)
|
||||
|
||||
// remove self from TcpMuxGroup
|
||||
ln.group.CloseListener(ln)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ var _ = ginkgo.Describe("[Feature: Example]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
|
||||
@@ -3,67 +3,70 @@ package framework
|
||||
import (
|
||||
"fmt"
|
||||
"maps"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
flog "github.com/fatedier/frp/pkg/util/log"
|
||||
"github.com/fatedier/frp/test/e2e/framework/consts"
|
||||
"github.com/fatedier/frp/test/e2e/pkg/process"
|
||||
)
|
||||
|
||||
// RunProcesses run multiple processes from templates.
|
||||
// The first template should always be frps.
|
||||
func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []string) ([]*process.Process, []*process.Process) {
|
||||
templates := slices.Concat(serverTemplates, clientTemplates)
|
||||
// RunProcesses starts one frps and zero or more frpc processes from templates.
|
||||
func (f *Framework) RunProcesses(serverTemplate string, clientTemplates []string) (*process.Process, []*process.Process) {
|
||||
templates := append([]string{serverTemplate}, clientTemplates...)
|
||||
outs, ports, err := f.RenderTemplates(templates)
|
||||
ExpectNoError(err)
|
||||
ExpectTrue(len(templates) > 0)
|
||||
|
||||
maps.Copy(f.usedPorts, ports)
|
||||
|
||||
currentServerProcesses := make([]*process.Process, 0, len(serverTemplates))
|
||||
for i := range serverTemplates {
|
||||
path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-server-%d", i))
|
||||
err = os.WriteFile(path, []byte(outs[i]), 0o600)
|
||||
ExpectNoError(err)
|
||||
// Start frps.
|
||||
serverPath := filepath.Join(f.TempDirectory, "frp-e2e-server-0")
|
||||
err = os.WriteFile(serverPath, []byte(outs[0]), 0o600)
|
||||
ExpectNoError(err)
|
||||
|
||||
if TestContext.Debug {
|
||||
flog.Debugf("[%s] %s", path, outs[i])
|
||||
}
|
||||
|
||||
p := process.NewWithEnvs(TestContext.FRPServerPath, []string{"-c", path}, f.osEnvs)
|
||||
f.serverConfPaths = append(f.serverConfPaths, path)
|
||||
f.serverProcesses = append(f.serverProcesses, p)
|
||||
currentServerProcesses = append(currentServerProcesses, p)
|
||||
err = p.Start()
|
||||
ExpectNoError(err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
if TestContext.Debug {
|
||||
flog.Debugf("[%s] %s", serverPath, outs[0])
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
currentClientProcesses := make([]*process.Process, 0, len(clientTemplates))
|
||||
serverProcess := process.NewWithEnvs(TestContext.FRPServerPath, []string{"-c", serverPath}, f.osEnvs)
|
||||
f.serverConfPaths = append(f.serverConfPaths, serverPath)
|
||||
f.serverProcesses = append(f.serverProcesses, serverProcess)
|
||||
err = serverProcess.Start()
|
||||
ExpectNoError(err)
|
||||
|
||||
if port, ok := ports[consts.PortServerName]; ok {
|
||||
ExpectNoError(WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(port)), 5*time.Second))
|
||||
} else {
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
// Start frpc(s).
|
||||
clientProcesses := make([]*process.Process, 0, len(clientTemplates))
|
||||
for i := range clientTemplates {
|
||||
index := i + len(serverTemplates)
|
||||
path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-client-%d", i))
|
||||
err = os.WriteFile(path, []byte(outs[index]), 0o600)
|
||||
err = os.WriteFile(path, []byte(outs[1+i]), 0o600)
|
||||
ExpectNoError(err)
|
||||
|
||||
if TestContext.Debug {
|
||||
flog.Debugf("[%s] %s", path, outs[index])
|
||||
flog.Debugf("[%s] %s", path, outs[1+i])
|
||||
}
|
||||
|
||||
p := process.NewWithEnvs(TestContext.FRPClientPath, []string{"-c", path}, f.osEnvs)
|
||||
f.clientConfPaths = append(f.clientConfPaths, path)
|
||||
f.clientProcesses = append(f.clientProcesses, p)
|
||||
currentClientProcesses = append(currentClientProcesses, p)
|
||||
clientProcesses = append(clientProcesses, p)
|
||||
err = p.Start()
|
||||
ExpectNoError(err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
// frpc needs time to connect and register proxies with frps.
|
||||
if len(clientProcesses) > 0 {
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
}
|
||||
|
||||
return currentServerProcesses, currentClientProcesses
|
||||
return serverProcess, clientProcesses
|
||||
}
|
||||
|
||||
func (f *Framework) RunFrps(args ...string) (*process.Process, string, error) {
|
||||
@@ -71,11 +74,13 @@ func (f *Framework) RunFrps(args ...string) (*process.Process, string, error) {
|
||||
f.serverProcesses = append(f.serverProcesses, p)
|
||||
err := p.Start()
|
||||
if err != nil {
|
||||
return p, p.StdOutput(), err
|
||||
return p, p.Output(), err
|
||||
}
|
||||
// Give frps extra time to finish binding ports before proceeding.
|
||||
time.Sleep(4 * time.Second)
|
||||
return p, p.StdOutput(), nil
|
||||
select {
|
||||
case <-p.Done():
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
return p, p.Output(), nil
|
||||
}
|
||||
|
||||
func (f *Framework) RunFrpc(args ...string) (*process.Process, string, error) {
|
||||
@@ -83,10 +88,13 @@ func (f *Framework) RunFrpc(args ...string) (*process.Process, string, error) {
|
||||
f.clientProcesses = append(f.clientProcesses, p)
|
||||
err := p.Start()
|
||||
if err != nil {
|
||||
return p, p.StdOutput(), err
|
||||
return p, p.Output(), err
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
return p, p.StdOutput(), nil
|
||||
select {
|
||||
case <-p.Done():
|
||||
case <-time.After(1500 * time.Millisecond):
|
||||
}
|
||||
return p, p.Output(), nil
|
||||
}
|
||||
|
||||
func (f *Framework) GenerateConfigFile(content string) string {
|
||||
@@ -96,3 +104,25 @@ func (f *Framework) GenerateConfigFile(content string) string {
|
||||
ExpectNoError(err)
|
||||
return path
|
||||
}
|
||||
|
||||
// WaitForTCPReady polls a TCP address until a connection succeeds or timeout.
|
||||
func WaitForTCPReady(addr string, timeout time.Duration) error {
|
||||
if timeout <= 0 {
|
||||
return fmt.Errorf("invalid timeout for TCP readiness on %s: timeout must be positive", addr)
|
||||
}
|
||||
deadline := time.Now().Add(timeout)
|
||||
var lastErr error
|
||||
for time.Now().Before(deadline) {
|
||||
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
if lastErr == nil {
|
||||
return fmt.Errorf("timeout waiting for TCP readiness on %s before any dial attempt", addr)
|
||||
}
|
||||
return fmt.Errorf("timeout waiting for TCP readiness on %s: %w", addr, lastErr)
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
framework.NewRequestExpect(f).
|
||||
@@ -152,7 +152,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
for domain := range strings.SplitSeq(test.customDomains, ",") {
|
||||
@@ -235,7 +235,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
@@ -419,7 +419,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
}
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
timeout := time.Second
|
||||
@@ -497,7 +497,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
}
|
||||
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
// Request without HTTP connect should get error
|
||||
framework.NewRequestExpect(f).
|
||||
|
||||
@@ -48,7 +48,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
|
||||
framework.TCPEchoServerPort, p2Port,
|
||||
framework.TCPEchoServerPort, p3Port)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(p1Port).Ensure()
|
||||
framework.NewRequestExpect(f).Port(p2Port).Ensure()
|
||||
@@ -90,7 +90,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
|
||||
admin_pwd = admin
|
||||
`, dashboardPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPPath("/healthz")
|
||||
@@ -116,7 +116,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
|
||||
remote_port = %d
|
||||
`, adminPort, framework.TCPEchoServerPort, testPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(testPort).Ensure()
|
||||
|
||||
|
||||
@@ -76,7 +76,7 @@ func runClientServerTest(f *framework.Framework, configures *generalTestConfigur
|
||||
clientConfs = append(clientConfs, client2Conf)
|
||||
}
|
||||
|
||||
f.RunProcesses([]string{serverConf}, clientConfs)
|
||||
f.RunProcesses(serverConf, clientConfs)
|
||||
|
||||
if configures.testDelay > 0 {
|
||||
time.Sleep(configures.testDelay)
|
||||
|
||||
@@ -33,7 +33,7 @@ var _ = ginkgo.Describe("[Feature: Config]", func() {
|
||||
`, "`", "`", framework.TCPEchoServerPort, portName)
|
||||
|
||||
f.SetEnvs([]string{"FRP_TOKEN=123"})
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).PortName(portName).Ensure()
|
||||
})
|
||||
|
||||
@@ -56,7 +56,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
locations = /bar
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tests := []struct {
|
||||
path string
|
||||
@@ -111,7 +111,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
custom_domains = normal.example.com
|
||||
`, fooPort, barPort, otherPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// user1
|
||||
framework.NewRequestExpect(f).Explain("user1").Port(vhostHTTPPort).
|
||||
@@ -152,7 +152,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
http_pwd = test
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not set auth header
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
@@ -188,7 +188,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
custom_domains = *.example.com
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not match host
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
@@ -238,7 +238,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
subdomain = bar
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// foo
|
||||
framework.NewRequestExpect(f).Explain("foo subdomain").Port(vhostHTTPPort).
|
||||
@@ -279,7 +279,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
header_X-From-Where = frp
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not set auth header
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
@@ -312,7 +312,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
host_header_rewrite = rewrite.example.com
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -360,7 +360,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
custom_domains = 127.0.0.1
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
u := url.URL{Scheme: "ws", Host: "127.0.0.1:" + strconv.Itoa(vhostHTTPPort)}
|
||||
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
||||
|
||||
@@ -58,7 +58,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
remote_port = 11003
|
||||
`, framework.UDPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// TCP
|
||||
// Allowed in range
|
||||
@@ -97,7 +97,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
local_port = {{ .%s }}
|
||||
`, adminPort, framework.TCPEchoServerPort, framework.UDPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
client := f.APIClientForFrpc(adminPort)
|
||||
|
||||
@@ -138,7 +138,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
custom_domains = example.com
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPHost("example.com")
|
||||
@@ -165,7 +165,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
custom_domains = example.com
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPPath("/healthz")
|
||||
|
||||
@@ -76,7 +76,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
|
||||
custom_domains = normal.example.com
|
||||
`, fooPort, barPort, otherPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// user1
|
||||
framework.NewRequestExpect(f).Explain("user1").
|
||||
@@ -121,7 +121,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
|
||||
http_pwd = test
|
||||
`, fooPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not set auth header
|
||||
framework.NewRequestExpect(f).Explain("no auth").
|
||||
@@ -204,7 +204,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
|
||||
custom_domains = normal.example.com
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).
|
||||
RequestModify(func(r *request.Request) {
|
||||
|
||||
@@ -41,7 +41,7 @@ var _ = ginkgo.Describe("[Feature: XTCP]", func() {
|
||||
fallback_timeout_ms = 200
|
||||
`, framework.TCPEchoServerPort, bindPortName)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
framework.NewRequestExpect(f).
|
||||
RequestModify(func(r *request.Request) {
|
||||
r.Timeout(time.Second)
|
||||
|
||||
@@ -35,7 +35,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
|
||||
bandwidth_limit = 10KB
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
content := strings.Repeat("a", 50*1024) // 5KB
|
||||
start := time.Now()
|
||||
@@ -89,7 +89,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
|
||||
remote_port = %d
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
content := strings.Repeat("a", 50*1024) // 5KB
|
||||
start := time.Now()
|
||||
|
||||
@@ -88,7 +88,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
group_key = 123
|
||||
`, fooPort, remotePort, barPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
fooCount := 0
|
||||
barCount := 0
|
||||
@@ -144,7 +144,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
health_check_interval_s = 1
|
||||
`, fooPort, remotePort, barPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// check foo and bar is ok
|
||||
results := []string{}
|
||||
@@ -213,7 +213,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
health_check_url = /healthz
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// send first HTTP request
|
||||
var contents []string
|
||||
|
||||
@@ -38,7 +38,7 @@ var _ = ginkgo.Describe("[Feature: Heartbeat]", func() {
|
||||
`, serverPort, f.PortByName(framework.TCPEchoServerPort), remotePort)
|
||||
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Protocol("tcp").Port(remotePort).Ensure()
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ var _ = ginkgo.Describe("[Feature: Monitor]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
@@ -44,7 +44,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
custom_domains = normal.example.com
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -90,7 +90,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
proxy_protocol_version = v2
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool {
|
||||
log.Tracef("proxy protocol get SourceAddr: %s", string(resp.Content))
|
||||
@@ -136,7 +136,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
proxy_protocol_version = v2
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPHost("normal.example.com")
|
||||
|
||||
@@ -70,7 +70,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
framework.NewRequestExpect(f).Port(f.PortByName(test.portName)).Ensure()
|
||||
@@ -92,7 +92,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
plugin_http_passwd = 123
|
||||
`, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// http proxy, no auth info
|
||||
framework.NewRequestExpect(f).PortName(framework.HTTPSimpleServerPort).RequestModify(func(r *request.Request) {
|
||||
@@ -124,7 +124,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
plugin_passwd = 123
|
||||
`, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// http proxy, no auth info
|
||||
framework.NewRequestExpect(f).PortName(framework.TCPEchoServerPort).RequestModify(func(r *request.Request) {
|
||||
@@ -168,7 +168,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
plugin_http_passwd = 123
|
||||
`, remotePort, f.TempDirectory, f.TempDirectory, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// from tcp proxy
|
||||
framework.NewRequestExpect(f).Request(
|
||||
@@ -202,7 +202,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
plugin_local_addr = 127.0.0.1:%d
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
@@ -246,7 +246,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
plugin_key_path = %s
|
||||
`, localPort, crtPath, keyPath)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
localServer := httpserver.New(
|
||||
httpserver.WithBindPort(localPort),
|
||||
@@ -290,7 +290,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
plugin_key_path = %s
|
||||
`, localPort, crtPath, keyPath)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
@@ -71,7 +71,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort2)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf, invalidTokenClientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf, invalidTokenClientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
framework.NewRequestExpect(f).Port(remotePort2).ExpectError(true).Ensure()
|
||||
@@ -119,7 +119,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
@@ -153,7 +153,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = 0
|
||||
`, framework.TCPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
@@ -195,7 +195,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
_, clients := f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
_, clients := f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -250,7 +250,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -297,7 +297,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -342,7 +342,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -389,7 +389,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remote_port = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
|
||||
@@ -3,7 +3,9 @@ package process
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"os/exec"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Process struct {
|
||||
@@ -12,6 +14,11 @@ type Process struct {
|
||||
errorOutput *bytes.Buffer
|
||||
stdOutput *bytes.Buffer
|
||||
|
||||
done chan struct{}
|
||||
closeOne sync.Once
|
||||
waitErr error
|
||||
|
||||
started bool
|
||||
beforeStopHandler func()
|
||||
stopped bool
|
||||
}
|
||||
@@ -27,6 +34,7 @@ func NewWithEnvs(path string, params []string, envs []string) *Process {
|
||||
p := &Process{
|
||||
cmd: cmd,
|
||||
cancel: cancel,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
p.errorOutput = bytes.NewBufferString("")
|
||||
p.stdOutput = bytes.NewBufferString("")
|
||||
@@ -36,11 +44,35 @@ func NewWithEnvs(path string, params []string, envs []string) *Process {
|
||||
}
|
||||
|
||||
func (p *Process) Start() error {
|
||||
return p.cmd.Start()
|
||||
if p.started {
|
||||
return errors.New("process already started")
|
||||
}
|
||||
p.started = true
|
||||
|
||||
err := p.cmd.Start()
|
||||
if err != nil {
|
||||
p.waitErr = err
|
||||
p.closeDone()
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
p.waitErr = p.cmd.Wait()
|
||||
p.closeDone()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Process) closeDone() {
|
||||
p.closeOne.Do(func() { close(p.done) })
|
||||
}
|
||||
|
||||
// Done returns a channel that is closed when the process exits.
|
||||
func (p *Process) Done() <-chan struct{} {
|
||||
return p.done
|
||||
}
|
||||
|
||||
func (p *Process) Stop() error {
|
||||
if p.stopped {
|
||||
if p.stopped || !p.started {
|
||||
return nil
|
||||
}
|
||||
defer func() {
|
||||
@@ -50,7 +82,8 @@ func (p *Process) Stop() error {
|
||||
p.beforeStopHandler()
|
||||
}
|
||||
p.cancel()
|
||||
return p.cmd.Wait()
|
||||
<-p.done
|
||||
return p.waitErr
|
||||
}
|
||||
|
||||
func (p *Process) ErrorOutput() string {
|
||||
@@ -61,6 +94,10 @@ func (p *Process) StdOutput() string {
|
||||
return p.stdOutput.String()
|
||||
}
|
||||
|
||||
func (p *Process) Output() string {
|
||||
return p.stdOutput.String() + p.errorOutput.String()
|
||||
}
|
||||
|
||||
func (p *Process) SetBeforeStopHandler(fn func()) {
|
||||
p.beforeStopHandler = fn
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ var _ = ginkgo.Describe("[Feature: Annotations]", func() {
|
||||
"frp.e2e.test/bar" = "value2"
|
||||
`, framework.TCPEchoServerPort, p1Port)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(p1Port).Ensure()
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
framework.NewRequestExpect(f).
|
||||
@@ -154,7 +154,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
for domain := range strings.SplitSeq(test.customDomains, ",") {
|
||||
@@ -240,7 +240,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, tests[i].customDomains, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
@@ -426,7 +426,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
}
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientServerConf.String(), clientVisitorConf.String(), clientUser2VisitorConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
timeout := time.Second
|
||||
@@ -505,7 +505,7 @@ var _ = ginkgo.Describe("[Feature: Basic]", func() {
|
||||
}
|
||||
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
// Request without HTTP connect should get error
|
||||
framework.NewRequestExpect(f).
|
||||
|
||||
@@ -51,7 +51,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
|
||||
framework.TCPEchoServerPort, p2Port,
|
||||
framework.TCPEchoServerPort, p3Port)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(p1Port).Ensure()
|
||||
framework.NewRequestExpect(f).Port(p2Port).Ensure()
|
||||
@@ -93,7 +93,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
|
||||
webServer.password = "admin"
|
||||
`, dashboardPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPPath("/healthz")
|
||||
@@ -120,7 +120,7 @@ var _ = ginkgo.Describe("[Feature: ClientManage]", func() {
|
||||
remotePort = %d
|
||||
`, adminPort, framework.TCPEchoServerPort, testPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(testPort).Ensure()
|
||||
|
||||
|
||||
@@ -78,7 +78,7 @@ func runClientServerTest(f *framework.Framework, configures *generalTestConfigur
|
||||
clientConfs = append(clientConfs, client2Conf)
|
||||
}
|
||||
|
||||
f.RunProcesses([]string{serverConf}, clientConfs)
|
||||
f.RunProcesses(serverConf, clientConfs)
|
||||
|
||||
if configures.testDelay > 0 {
|
||||
time.Sleep(configures.testDelay)
|
||||
|
||||
@@ -35,7 +35,7 @@ var _ = ginkgo.Describe("[Feature: Config]", func() {
|
||||
`, "`", "`", framework.TCPEchoServerPort, portName)
|
||||
|
||||
f.SetEnvs([]string{"FRP_TOKEN=123"})
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).PortName(portName).Ensure()
|
||||
})
|
||||
@@ -69,7 +69,7 @@ var _ = ginkgo.Describe("[Feature: Config]", func() {
|
||||
escapeTemplate("{{- end }}"),
|
||||
)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
client := f.APIClientForFrpc(adminPort)
|
||||
checkProxyFn := func(name string, localPort, remotePort int) {
|
||||
@@ -149,7 +149,7 @@ proxies:
|
||||
remotePort: %d
|
||||
`, port.GenName("Server"), framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
|
||||
@@ -161,7 +161,7 @@ proxies:
|
||||
"proxies": [{"name": "tcp", "type": "tcp", "localPort": {{ .%s }}, "remotePort": %d}]}`,
|
||||
port.GenName("Server"), framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
})
|
||||
|
||||
@@ -59,7 +59,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
locations = ["/bar"]
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tests := []struct {
|
||||
path string
|
||||
@@ -117,7 +117,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
customDomains = ["normal.example.com"]
|
||||
`, fooPort, barPort, otherPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// user1
|
||||
framework.NewRequestExpect(f).Explain("user1").Port(vhostHTTPPort).
|
||||
@@ -159,7 +159,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
httpPassword = "test"
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not set auth header
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
@@ -196,7 +196,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
customDomains = ["*.example.com"]
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not match host
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
@@ -248,7 +248,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
subdomain = "bar"
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// foo
|
||||
framework.NewRequestExpect(f).Explain("foo subdomain").Port(vhostHTTPPort).
|
||||
@@ -290,7 +290,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
requestHeaders.set.x-from-where = "frp"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -323,7 +323,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
responseHeaders.set.x-from-where = "frp"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -357,7 +357,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
hostHeaderRewrite = "rewrite.example.com"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -406,7 +406,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
customDomains = ["127.0.0.1"]
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
u := url.URL{Scheme: "ws", Host: "127.0.0.1:" + strconv.Itoa(vhostHTTPPort)}
|
||||
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
||||
@@ -447,7 +447,7 @@ var _ = ginkgo.Describe("[Feature: HTTP]", func() {
|
||||
customDomains = ["normal.example.com"]
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
|
||||
@@ -67,7 +67,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
remotePort = 11003
|
||||
`, framework.UDPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// TCP
|
||||
// Allowed in range
|
||||
@@ -108,7 +108,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
localPort = {{ .%s }}
|
||||
`, adminPort, framework.TCPEchoServerPort, framework.UDPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
client := f.APIClientForFrpc(adminPort)
|
||||
|
||||
@@ -150,7 +150,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
customDomains = ["example.com"]
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPHost("example.com")
|
||||
@@ -178,7 +178,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
customDomains = ["example.com"]
|
||||
`, framework.HTTPSimpleServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPPath("/healthz")
|
||||
|
||||
@@ -79,7 +79,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
|
||||
customDomains = ["normal.example.com"]
|
||||
`, fooPort, barPort, otherPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// user1
|
||||
framework.NewRequestExpect(f).Explain("user1").
|
||||
@@ -125,7 +125,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
|
||||
httpPassword = "test"
|
||||
`, fooPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// not set auth header
|
||||
framework.NewRequestExpect(f).Explain("no auth").
|
||||
@@ -209,7 +209,7 @@ var _ = ginkgo.Describe("[Feature: TCPMUX httpconnect]", func() {
|
||||
customDomains = ["normal.example.com"]
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).
|
||||
RequestModify(func(r *request.Request) {
|
||||
|
||||
@@ -16,8 +16,11 @@ package basic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
|
||||
@@ -73,7 +76,7 @@ localPort = {{ .%s }}
|
||||
remotePort = {{ .%s }}
|
||||
`, tokenContent, framework.TCPEchoServerPort, portName)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).PortName(portName).Ensure()
|
||||
})
|
||||
@@ -109,7 +112,7 @@ localPort = {{ .%s }}
|
||||
remotePort = {{ .%s }}
|
||||
`, tokenFile, framework.TCPEchoServerPort, portName)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).PortName(portName).Ensure()
|
||||
})
|
||||
@@ -150,7 +153,7 @@ localPort = {{ .%s }}
|
||||
remotePort = {{ .%s }}
|
||||
`, clientTokenFile, framework.TCPEchoServerPort, portName)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).PortName(portName).Ensure()
|
||||
})
|
||||
@@ -190,7 +193,7 @@ localPort = {{ .%s }}
|
||||
remotePort = {{ .%s }}
|
||||
`, clientTokenFile, framework.TCPEchoServerPort, portName)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// This should fail due to token mismatch - the client should not be able to connect
|
||||
// We expect the request to fail because the proxy tunnel is not established
|
||||
@@ -198,32 +201,27 @@ remotePort = {{ .%s }}
|
||||
})
|
||||
|
||||
ginkgo.It("should fail with non-existent token file", func() {
|
||||
// This test verifies that server fails to start when tokenSource points to non-existent file
|
||||
// We'll verify this by checking that the configuration loading itself fails
|
||||
|
||||
// Create a config that references a non-existent file
|
||||
tmpDir := f.TempDirectory
|
||||
nonExistentFile := filepath.Join(tmpDir, "non_existent_token")
|
||||
|
||||
serverConf := consts.DefaultServerConfig
|
||||
|
||||
// Server config with non-existent tokenSource file
|
||||
serverConf += fmt.Sprintf(`
|
||||
serverPort := f.AllocPort()
|
||||
serverConf := fmt.Sprintf(`
|
||||
bindAddr = "0.0.0.0"
|
||||
bindPort = %d
|
||||
auth.tokenSource.type = "file"
|
||||
auth.tokenSource.file.path = "%s"
|
||||
`, nonExistentFile)
|
||||
`, serverPort, nonExistentFile)
|
||||
|
||||
// The test expectation is that this will fail during the RunProcesses call
|
||||
// because the server cannot load the configuration due to missing token file
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Expected: server should fail to start due to missing file
|
||||
ginkgo.By(fmt.Sprintf("Server correctly failed to start: %v", r))
|
||||
}
|
||||
}()
|
||||
serverConfigPath := f.GenerateConfigFile(serverConf)
|
||||
|
||||
// This should cause a panic or error during server startup
|
||||
f.RunProcesses([]string{serverConf}, []string{})
|
||||
_, _, _ = f.RunFrps("-c", serverConfigPath)
|
||||
|
||||
// Server should have failed to start, so the port should not be listening.
|
||||
conn, err := net.DialTimeout("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(serverPort)), 1*time.Second)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
}
|
||||
framework.ExpectTrue(err != nil, "server should not be listening on port %d", serverPort)
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ var _ = ginkgo.Describe("[Feature: XTCP]", func() {
|
||||
fallbackTimeoutMs = 200
|
||||
`, framework.TCPEchoServerPort, bindPortName)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
framework.NewRequestExpect(f).
|
||||
RequestModify(func(r *request.Request) {
|
||||
r.Timeout(time.Second)
|
||||
|
||||
@@ -36,7 +36,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
|
||||
transport.bandwidthLimit = "10KB"
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
content := strings.Repeat("a", 50*1024) // 5KB
|
||||
start := time.Now()
|
||||
@@ -92,7 +92,7 @@ var _ = ginkgo.Describe("[Feature: Bandwidth Limit]", func() {
|
||||
remotePort = %d
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
content := strings.Repeat("a", 50*1024) // 5KB
|
||||
start := time.Now()
|
||||
|
||||
@@ -92,7 +92,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
loadBalancer.groupKey = "123"
|
||||
`, fooPort, remotePort, barPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
fooCount := 0
|
||||
barCount := 0
|
||||
@@ -157,7 +157,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
loadBalancer.groupKey = "123"
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
fooCount := 0
|
||||
barCount := 0
|
||||
@@ -186,6 +186,68 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
|
||||
framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
|
||||
})
|
||||
|
||||
ginkgo.It("TCPMux httpconnect", func() {
|
||||
vhostPort := f.AllocPort()
|
||||
serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
|
||||
tcpmuxHTTPConnectPort = %d
|
||||
`, vhostPort)
|
||||
clientConf := consts.DefaultClientConfig
|
||||
|
||||
fooPort := f.AllocPort()
|
||||
fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
|
||||
f.RunServer("", fooServer)
|
||||
|
||||
barPort := f.AllocPort()
|
||||
barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
|
||||
f.RunServer("", barServer)
|
||||
|
||||
clientConf += fmt.Sprintf(`
|
||||
[[proxies]]
|
||||
name = "foo"
|
||||
type = "tcpmux"
|
||||
multiplexer = "httpconnect"
|
||||
localPort = %d
|
||||
customDomains = ["tcpmux-group.example.com"]
|
||||
loadBalancer.group = "test"
|
||||
loadBalancer.groupKey = "123"
|
||||
|
||||
[[proxies]]
|
||||
name = "bar"
|
||||
type = "tcpmux"
|
||||
multiplexer = "httpconnect"
|
||||
localPort = %d
|
||||
customDomains = ["tcpmux-group.example.com"]
|
||||
loadBalancer.group = "test"
|
||||
loadBalancer.groupKey = "123"
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
proxyURL := fmt.Sprintf("http://127.0.0.1:%d", vhostPort)
|
||||
fooCount := 0
|
||||
barCount := 0
|
||||
for i := range 10 {
|
||||
framework.NewRequestExpect(f).
|
||||
Explain("times " + strconv.Itoa(i)).
|
||||
RequestModify(func(r *request.Request) {
|
||||
r.Addr("tcpmux-group.example.com").Proxy(proxyURL)
|
||||
}).
|
||||
Ensure(func(resp *request.Response) bool {
|
||||
switch string(resp.Content) {
|
||||
case "foo":
|
||||
fooCount++
|
||||
case "bar":
|
||||
barCount++
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.Describe("Health Check", func() {
|
||||
@@ -224,7 +286,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
healthCheck.intervalSeconds = 1
|
||||
`, fooPort, remotePort, barPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// check foo and bar is ok
|
||||
results := []string{}
|
||||
@@ -295,7 +357,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
|
||||
healthCheck.path = "/healthz"
|
||||
`, fooPort, barPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// send first HTTP request
|
||||
var contents []string
|
||||
|
||||
@@ -37,7 +37,7 @@ var _ = ginkgo.Describe("[Feature: Heartbeat]", func() {
|
||||
`, serverPort, f.PortByName(framework.TCPEchoServerPort), remotePort)
|
||||
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Protocol("tcp").Port(remotePort).Ensure()
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ var _ = ginkgo.Describe("[Feature: Monitor]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
@@ -48,7 +48,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
customDomains = ["normal.example.com"]
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -82,7 +82,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
customDomains = ["normal.example.com"]
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).
|
||||
RequestModify(func(r *request.Request) {
|
||||
@@ -112,7 +112,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
localAddr = "127.0.0.1:%d"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
@@ -154,7 +154,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
localAddr = "127.0.0.1:%d"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
localServer := httpserver.New(
|
||||
httpserver.WithBindPort(localPort),
|
||||
@@ -212,7 +212,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
transport.proxyProtocolVersion = "v2"
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure(func(resp *request.Response) bool {
|
||||
log.Tracef("proxy protocol get SourceAddr: %s", string(resp.Content))
|
||||
@@ -262,7 +262,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
transport.proxyProtocolVersion = "v2"
|
||||
`, localPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Protocol("udp").Port(remotePort).Ensure(func(resp *request.Response) bool {
|
||||
log.Tracef("udp proxy protocol get SourceAddr: %s", string(resp.Content))
|
||||
@@ -309,7 +309,7 @@ var _ = ginkgo.Describe("[Feature: Real IP]", func() {
|
||||
transport.proxyProtocolVersion = "v2"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(vhostHTTPPort).RequestModify(func(r *request.Request) {
|
||||
r.HTTP().HTTPHost("normal.example.com")
|
||||
|
||||
@@ -3,6 +3,8 @@ package features
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
@@ -25,7 +27,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
|
||||
sshTunnelGateway.bindPort = %d
|
||||
`, sshPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, nil)
|
||||
f.RunProcesses(serverConf, nil)
|
||||
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
|
||||
|
||||
localPort := f.PortByName(framework.TCPEchoServerPort)
|
||||
remotePort := f.AllocPort()
|
||||
@@ -49,7 +52,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
|
||||
sshTunnelGateway.bindPort = %d
|
||||
`, vhostPort, sshPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, nil)
|
||||
f.RunProcesses(serverConf, nil)
|
||||
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
|
||||
|
||||
localPort := f.PortByName(framework.HTTPSimpleServerPort)
|
||||
tc := ssh.NewTunnelClient(
|
||||
@@ -76,7 +80,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
|
||||
sshTunnelGateway.bindPort = %d
|
||||
`, vhostPort, sshPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, nil)
|
||||
f.RunProcesses(serverConf, nil)
|
||||
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
|
||||
|
||||
localPort := f.AllocPort()
|
||||
testDomain := "test.example.com"
|
||||
@@ -118,7 +123,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
|
||||
sshTunnelGateway.bindPort = %d
|
||||
`, tcpmuxPort, sshPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, nil)
|
||||
f.RunProcesses(serverConf, nil)
|
||||
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
|
||||
|
||||
localPort := f.AllocPort()
|
||||
testDomain := "test.example.com"
|
||||
@@ -173,7 +179,8 @@ var _ = ginkgo.Describe("[Feature: SSH Tunnel]", func() {
|
||||
bindPort = %d
|
||||
`, bindPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{visitorConf})
|
||||
f.RunProcesses(serverConf, []string{visitorConf})
|
||||
framework.ExpectNoError(framework.WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(sshPort)), 5*time.Second))
|
||||
|
||||
localPort := f.PortByName(framework.TCPEchoServerPort)
|
||||
tc := ssh.NewTunnelClient(
|
||||
|
||||
@@ -30,7 +30,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
path = "%s/store.json"
|
||||
`, adminPort, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
proxyConfig := map[string]any{
|
||||
@@ -71,7 +71,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
path = "%s/store.json"
|
||||
`, adminPort, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
proxyConfig := map[string]any{
|
||||
@@ -125,7 +125,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
path = "%s/store.json"
|
||||
`, adminPort, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
proxyConfig := map[string]any{
|
||||
@@ -173,7 +173,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
path = "%s/store.json"
|
||||
`, adminPort, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
proxyConfig := map[string]any{
|
||||
@@ -225,7 +225,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
webServer.port = %d
|
||||
`, adminPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
framework.NewRequestExpect(f).RequestModify(func(r *request.Request) {
|
||||
@@ -247,7 +247,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
path = "%s/store.json"
|
||||
`, adminPort, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
invalidBody, _ := json.Marshal(map[string]any{
|
||||
@@ -280,7 +280,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() {
|
||||
path = "%s/store.json"
|
||||
`, adminPort, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
createBody, _ := json.Marshal(map[string]any{
|
||||
|
||||
@@ -74,7 +74,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
clientConf.WriteString(getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n")
|
||||
}
|
||||
// run frps and frpc
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf.String()})
|
||||
f.RunProcesses(serverConf, []string{clientConf.String()})
|
||||
|
||||
for _, test := range tests {
|
||||
framework.NewRequestExpect(f).Port(f.PortByName(test.portName)).Ensure()
|
||||
@@ -98,7 +98,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
httpPassword = "123"
|
||||
`, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// http proxy, no auth info
|
||||
framework.NewRequestExpect(f).PortName(framework.HTTPSimpleServerPort).RequestModify(func(r *request.Request) {
|
||||
@@ -132,7 +132,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
password = "123"
|
||||
`, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// http proxy, no auth info
|
||||
framework.NewRequestExpect(f).PortName(framework.TCPEchoServerPort).RequestModify(func(r *request.Request) {
|
||||
@@ -182,7 +182,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
httpPassword = "123"
|
||||
`, remotePort, f.TempDirectory, f.TempDirectory, f.TempDirectory)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
// from tcp proxy
|
||||
framework.NewRequestExpect(f).Request(
|
||||
@@ -218,7 +218,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
localAddr = "127.0.0.1:%d"
|
||||
`, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
@@ -264,7 +264,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
keyPath = "%s"
|
||||
`, localPort, crtPath, keyPath)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
localServer := httpserver.New(
|
||||
httpserver.WithBindPort(localPort),
|
||||
@@ -310,7 +310,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
keyPath = "%s"
|
||||
`, localPort, crtPath, keyPath)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
tlsConfig, err := transport.NewServerTLSConfig("", "", "")
|
||||
framework.ExpectNoError(err)
|
||||
@@ -350,7 +350,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
hostHeaderRewrite = "rewrite.test.com"
|
||||
`, remotePort, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
localServer := httpserver.New(
|
||||
httpserver.WithBindPort(localPort),
|
||||
@@ -385,7 +385,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
requestHeaders.set.x-from-where = "frp"
|
||||
`, remotePort, localPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
localServer := httpserver.New(
|
||||
httpserver.WithBindPort(localPort),
|
||||
@@ -431,7 +431,7 @@ var _ = ginkgo.Describe("[Feature: Client-Plugins]", func() {
|
||||
keyPath = "%s"
|
||||
`, localPort, crtPath, keyPath)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
localServer := httpserver.New(
|
||||
httpserver.WithBindPort(localPort),
|
||||
|
||||
@@ -74,7 +74,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort2)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf, invalidTokenClientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf, invalidTokenClientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
framework.NewRequestExpect(f).Port(remotePort2).ExpectError(true).Ensure()
|
||||
@@ -124,7 +124,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
@@ -160,7 +160,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = 0
|
||||
`, framework.TCPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
})
|
||||
@@ -204,7 +204,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
_, clients := f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
_, clients := f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -261,7 +261,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -310,7 +310,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -357,7 +357,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
@@ -406,7 +406,7 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
|
||||
remotePort = %d
|
||||
`, framework.TCPEchoServerPort, remotePort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
f.RunProcesses(serverConf, []string{clientConf})
|
||||
|
||||
framework.NewRequestExpect(f).Port(remotePort).Ensure()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user