client/proxy: extract wrapWorkConn to deduplicate UDP/SUDP connection wrapping

Move the repeated rate-limiting, encryption, and compression wrapping
logic from UDPProxy and SUDPProxy into a shared BaseProxy.wrapWorkConn
method, reducing ~18 lines of duplication in each proxy type.
This commit is contained in:
fatedier
2026-03-06 23:39:12 +08:00
parent 0b4f83cd04
commit 5eab214768
3 changed files with 30 additions and 42 deletions

View File

@@ -16,6 +16,7 @@ package proxy
import ( import (
"context" "context"
"fmt"
"io" "io"
"net" "net"
"reflect" "reflect"
@@ -122,6 +123,29 @@ func (pxy *BaseProxy) Close() {
} }
} }
// wrapWorkConn applies rate limiting, encryption, and compression
// to a work connection based on the proxy's transport configuration.
func (pxy *BaseProxy) wrapWorkConn(conn net.Conn) (net.Conn, error) {
var rwc io.ReadWriteCloser = conn
if pxy.limiter != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
}
if pxy.baseCfg.Transport.UseEncryption {
var err error
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
if err != nil {
conn.Close()
return nil, fmt.Errorf("create encryption stream error: %w", err)
}
}
if pxy.baseCfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
return netpkg.WrapReadWriteCloserToConn(rwc, conn), nil
}
func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) { func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
pxy.inWorkConnCallback = cb pxy.inWorkConnCallback = cb
} }

View File

@@ -17,7 +17,6 @@
package proxy package proxy
import ( import (
"io"
"net" "net"
"reflect" "reflect"
"strconv" "strconv"
@@ -25,13 +24,10 @@ import (
"time" "time"
"github.com/fatedier/golib/errors" "github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
v1 "github.com/fatedier/frp/pkg/config/v1" v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp" "github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
netpkg "github.com/fatedier/frp/pkg/util/net"
) )
func init() { func init() {
@@ -83,25 +79,11 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
xl := pxy.xl xl := pxy.xl
xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String()) xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
var rwc io.ReadWriteCloser = conn
var err error var err error
if pxy.limiter != nil { if conn, err = pxy.wrapWorkConn(conn); err != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { xl.Errorf("wrap work conn error: %v", err)
return conn.Close() return
})
} }
if pxy.cfg.Transport.UseEncryption {
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
if err != nil {
conn.Close()
xl.Errorf("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
workConn := conn workConn := conn
readCh := make(chan *msg.UDPPacket, 1024) readCh := make(chan *msg.UDPPacket, 1024)

View File

@@ -17,20 +17,16 @@
package proxy package proxy
import ( import (
"io"
"net" "net"
"reflect" "reflect"
"strconv" "strconv"
"time" "time"
"github.com/fatedier/golib/errors" "github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
v1 "github.com/fatedier/frp/pkg/config/v1" v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp" "github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
netpkg "github.com/fatedier/frp/pkg/util/net"
) )
func init() { func init() {
@@ -94,25 +90,11 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
// close resources related with old workConn // close resources related with old workConn
pxy.Close() pxy.Close()
var rwc io.ReadWriteCloser = conn
var err error var err error
if pxy.limiter != nil { if conn, err = pxy.wrapWorkConn(conn); err != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { xl.Errorf("wrap work conn error: %v", err)
return conn.Close() return
})
} }
if pxy.cfg.Transport.UseEncryption {
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
if err != nil {
conn.Close()
xl.Errorf("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
pxy.mu.Lock() pxy.mu.Lock()
pxy.workConn = conn pxy.workConn = conn