mirror of
https://github.com/fatedier/frp.git
synced 2026-03-12 12:59:24 +08:00
* test/e2e: replace sleeps with event-driven waits in chaos/group/store tests Replace 21 time.Sleep calls with deterministic waiting using WaitForOutput, WaitForTCPReady, and a new WaitForTCPUnreachable helper. Add CountOutput method for snapshot-based incremental log matching. * test/e2e: validate interval and cap dial/sleep to remaining deadline in WaitForTCPUnreachable
192 lines
5.9 KiB
Go
192 lines
5.9 KiB
Go
package framework
|
|
|
|
import (
|
|
"fmt"
|
|
"maps"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/fatedier/frp/pkg/config"
|
|
flog "github.com/fatedier/frp/pkg/util/log"
|
|
"github.com/fatedier/frp/test/e2e/framework/consts"
|
|
"github.com/fatedier/frp/test/e2e/pkg/process"
|
|
)
|
|
|
|
// RunProcesses starts one frps and zero or more frpc processes from templates.
|
|
func (f *Framework) RunProcesses(serverTemplate string, clientTemplates []string) (*process.Process, []*process.Process) {
|
|
templates := append([]string{serverTemplate}, clientTemplates...)
|
|
outs, ports, err := f.RenderTemplates(templates)
|
|
ExpectNoError(err)
|
|
|
|
maps.Copy(f.usedPorts, ports)
|
|
|
|
// Start frps.
|
|
serverPath := filepath.Join(f.TempDirectory, "frp-e2e-server-0")
|
|
err = os.WriteFile(serverPath, []byte(outs[0]), 0o600)
|
|
ExpectNoError(err)
|
|
|
|
if TestContext.Debug {
|
|
flog.Debugf("[%s] %s", serverPath, outs[0])
|
|
}
|
|
|
|
serverProcess := process.NewWithEnvs(TestContext.FRPServerPath, []string{"-c", serverPath}, f.osEnvs)
|
|
f.serverConfPaths = append(f.serverConfPaths, serverPath)
|
|
f.serverProcesses = append(f.serverProcesses, serverProcess)
|
|
err = serverProcess.Start()
|
|
ExpectNoError(err)
|
|
|
|
if port, ok := ports[consts.PortServerName]; ok {
|
|
ExpectNoError(WaitForTCPReady(net.JoinHostPort("127.0.0.1", strconv.Itoa(port)), 5*time.Second))
|
|
} else {
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
|
|
// Start frpc(s).
|
|
clientProcesses := make([]*process.Process, 0, len(clientTemplates))
|
|
for i := range clientTemplates {
|
|
path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-client-%d", i))
|
|
err = os.WriteFile(path, []byte(outs[1+i]), 0o600)
|
|
ExpectNoError(err)
|
|
|
|
if TestContext.Debug {
|
|
flog.Debugf("[%s] %s", path, outs[1+i])
|
|
}
|
|
|
|
p := process.NewWithEnvs(TestContext.FRPClientPath, []string{"-c", path}, f.osEnvs)
|
|
f.clientConfPaths = append(f.clientConfPaths, path)
|
|
f.clientProcesses = append(f.clientProcesses, p)
|
|
clientProcesses = append(clientProcesses, p)
|
|
err = p.Start()
|
|
ExpectNoError(err)
|
|
}
|
|
// Wait for each client's proxies to register with frps.
|
|
// If any client has no proxies (e.g. visitor-only), fall back to sleep
|
|
// for the remaining time since visitors have no deterministic readiness signal.
|
|
allConfirmed := len(clientProcesses) > 0
|
|
start := time.Now()
|
|
for i, p := range clientProcesses {
|
|
configPath := f.clientConfPaths[len(f.clientConfPaths)-len(clientProcesses)+i]
|
|
if !waitForClientProxyReady(configPath, p, 5*time.Second) {
|
|
allConfirmed = false
|
|
}
|
|
}
|
|
if len(clientProcesses) > 0 && !allConfirmed {
|
|
remaining := 1500*time.Millisecond - time.Since(start)
|
|
if remaining > 0 {
|
|
time.Sleep(remaining)
|
|
}
|
|
}
|
|
|
|
return serverProcess, clientProcesses
|
|
}
|
|
|
|
func (f *Framework) RunFrps(args ...string) (*process.Process, string, error) {
|
|
p := process.NewWithEnvs(TestContext.FRPServerPath, args, f.osEnvs)
|
|
f.serverProcesses = append(f.serverProcesses, p)
|
|
err := p.Start()
|
|
if err != nil {
|
|
return p, p.Output(), err
|
|
}
|
|
select {
|
|
case <-p.Done():
|
|
case <-time.After(2 * time.Second):
|
|
}
|
|
return p, p.Output(), nil
|
|
}
|
|
|
|
func (f *Framework) RunFrpc(args ...string) (*process.Process, string, error) {
|
|
p := process.NewWithEnvs(TestContext.FRPClientPath, args, f.osEnvs)
|
|
f.clientProcesses = append(f.clientProcesses, p)
|
|
err := p.Start()
|
|
if err != nil {
|
|
return p, p.Output(), err
|
|
}
|
|
select {
|
|
case <-p.Done():
|
|
case <-time.After(1500 * time.Millisecond):
|
|
}
|
|
return p, p.Output(), nil
|
|
}
|
|
|
|
func (f *Framework) GenerateConfigFile(content string) string {
|
|
f.configFileIndex++
|
|
path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-config-%d", f.configFileIndex))
|
|
err := os.WriteFile(path, []byte(content), 0o600)
|
|
ExpectNoError(err)
|
|
return path
|
|
}
|
|
|
|
// waitForClientProxyReady parses the client config to extract proxy names,
|
|
// then waits for each proxy's "start proxy success" log in the process output.
|
|
// Returns true only if proxies were expected and all registered successfully.
|
|
func waitForClientProxyReady(configPath string, p *process.Process, timeout time.Duration) bool {
|
|
_, proxyCfgs, _, _, err := config.LoadClientConfig(configPath, false)
|
|
if err != nil || len(proxyCfgs) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Use a single deadline so the total wait across all proxies does not exceed timeout.
|
|
deadline := time.Now().Add(timeout)
|
|
for _, cfg := range proxyCfgs {
|
|
remaining := time.Until(deadline)
|
|
if remaining <= 0 {
|
|
return false
|
|
}
|
|
name := cfg.GetBaseConfig().Name
|
|
pattern := fmt.Sprintf("[%s] start proxy success", name)
|
|
if err := p.WaitForOutput(pattern, 1, remaining); err != nil {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// WaitForTCPUnreachable polls a TCP address until a connection fails or timeout.
|
|
func WaitForTCPUnreachable(addr string, interval, timeout time.Duration) error {
|
|
if interval <= 0 {
|
|
return fmt.Errorf("invalid interval for TCP unreachable on %s: interval must be positive", addr)
|
|
}
|
|
if timeout <= 0 {
|
|
return fmt.Errorf("invalid timeout for TCP unreachable on %s: timeout must be positive", addr)
|
|
}
|
|
deadline := time.Now().Add(timeout)
|
|
for {
|
|
remaining := time.Until(deadline)
|
|
if remaining <= 0 {
|
|
return fmt.Errorf("timeout waiting for TCP unreachable on %s", addr)
|
|
}
|
|
dialTimeout := min(interval, remaining)
|
|
conn, err := net.DialTimeout("tcp", addr, dialTimeout)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
conn.Close()
|
|
time.Sleep(min(interval, time.Until(deadline)))
|
|
}
|
|
}
|
|
|
|
// WaitForTCPReady polls a TCP address until a connection succeeds or timeout.
|
|
func WaitForTCPReady(addr string, timeout time.Duration) error {
|
|
if timeout <= 0 {
|
|
return fmt.Errorf("invalid timeout for TCP readiness on %s: timeout must be positive", addr)
|
|
}
|
|
deadline := time.Now().Add(timeout)
|
|
var lastErr error
|
|
for time.Now().Before(deadline) {
|
|
conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
|
|
if err == nil {
|
|
conn.Close()
|
|
return nil
|
|
}
|
|
lastErr = err
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
if lastErr == nil {
|
|
return fmt.Errorf("timeout waiting for TCP readiness on %s before any dial attempt", addr)
|
|
}
|
|
return fmt.Errorf("timeout waiting for TCP readiness on %s: %w", addr, lastErr)
|
|
}
|