Compare commits

..

2 Commits
dev ... new

Author SHA1 Message Date
fatedier
01413c3853 test/e2e: use shared deadline for proxy readiness and fix doc comment
- Use a single deadline in waitForClientProxyReady so total wait across
  all proxies does not exceed the given timeout
- Fix WaitForOutput doc comment to accurately describe single pattern
  with count semantics
2026-03-09 18:45:12 +08:00
fatedier
adcd2e64b6 test/e2e: replace RunProcesses client sleep with log-based proxy readiness detection
Replace the fixed 1500ms sleep in RunProcesses with event-driven proxy
registration detection by monitoring frpc log output for "start proxy
success" messages.

Key changes:
- Add thread-safe SafeBuffer to replace bytes.Buffer in Process, enabling
  concurrent read/write of process output during execution
- Add Process.WaitForOutput() to poll process output for pattern matches
  with timeout and early exit on process termination
- Add waitForClientProxyReady() that uses config.LoadClientConfig() to
  extract proxy names, then waits for each proxy's success log
- For visitor-only clients (no deterministic readiness signal), fall back
  to the original sleep with elapsed time deducted
2026-03-09 13:46:13 +08:00
2 changed files with 91 additions and 7 deletions

View File

@@ -9,6 +9,7 @@ import (
"strconv"
"time"
"github.com/fatedier/frp/pkg/config"
flog "github.com/fatedier/frp/pkg/util/log"
"github.com/fatedier/frp/test/e2e/framework/consts"
"github.com/fatedier/frp/test/e2e/pkg/process"
@@ -61,9 +62,22 @@ func (f *Framework) RunProcesses(serverTemplate string, clientTemplates []string
err = p.Start()
ExpectNoError(err)
}
// frpc needs time to connect and register proxies with frps.
if len(clientProcesses) > 0 {
time.Sleep(1500 * time.Millisecond)
// Wait for each client's proxies to register with frps.
// If any client has no proxies (e.g. visitor-only), fall back to sleep
// for the remaining time since visitors have no deterministic readiness signal.
allConfirmed := len(clientProcesses) > 0
start := time.Now()
for i, p := range clientProcesses {
configPath := f.clientConfPaths[len(f.clientConfPaths)-len(clientProcesses)+i]
if !waitForClientProxyReady(configPath, p, 5*time.Second) {
allConfirmed = false
}
}
if len(clientProcesses) > 0 && !allConfirmed {
remaining := 1500*time.Millisecond - time.Since(start)
if remaining > 0 {
time.Sleep(remaining)
}
}
return serverProcess, clientProcesses
@@ -105,6 +119,31 @@ func (f *Framework) GenerateConfigFile(content string) string {
return path
}
// waitForClientProxyReady parses the client config to extract proxy names,
// then waits for each proxy's "start proxy success" log in the process output.
// Returns true only if proxies were expected and all registered successfully.
func waitForClientProxyReady(configPath string, p *process.Process, timeout time.Duration) bool {
_, proxyCfgs, _, _, err := config.LoadClientConfig(configPath, false)
if err != nil || len(proxyCfgs) == 0 {
return false
}
// Use a single deadline so the total wait across all proxies does not exceed timeout.
deadline := time.Now().Add(timeout)
for _, cfg := range proxyCfgs {
remaining := time.Until(deadline)
if remaining <= 0 {
return false
}
name := cfg.GetBaseConfig().Name
pattern := fmt.Sprintf("[%s] start proxy success", name)
if err := p.WaitForOutput(pattern, 1, remaining); err != nil {
return false
}
}
return true
}
// WaitForTCPReady polls a TCP address until a connection succeeds or timeout.
func WaitForTCPReady(addr string, timeout time.Duration) error {
if timeout <= 0 {

View File

@@ -4,15 +4,37 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os/exec"
"strings"
"sync"
"time"
)
// SafeBuffer is a thread-safe wrapper around bytes.Buffer.
// It is safe to call Write and String concurrently.
type SafeBuffer struct {
mu sync.Mutex
buf bytes.Buffer
}
func (b *SafeBuffer) Write(p []byte) (int, error) {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.Write(p)
}
func (b *SafeBuffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}
type Process struct {
cmd *exec.Cmd
cancel context.CancelFunc
errorOutput *bytes.Buffer
stdOutput *bytes.Buffer
errorOutput *SafeBuffer
stdOutput *SafeBuffer
done chan struct{}
closeOne sync.Once
@@ -36,8 +58,8 @@ func NewWithEnvs(path string, params []string, envs []string) *Process {
cancel: cancel,
done: make(chan struct{}),
}
p.errorOutput = bytes.NewBufferString("")
p.stdOutput = bytes.NewBufferString("")
p.errorOutput = &SafeBuffer{}
p.stdOutput = &SafeBuffer{}
cmd.Stderr = p.errorOutput
cmd.Stdout = p.stdOutput
return p
@@ -101,3 +123,26 @@ func (p *Process) Output() string {
func (p *Process) SetBeforeStopHandler(fn func()) {
p.beforeStopHandler = fn
}
// WaitForOutput polls the combined process output until the pattern is found
// count time(s) or the timeout is reached. It also returns early if the process exits.
func (p *Process) WaitForOutput(pattern string, count int, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
output := p.Output()
if strings.Count(output, pattern) >= count {
return nil
}
select {
case <-p.Done():
// Process exited, check one last time.
output = p.Output()
if strings.Count(output, pattern) >= count {
return nil
}
return fmt.Errorf("process exited before %d occurrence(s) of %q found", count, pattern)
case <-time.After(25 * time.Millisecond):
}
}
return fmt.Errorf("timeout waiting for %d occurrence(s) of %q", count, pattern)
}