mirror of
https://github.com/fatedier/frp.git
synced 2026-03-10 20:09:10 +08:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01413c3853 | ||
|
|
adcd2e64b6 |
@@ -9,6 +9,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fatedier/frp/pkg/config"
|
||||||
flog "github.com/fatedier/frp/pkg/util/log"
|
flog "github.com/fatedier/frp/pkg/util/log"
|
||||||
"github.com/fatedier/frp/test/e2e/framework/consts"
|
"github.com/fatedier/frp/test/e2e/framework/consts"
|
||||||
"github.com/fatedier/frp/test/e2e/pkg/process"
|
"github.com/fatedier/frp/test/e2e/pkg/process"
|
||||||
@@ -61,9 +62,22 @@ func (f *Framework) RunProcesses(serverTemplate string, clientTemplates []string
|
|||||||
err = p.Start()
|
err = p.Start()
|
||||||
ExpectNoError(err)
|
ExpectNoError(err)
|
||||||
}
|
}
|
||||||
// frpc needs time to connect and register proxies with frps.
|
// Wait for each client's proxies to register with frps.
|
||||||
if len(clientProcesses) > 0 {
|
// If any client has no proxies (e.g. visitor-only), fall back to sleep
|
||||||
time.Sleep(1500 * time.Millisecond)
|
// for the remaining time since visitors have no deterministic readiness signal.
|
||||||
|
allConfirmed := len(clientProcesses) > 0
|
||||||
|
start := time.Now()
|
||||||
|
for i, p := range clientProcesses {
|
||||||
|
configPath := f.clientConfPaths[len(f.clientConfPaths)-len(clientProcesses)+i]
|
||||||
|
if !waitForClientProxyReady(configPath, p, 5*time.Second) {
|
||||||
|
allConfirmed = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(clientProcesses) > 0 && !allConfirmed {
|
||||||
|
remaining := 1500*time.Millisecond - time.Since(start)
|
||||||
|
if remaining > 0 {
|
||||||
|
time.Sleep(remaining)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return serverProcess, clientProcesses
|
return serverProcess, clientProcesses
|
||||||
@@ -105,6 +119,31 @@ func (f *Framework) GenerateConfigFile(content string) string {
|
|||||||
return path
|
return path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitForClientProxyReady parses the client config to extract proxy names,
|
||||||
|
// then waits for each proxy's "start proxy success" log in the process output.
|
||||||
|
// Returns true only if proxies were expected and all registered successfully.
|
||||||
|
func waitForClientProxyReady(configPath string, p *process.Process, timeout time.Duration) bool {
|
||||||
|
_, proxyCfgs, _, _, err := config.LoadClientConfig(configPath, false)
|
||||||
|
if err != nil || len(proxyCfgs) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a single deadline so the total wait across all proxies does not exceed timeout.
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
for _, cfg := range proxyCfgs {
|
||||||
|
remaining := time.Until(deadline)
|
||||||
|
if remaining <= 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
name := cfg.GetBaseConfig().Name
|
||||||
|
pattern := fmt.Sprintf("[%s] start proxy success", name)
|
||||||
|
if err := p.WaitForOutput(pattern, 1, remaining); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// WaitForTCPReady polls a TCP address until a connection succeeds or timeout.
|
// WaitForTCPReady polls a TCP address until a connection succeeds or timeout.
|
||||||
func WaitForTCPReady(addr string, timeout time.Duration) error {
|
func WaitForTCPReady(addr string, timeout time.Duration) error {
|
||||||
if timeout <= 0 {
|
if timeout <= 0 {
|
||||||
|
|||||||
@@ -4,15 +4,37 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// SafeBuffer is a thread-safe wrapper around bytes.Buffer.
|
||||||
|
// It is safe to call Write and String concurrently.
|
||||||
|
type SafeBuffer struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
buf bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *SafeBuffer) Write(p []byte) (int, error) {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
return b.buf.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *SafeBuffer) String() string {
|
||||||
|
b.mu.Lock()
|
||||||
|
defer b.mu.Unlock()
|
||||||
|
return b.buf.String()
|
||||||
|
}
|
||||||
|
|
||||||
type Process struct {
|
type Process struct {
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
errorOutput *bytes.Buffer
|
errorOutput *SafeBuffer
|
||||||
stdOutput *bytes.Buffer
|
stdOutput *SafeBuffer
|
||||||
|
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
closeOne sync.Once
|
closeOne sync.Once
|
||||||
@@ -36,8 +58,8 @@ func NewWithEnvs(path string, params []string, envs []string) *Process {
|
|||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
p.errorOutput = bytes.NewBufferString("")
|
p.errorOutput = &SafeBuffer{}
|
||||||
p.stdOutput = bytes.NewBufferString("")
|
p.stdOutput = &SafeBuffer{}
|
||||||
cmd.Stderr = p.errorOutput
|
cmd.Stderr = p.errorOutput
|
||||||
cmd.Stdout = p.stdOutput
|
cmd.Stdout = p.stdOutput
|
||||||
return p
|
return p
|
||||||
@@ -101,3 +123,26 @@ func (p *Process) Output() string {
|
|||||||
func (p *Process) SetBeforeStopHandler(fn func()) {
|
func (p *Process) SetBeforeStopHandler(fn func()) {
|
||||||
p.beforeStopHandler = fn
|
p.beforeStopHandler = fn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForOutput polls the combined process output until the pattern is found
|
||||||
|
// count time(s) or the timeout is reached. It also returns early if the process exits.
|
||||||
|
func (p *Process) WaitForOutput(pattern string, count int, timeout time.Duration) error {
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
output := p.Output()
|
||||||
|
if strings.Count(output, pattern) >= count {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-p.Done():
|
||||||
|
// Process exited, check one last time.
|
||||||
|
output = p.Output()
|
||||||
|
if strings.Count(output, pattern) >= count {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("process exited before %d occurrence(s) of %q found", count, pattern)
|
||||||
|
case <-time.After(25 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("timeout waiting for %d occurrence(s) of %q", count, pattern)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user