From 4f584f81d0af1a3cc5e5061c59d62a3ef136f7fe Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 12 Mar 2026 00:11:09 +0800 Subject: [PATCH] test/e2e: replace sleeps with event-driven waits in chaos/group/store tests (#5231) * test/e2e: replace sleeps with event-driven waits in chaos/group/store tests Replace 21 time.Sleep calls with deterministic waiting using WaitForOutput, WaitForTCPReady, and a new WaitForTCPUnreachable helper. Add CountOutput method for snapshot-based incremental log matching. * test/e2e: validate interval and cap dial/sleep to remaining deadline in WaitForTCPUnreachable --- test/e2e/framework/process.go | 24 ++++++++++++++++++++++++ test/e2e/pkg/process/process.go | 5 +++++ test/e2e/v1/features/chaos.go | 10 +++++----- test/e2e/v1/features/group.go | 16 ++++++++++------ test/e2e/v1/features/store.go | 27 +++++++++++++-------------- 5 files changed, 57 insertions(+), 25 deletions(-) diff --git a/test/e2e/framework/process.go b/test/e2e/framework/process.go index cca93b1d..f5faa637 100644 --- a/test/e2e/framework/process.go +++ b/test/e2e/framework/process.go @@ -144,6 +144,30 @@ func waitForClientProxyReady(configPath string, p *process.Process, timeout time return true } +// WaitForTCPUnreachable polls a TCP address until a connection fails or timeout. +func WaitForTCPUnreachable(addr string, interval, timeout time.Duration) error { + if interval <= 0 { + return fmt.Errorf("invalid interval for TCP unreachable on %s: interval must be positive", addr) + } + if timeout <= 0 { + return fmt.Errorf("invalid timeout for TCP unreachable on %s: timeout must be positive", addr) + } + deadline := time.Now().Add(timeout) + for { + remaining := time.Until(deadline) + if remaining <= 0 { + return fmt.Errorf("timeout waiting for TCP unreachable on %s", addr) + } + dialTimeout := min(interval, remaining) + conn, err := net.DialTimeout("tcp", addr, dialTimeout) + if err != nil { + return nil + } + conn.Close() + time.Sleep(min(interval, time.Until(deadline))) + } +} + // WaitForTCPReady polls a TCP address until a connection succeeds or timeout. func WaitForTCPReady(addr string, timeout time.Duration) error { if timeout <= 0 { diff --git a/test/e2e/pkg/process/process.go b/test/e2e/pkg/process/process.go index 6f4e3e39..8235461b 100644 --- a/test/e2e/pkg/process/process.go +++ b/test/e2e/pkg/process/process.go @@ -120,6 +120,11 @@ func (p *Process) Output() string { return p.stdOutput.String() + p.errorOutput.String() } +// CountOutput returns how many times pattern appears in the current accumulated output. +func (p *Process) CountOutput(pattern string) int { + return strings.Count(p.Output(), pattern) +} + func (p *Process) SetBeforeStopHandler(fn func()) { p.beforeStopHandler = fn } diff --git a/test/e2e/v1/features/chaos.go b/test/e2e/v1/features/chaos.go index f5f8b388..8c515251 100644 --- a/test/e2e/v1/features/chaos.go +++ b/test/e2e/v1/features/chaos.go @@ -41,24 +41,24 @@ var _ = ginkgo.Describe("[Feature: Chaos]", func() { // 2. stop frps, expect request failed _ = ps.Stop() - time.Sleep(200 * time.Millisecond) framework.NewRequestExpect(f).Port(remotePort).ExpectError(true).Ensure() // 3. restart frps, expect request success + successCount := pc.CountOutput("[tcp] start proxy success") _, _, err = f.RunFrps("-c", serverConfigPath) framework.ExpectNoError(err) - time.Sleep(2 * time.Second) + framework.ExpectNoError(pc.WaitForOutput("[tcp] start proxy success", successCount+1, 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort).Ensure() // 4. stop frpc, expect request failed _ = pc.Stop() - time.Sleep(200 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPUnreachable(fmt.Sprintf("127.0.0.1:%d", remotePort), 100*time.Millisecond, 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort).ExpectError(true).Ensure() // 5. restart frpc, expect request success - _, _, err = f.RunFrpc("-c", clientConfigPath) + newPc, _, err := f.RunFrpc("-c", clientConfigPath) framework.ExpectNoError(err) - time.Sleep(time.Second) + framework.ExpectNoError(newPc.WaitForOutput("[tcp] start proxy success", 1, 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort).Ensure() }) }) diff --git a/test/e2e/v1/features/group.go b/test/e2e/v1/features/group.go index 610903d0..85b938a9 100644 --- a/test/e2e/v1/features/group.go +++ b/test/e2e/v1/features/group.go @@ -286,7 +286,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() { healthCheck.intervalSeconds = 1 `, fooPort, remotePort, barPort, remotePort) - f.RunProcesses(serverConf, []string{clientConf}) + _, clientProcesses := f.RunProcesses(serverConf, []string{clientConf}) // check foo and bar is ok results := []string{} @@ -299,15 +299,17 @@ var _ = ginkgo.Describe("[Feature: Group]", func() { framework.ExpectContainElements(results, []string{"foo", "bar"}) // close bar server, check foo is ok + failedCount := clientProcesses[0].CountOutput("[bar] health check failed") barServer.Close() - time.Sleep(2 * time.Second) + framework.ExpectNoError(clientProcesses[0].WaitForOutput("[bar] health check failed", failedCount+1, 5*time.Second)) for range 10 { framework.NewRequestExpect(f).Port(remotePort).ExpectResp([]byte("foo")).Ensure() } // resume bar server, check foo and bar is ok + successCount := clientProcesses[0].CountOutput("[bar] health check success") f.RunServer("", barServer) - time.Sleep(2 * time.Second) + framework.ExpectNoError(clientProcesses[0].WaitForOutput("[bar] health check success", successCount+1, 5*time.Second)) results = []string{} for range 10 { framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool { @@ -357,7 +359,7 @@ var _ = ginkgo.Describe("[Feature: Group]", func() { healthCheck.path = "/healthz" `, fooPort, barPort) - f.RunProcesses(serverConf, []string{clientConf}) + _, clientProcesses := f.RunProcesses(serverConf, []string{clientConf}) // send first HTTP request var contents []string @@ -387,15 +389,17 @@ var _ = ginkgo.Describe("[Feature: Group]", func() { framework.ExpectContainElements(results, []string{"foo", "bar"}) // close bar server, check foo is ok + failedCount := clientProcesses[0].CountOutput("[bar] health check failed") barServer.Close() - time.Sleep(2 * time.Second) + framework.ExpectNoError(clientProcesses[0].WaitForOutput("[bar] health check failed", failedCount+1, 5*time.Second)) results = doFooBarHTTPRequest(vhostPort, "example.com") framework.ExpectContainElements(results, []string{"foo"}) framework.ExpectNotContainElements(results, []string{"bar"}) // resume bar server, check foo and bar is ok + successCount := clientProcesses[0].CountOutput("[bar] health check success") f.RunServer("", barServer) - time.Sleep(2 * time.Second) + framework.ExpectNoError(clientProcesses[0].WaitForOutput("[bar] health check success", successCount+1, 5*time.Second)) results = doFooBarHTTPRequest(vhostPort, "example.com") framework.ExpectContainElements(results, []string{"foo", "bar"}) }) diff --git a/test/e2e/v1/features/store.go b/test/e2e/v1/features/store.go index 3fc6bb14..76a9b05a 100644 --- a/test/e2e/v1/features/store.go +++ b/test/e2e/v1/features/store.go @@ -31,7 +31,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort, f.TempDirectory) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) proxyConfig := map[string]any{ "name": "test-tcp", @@ -52,7 +52,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { return resp.Code == 200 }) - time.Sleep(time.Second) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", remotePort), 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort).Ensure() }) @@ -72,7 +72,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort, f.TempDirectory) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) proxyConfig := map[string]any{ "name": "test-tcp", @@ -93,7 +93,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { return resp.Code == 200 }) - time.Sleep(time.Second) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", remotePort1), 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort1).Ensure() proxyConfig["tcp"].(map[string]any)["remotePort"] = remotePort2 @@ -107,7 +107,8 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { return resp.Code == 200 }) - time.Sleep(time.Second) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", remotePort2), 5*time.Second)) + framework.ExpectNoError(framework.WaitForTCPUnreachable(fmt.Sprintf("127.0.0.1:%d", remotePort1), 100*time.Millisecond, 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort2).Ensure() framework.NewRequestExpect(f).Port(remotePort1).ExpectError(true).Ensure() }) @@ -126,7 +127,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort, f.TempDirectory) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) proxyConfig := map[string]any{ "name": "test-tcp", @@ -147,7 +148,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { return resp.Code == 200 }) - time.Sleep(time.Second) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", remotePort), 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort).Ensure() framework.NewRequestExpect(f).RequestModify(func(r *request.Request) { @@ -156,7 +157,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { return resp.Code == 200 }) - time.Sleep(time.Second) + framework.ExpectNoError(framework.WaitForTCPUnreachable(fmt.Sprintf("127.0.0.1:%d", remotePort), 100*time.Millisecond, 5*time.Second)) framework.NewRequestExpect(f).Port(remotePort).ExpectError(true).Ensure() }) @@ -174,7 +175,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort, f.TempDirectory) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) proxyConfig := map[string]any{ "name": "test-tcp", @@ -195,8 +196,6 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { return resp.Code == 200 }) - time.Sleep(500 * time.Millisecond) - framework.NewRequestExpect(f).RequestModify(func(r *request.Request) { r.HTTP().Port(adminPort).HTTPPath("/api/store/proxies") }).Ensure(func(resp *request.Response) bool { @@ -226,7 +225,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) framework.NewRequestExpect(f).RequestModify(func(r *request.Request) { r.HTTP().Port(adminPort).HTTPPath("/api/store/proxies") @@ -248,7 +247,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort, f.TempDirectory) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) invalidBody, _ := json.Marshal(map[string]any{ "name": "bad-proxy", @@ -281,7 +280,7 @@ var _ = ginkgo.Describe("[Feature: Store]", func() { `, adminPort, f.TempDirectory) f.RunProcesses(serverConf, []string{clientConf}) - time.Sleep(500 * time.Millisecond) + framework.ExpectNoError(framework.WaitForTCPReady(fmt.Sprintf("127.0.0.1:%d", adminPort), 5*time.Second)) createBody, _ := json.Marshal(map[string]any{ "name": "proxy-a",