mirror of
https://github.com/fatedier/frp.git
synced 2026-03-20 16:59:18 +08:00
Compare commits
3 Commits
feb98d3cda
...
681fa87fae
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
681fa87fae | ||
|
|
e025843d3c | ||
|
|
a75320ef2f |
@@ -1,3 +1,8 @@
|
||||
## Features
|
||||
|
||||
* HTTPS proxies now support load balancing groups. Multiple HTTPS proxies can be configured with the same `loadBalancer.group` and `loadBalancer.groupKey` to share the same custom domain and distribute traffic across multiple backend services, similar to the existing TCP and HTTP load balancing capabilities.
|
||||
* Individual frpc proxies and visitors now accept an `enabled` flag (defaults to true), letting you disable specific entries without relying on the global `start` list—disabled blocks are skipped when client configs load.
|
||||
|
||||
## Improvements
|
||||
|
||||
* **VirtualNet**: Implemented intelligent reconnection with exponential backoff. When connection errors occur repeatedly, the reconnect interval increases from 60s to 300s (max), reducing unnecessary reconnection attempts. Normal disconnections still reconnect quickly at 10s intervals.
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package visitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
@@ -81,11 +82,22 @@ func (sv *STCPVisitor) internalConnWorker() {
|
||||
|
||||
func (sv *STCPVisitor) handleConn(userConn net.Conn) {
|
||||
xl := xlog.FromContextSafe(sv.ctx)
|
||||
defer userConn.Close()
|
||||
var tunnelErr error
|
||||
defer func() {
|
||||
// If there was an error and connection supports CloseWithError, use it
|
||||
if tunnelErr != nil {
|
||||
if eConn, ok := userConn.(interface{ CloseWithError(error) error }); ok {
|
||||
_ = eConn.CloseWithError(tunnelErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
userConn.Close()
|
||||
}()
|
||||
|
||||
xl.Debugf("get a new stcp user connection")
|
||||
visitorConn, err := sv.helper.ConnectServer()
|
||||
if err != nil {
|
||||
tunnelErr = err
|
||||
return
|
||||
}
|
||||
defer visitorConn.Close()
|
||||
@@ -102,6 +114,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
|
||||
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
|
||||
if err != nil {
|
||||
xl.Warnf("send newVisitorConnMsg to server error: %v", err)
|
||||
tunnelErr = err
|
||||
return
|
||||
}
|
||||
|
||||
@@ -110,12 +123,14 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
|
||||
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
|
||||
if err != nil {
|
||||
xl.Warnf("get newVisitorConnRespMsg error: %v", err)
|
||||
tunnelErr = err
|
||||
return
|
||||
}
|
||||
_ = visitorConn.SetReadDeadline(time.Time{})
|
||||
|
||||
if newVisitorConnRespMsg.Error != "" {
|
||||
xl.Warnf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
|
||||
tunnelErr = fmt.Errorf("%s", newVisitorConnRespMsg.Error)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -125,6 +140,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
|
||||
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
|
||||
if err != nil {
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
tunnelErr = err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ func NewVisitor(
|
||||
Name: cfg.GetBaseConfig().Name,
|
||||
Ctx: ctx,
|
||||
VnetController: helper.VNetController(),
|
||||
HandleConn: func(conn net.Conn) {
|
||||
SendConnToVisitor: func(conn net.Conn) {
|
||||
_ = baseVisitor.AcceptConn(conn)
|
||||
},
|
||||
},
|
||||
|
||||
@@ -162,8 +162,16 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() {
|
||||
func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
|
||||
xl := xlog.FromContextSafe(sv.ctx)
|
||||
isConnTransferred := false
|
||||
var tunnelErr error
|
||||
defer func() {
|
||||
if !isConnTransferred {
|
||||
// If there was an error and connection supports CloseWithError, use it
|
||||
if tunnelErr != nil {
|
||||
if eConn, ok := userConn.(interface{ CloseWithError(error) error }); ok {
|
||||
_ = eConn.CloseWithError(tunnelErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
userConn.Close()
|
||||
}
|
||||
}()
|
||||
@@ -181,6 +189,8 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
|
||||
tunnelConn, err := sv.openTunnel(ctx)
|
||||
if err != nil {
|
||||
xl.Errorf("open tunnel error: %v", err)
|
||||
tunnelErr = err
|
||||
|
||||
// no fallback, just return
|
||||
if sv.cfg.FallbackTo == "" {
|
||||
return
|
||||
@@ -200,6 +210,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
|
||||
muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey))
|
||||
if err != nil {
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
tunnelErr = err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,6 +143,11 @@ transport.tls.enable = true
|
||||
# Default is empty, means all proxies.
|
||||
# start = ["ssh", "dns"]
|
||||
|
||||
# Alternative to 'start': You can control each proxy individually using the 'enabled' field.
|
||||
# Set 'enabled = false' in a proxy configuration to disable it.
|
||||
# If 'enabled' is not set or set to true, the proxy is enabled by default.
|
||||
# The 'enabled' field provides more granular control and is recommended over 'start'.
|
||||
|
||||
# Specify udp packet size, unit is byte. If not set, the default value is 1500.
|
||||
# This parameter should be same between client and server.
|
||||
# It affects the udp and sudp proxy.
|
||||
@@ -169,6 +174,8 @@ metadatas.var2 = "123"
|
||||
# If global user is not empty, it will be changed to {user}.{proxy} such as 'your_name.ssh'
|
||||
name = "ssh"
|
||||
type = "tcp"
|
||||
# Enable or disable this proxy. true or omit this field to enable, false to disable.
|
||||
# enabled = true
|
||||
localIP = "127.0.0.1"
|
||||
localPort = 22
|
||||
# Limit bandwidth for this proxy, unit is KB and MB
|
||||
@@ -253,6 +260,8 @@ healthCheck.httpHeaders=[
|
||||
[[proxies]]
|
||||
name = "web02"
|
||||
type = "https"
|
||||
# Disable this proxy by setting enabled to false
|
||||
# enabled = false
|
||||
localIP = "127.0.0.1"
|
||||
localPort = 8000
|
||||
subdomain = "web02"
|
||||
|
||||
@@ -281,6 +281,17 @@ func LoadClientConfig(path string, strict bool) (
|
||||
})
|
||||
}
|
||||
|
||||
// Filter by enabled field in each proxy
|
||||
// nil or true means enabled, false means disabled
|
||||
proxyCfgs = lo.Filter(proxyCfgs, func(c v1.ProxyConfigurer, _ int) bool {
|
||||
enabled := c.GetBaseConfig().Enabled
|
||||
return enabled == nil || *enabled
|
||||
})
|
||||
visitorCfgs = lo.Filter(visitorCfgs, func(c v1.VisitorConfigurer, _ int) bool {
|
||||
enabled := c.GetBaseConfig().Enabled
|
||||
return enabled == nil || *enabled
|
||||
})
|
||||
|
||||
if cliCfg != nil {
|
||||
if err := cliCfg.Complete(); err != nil {
|
||||
return nil, nil, nil, isLegacyFormat, err
|
||||
|
||||
@@ -108,8 +108,11 @@ type DomainConfig struct {
|
||||
}
|
||||
|
||||
type ProxyBaseConfig struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
// Enabled controls whether this proxy is enabled. nil or true means enabled, false means disabled.
|
||||
// This allows individual control over each proxy, complementing the global "start" field.
|
||||
Enabled *bool `json:"enabled,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
Transport ProxyTransport `json:"transport,omitempty"`
|
||||
// metadata info for each proxy
|
||||
|
||||
@@ -32,8 +32,11 @@ type VisitorTransport struct {
|
||||
}
|
||||
|
||||
type VisitorBaseConfig struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
// Enabled controls whether this visitor is enabled. nil or true means enabled, false means disabled.
|
||||
// This allows individual control over each visitor, complementing the global "start" field.
|
||||
Enabled *bool `json:"enabled,omitempty"`
|
||||
Transport VisitorTransport `json:"transport,omitempty"`
|
||||
SecretKey string `json:"secretKey,omitempty"`
|
||||
// if the server user is not set, it defaults to the current user
|
||||
|
||||
@@ -23,11 +23,20 @@ import (
|
||||
"github.com/fatedier/frp/pkg/vnet"
|
||||
)
|
||||
|
||||
// PluginContext provides the necessary context and callbacks for visitor plugins.
|
||||
type PluginContext struct {
|
||||
Name string
|
||||
Ctx context.Context
|
||||
// Name is the unique identifier for this visitor, used for logging and routing.
|
||||
Name string
|
||||
|
||||
// Ctx manages the plugin's lifecycle and carries the logger for structured logging.
|
||||
Ctx context.Context
|
||||
|
||||
// VnetController manages TUN device routing. May be nil if virtual networking is disabled.
|
||||
VnetController *vnet.Controller
|
||||
HandleConn func(net.Conn)
|
||||
|
||||
// SendConnToVisitor sends a connection to the visitor's internal processing queue.
|
||||
// Does not return error; failures are handled by closing the connection.
|
||||
SendConnToVisitor func(net.Conn)
|
||||
}
|
||||
|
||||
// Creators is used for create plugins to handle connections.
|
||||
|
||||
@@ -42,6 +42,8 @@ type VirtualNetPlugin struct {
|
||||
controllerConn net.Conn
|
||||
closeSignal chan struct{}
|
||||
|
||||
consecutiveErrors int // Tracks consecutive connection errors for exponential backoff
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
@@ -98,7 +100,6 @@ func (p *VirtualNetPlugin) Start() {
|
||||
|
||||
func (p *VirtualNetPlugin) run() {
|
||||
xl := xlog.FromContextSafe(p.ctx)
|
||||
reconnectDelay := 10 * time.Second
|
||||
|
||||
for {
|
||||
currentCloseSignal := make(chan struct{})
|
||||
@@ -121,7 +122,10 @@ func (p *VirtualNetPlugin) run() {
|
||||
p.controllerConn = controllerConn
|
||||
p.mu.Unlock()
|
||||
|
||||
pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() {
|
||||
// Wrap with CloseNotifyConn which supports both close notification and error recording
|
||||
var closeErr error
|
||||
pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func(err error) {
|
||||
closeErr = err
|
||||
close(currentCloseSignal) // Signal the run loop on close.
|
||||
})
|
||||
|
||||
@@ -129,9 +133,9 @@ func (p *VirtualNetPlugin) run() {
|
||||
p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
|
||||
xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
|
||||
|
||||
// Pass the CloseNotifyConn to HandleConn.
|
||||
// HandleConn is responsible for calling Close() on pluginNotifyConn.
|
||||
p.pluginCtx.HandleConn(pluginNotifyConn)
|
||||
// Pass the CloseNotifyConn to the visitor for handling.
|
||||
// The visitor can call CloseWithError to record the failure reason.
|
||||
p.pluginCtx.SendConnToVisitor(pluginNotifyConn)
|
||||
|
||||
// Wait for context cancellation or connection close.
|
||||
select {
|
||||
@@ -140,8 +144,32 @@ func (p *VirtualNetPlugin) run() {
|
||||
p.cleanupControllerConn(xl)
|
||||
return
|
||||
case <-currentCloseSignal:
|
||||
xl.Infof("detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name)
|
||||
// HandleConn closed the plugin side. Close the controller side.
|
||||
// Determine reconnect delay based on error with exponential backoff
|
||||
var reconnectDelay time.Duration
|
||||
if closeErr != nil {
|
||||
p.consecutiveErrors++
|
||||
xl.Warnf("connection closed with error for visitor [%s] (consecutive errors: %d): %v",
|
||||
p.pluginCtx.Name, p.consecutiveErrors, closeErr)
|
||||
|
||||
// Exponential backoff: 60s, 120s, 240s, 300s (capped)
|
||||
baseDelay := 60 * time.Second
|
||||
reconnectDelay = baseDelay * time.Duration(1<<uint(p.consecutiveErrors-1))
|
||||
if reconnectDelay > 300*time.Second {
|
||||
reconnectDelay = 300 * time.Second
|
||||
}
|
||||
} else {
|
||||
// Reset consecutive errors on successful connection
|
||||
if p.consecutiveErrors > 0 {
|
||||
xl.Infof("connection closed normally for visitor [%s], resetting error counter (was %d)",
|
||||
p.pluginCtx.Name, p.consecutiveErrors)
|
||||
p.consecutiveErrors = 0
|
||||
} else {
|
||||
xl.Infof("connection closed normally for visitor [%s]", p.pluginCtx.Name)
|
||||
}
|
||||
reconnectDelay = 10 * time.Second
|
||||
}
|
||||
|
||||
// The visitor closed the plugin side. Close the controller side.
|
||||
p.cleanupControllerConn(xl)
|
||||
|
||||
xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
|
||||
@@ -184,7 +212,7 @@ func (p *VirtualNetPlugin) Close() error {
|
||||
}
|
||||
|
||||
// Explicitly close the controller side of the pipe.
|
||||
// This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end.
|
||||
// This ensures the pipe is broken even if the run loop is stuck or the visitor hasn't closed its end.
|
||||
p.cleanupControllerConn(xl)
|
||||
xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
|
||||
|
||||
|
||||
@@ -135,11 +135,11 @@ type CloseNotifyConn struct {
|
||||
// 1 means closed
|
||||
closeFlag int32
|
||||
|
||||
closeFn func()
|
||||
closeFn func(error)
|
||||
}
|
||||
|
||||
// closeFn will be only called once
|
||||
func WrapCloseNotifyConn(c net.Conn, closeFn func()) net.Conn {
|
||||
// closeFn will be only called once with the error (nil if Close() was called, non-nil if CloseWithError() was called)
|
||||
func WrapCloseNotifyConn(c net.Conn, closeFn func(error)) *CloseNotifyConn {
|
||||
return &CloseNotifyConn{
|
||||
Conn: c,
|
||||
closeFn: closeFn,
|
||||
@@ -151,12 +151,25 @@ func (cc *CloseNotifyConn) Close() (err error) {
|
||||
if pflag == 0 {
|
||||
err = cc.Conn.Close()
|
||||
if cc.closeFn != nil {
|
||||
cc.closeFn()
|
||||
cc.closeFn(nil)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// CloseWithError closes the connection and passes the error to the close callback.
|
||||
func (cc *CloseNotifyConn) CloseWithError(err error) error {
|
||||
pflag := atomic.SwapInt32(&cc.closeFlag, 1)
|
||||
if pflag == 0 {
|
||||
closeErr := cc.Conn.Close()
|
||||
if cc.closeFn != nil {
|
||||
cc.closeFn(err)
|
||||
}
|
||||
return closeErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type StatsConn struct {
|
||||
net.Conn
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) {
|
||||
muxer := http.NewServeMux()
|
||||
muxer.Handle(FrpWebsocketPath, websocket.Handler(func(c *websocket.Conn) {
|
||||
notifyCh := make(chan struct{})
|
||||
conn := WrapCloseNotifyConn(c, func() {
|
||||
conn := WrapCloseNotifyConn(c, func(_ error) {
|
||||
close(notifyCh)
|
||||
})
|
||||
wl.acceptCh <- conn
|
||||
|
||||
Reference in New Issue
Block a user