mirror of
https://github.com/fatedier/frp.git
synced 2026-04-29 04:29:11 +08:00
client/proxy: unify work conn wrapping with pooled compression for all proxy types
Refactor wrapWorkConn to accept encKey parameter and return (io.ReadWriteCloser, recycleFn, error), enabling HandleTCPWorkConnection to reuse the same limiter/encryption/compression pipeline. Switch all proxy types from WithCompression to WithCompressionFromPool. TCP non-plugin path calls recycleFn via defer after Join; plugin and UDP/SUDP paths skip recycle (objects are GC'd safely, per golib contract).
This commit is contained in:
@@ -125,7 +125,10 @@ func (pxy *BaseProxy) Close() {
|
|||||||
|
|
||||||
// wrapWorkConn applies rate limiting, encryption, and compression
|
// wrapWorkConn applies rate limiting, encryption, and compression
|
||||||
// to a work connection based on the proxy's transport configuration.
|
// to a work connection based on the proxy's transport configuration.
|
||||||
func (pxy *BaseProxy) wrapWorkConn(conn net.Conn) (net.Conn, error) {
|
// The returned recycle function should be called when the stream is no longer in use
|
||||||
|
// to return compression resources to the pool. It is safe to not call recycle,
|
||||||
|
// in which case resources will be garbage collected normally.
|
||||||
|
func (pxy *BaseProxy) wrapWorkConn(conn net.Conn, encKey []byte) (io.ReadWriteCloser, func(), error) {
|
||||||
var rwc io.ReadWriteCloser = conn
|
var rwc io.ReadWriteCloser = conn
|
||||||
if pxy.limiter != nil {
|
if pxy.limiter != nil {
|
||||||
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
|
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
|
||||||
@@ -134,16 +137,17 @@ func (pxy *BaseProxy) wrapWorkConn(conn net.Conn) (net.Conn, error) {
|
|||||||
}
|
}
|
||||||
if pxy.baseCfg.Transport.UseEncryption {
|
if pxy.baseCfg.Transport.UseEncryption {
|
||||||
var err error
|
var err error
|
||||||
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
|
rwc, err = libio.WithEncryption(rwc, encKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return nil, fmt.Errorf("create encryption stream error: %w", err)
|
return nil, nil, fmt.Errorf("create encryption stream error: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
var recycleFn func()
|
||||||
if pxy.baseCfg.Transport.UseCompression {
|
if pxy.baseCfg.Transport.UseCompression {
|
||||||
rwc = libio.WithCompression(rwc)
|
rwc, recycleFn = libio.WithCompressionFromPool(rwc)
|
||||||
}
|
}
|
||||||
return netpkg.WrapReadWriteCloserToConn(rwc, conn), nil
|
return rwc, recycleFn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||||
@@ -163,30 +167,14 @@ func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
|
|||||||
func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
|
func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
|
||||||
xl := pxy.xl
|
xl := pxy.xl
|
||||||
baseCfg := pxy.baseCfg
|
baseCfg := pxy.baseCfg
|
||||||
var (
|
|
||||||
remote io.ReadWriteCloser
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
remote = workConn
|
|
||||||
if pxy.limiter != nil {
|
|
||||||
remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error {
|
|
||||||
return workConn.Close()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
xl.Tracef("handle tcp work connection, useEncryption: %t, useCompression: %t",
|
xl.Tracef("handle tcp work connection, useEncryption: %t, useCompression: %t",
|
||||||
baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
|
baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
|
||||||
if baseCfg.Transport.UseEncryption {
|
|
||||||
remote, err = libio.WithEncryption(remote, encKey)
|
remote, recycleFn, err := pxy.wrapWorkConn(workConn, encKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
workConn.Close()
|
xl.Errorf("wrap work connection: %v", err)
|
||||||
xl.Errorf("create encryption stream error: %v", err)
|
return
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var compressionResourceRecycleFn func()
|
|
||||||
if baseCfg.Transport.UseCompression {
|
|
||||||
remote, compressionResourceRecycleFn = libio.WithCompressionFromPool(remote)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if we need to send proxy protocol info
|
// check if we need to send proxy protocol info
|
||||||
@@ -202,7 +190,6 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
|||||||
}
|
}
|
||||||
|
|
||||||
if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 {
|
if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 {
|
||||||
// Use the common proxy protocol builder function
|
|
||||||
header := netpkg.BuildProxyProtocolHeaderStruct(connInfo.SrcAddr, connInfo.DstAddr, baseCfg.Transport.ProxyProtocolVersion)
|
header := netpkg.BuildProxyProtocolHeaderStruct(connInfo.SrcAddr, connInfo.DstAddr, baseCfg.Transport.ProxyProtocolVersion)
|
||||||
connInfo.ProxyProtocolHeader = header
|
connInfo.ProxyProtocolHeader = header
|
||||||
}
|
}
|
||||||
@@ -211,12 +198,18 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
|||||||
|
|
||||||
if pxy.proxyPlugin != nil {
|
if pxy.proxyPlugin != nil {
|
||||||
// if plugin is set, let plugin handle connection first
|
// if plugin is set, let plugin handle connection first
|
||||||
|
// Don't recycle compression resources here because plugins may
|
||||||
|
// retain the connection after Handle returns.
|
||||||
xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name())
|
xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name())
|
||||||
pxy.proxyPlugin.Handle(pxy.ctx, &connInfo)
|
pxy.proxyPlugin.Handle(pxy.ctx, &connInfo)
|
||||||
xl.Debugf("handle by plugin finished")
|
xl.Debugf("handle by plugin finished")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if recycleFn != nil {
|
||||||
|
defer recycleFn()
|
||||||
|
}
|
||||||
|
|
||||||
localConn, err := libnet.Dial(
|
localConn, err := libnet.Dial(
|
||||||
net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
|
net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
|
||||||
libnet.WithTimeout(10*time.Second),
|
libnet.WithTimeout(10*time.Second),
|
||||||
@@ -244,7 +237,4 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
|||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
xl.Tracef("join connections errors: %v", errs)
|
xl.Tracef("join connections errors: %v", errs)
|
||||||
}
|
}
|
||||||
if compressionResourceRecycleFn != nil {
|
|
||||||
compressionResourceRecycleFn()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import (
|
|||||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||||
"github.com/fatedier/frp/pkg/msg"
|
"github.com/fatedier/frp/pkg/msg"
|
||||||
"github.com/fatedier/frp/pkg/proto/udp"
|
"github.com/fatedier/frp/pkg/proto/udp"
|
||||||
|
netpkg "github.com/fatedier/frp/pkg/util/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -79,13 +80,13 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
|||||||
xl := pxy.xl
|
xl := pxy.xl
|
||||||
xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
|
xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
|
||||||
|
|
||||||
var err error
|
remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey)
|
||||||
if conn, err = pxy.wrapWorkConn(conn); err != nil {
|
if err != nil {
|
||||||
xl.Errorf("wrap work conn error: %v", err)
|
xl.Errorf("wrap work connection: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
workConn := conn
|
workConn := netpkg.WrapReadWriteCloserToConn(remote, conn)
|
||||||
readCh := make(chan *msg.UDPPacket, 1024)
|
readCh := make(chan *msg.UDPPacket, 1024)
|
||||||
sendCh := make(chan msg.Message, 1024)
|
sendCh := make(chan msg.Message, 1024)
|
||||||
isClose := false
|
isClose := false
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import (
|
|||||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||||
"github.com/fatedier/frp/pkg/msg"
|
"github.com/fatedier/frp/pkg/msg"
|
||||||
"github.com/fatedier/frp/pkg/proto/udp"
|
"github.com/fatedier/frp/pkg/proto/udp"
|
||||||
|
netpkg "github.com/fatedier/frp/pkg/util/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -90,14 +91,14 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
|||||||
// close resources related with old workConn
|
// close resources related with old workConn
|
||||||
pxy.Close()
|
pxy.Close()
|
||||||
|
|
||||||
var err error
|
remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey)
|
||||||
if conn, err = pxy.wrapWorkConn(conn); err != nil {
|
if err != nil {
|
||||||
xl.Errorf("wrap work conn error: %v", err)
|
xl.Errorf("wrap work connection: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pxy.mu.Lock()
|
pxy.mu.Lock()
|
||||||
pxy.workConn = conn
|
pxy.workConn = netpkg.WrapReadWriteCloserToConn(remote, conn)
|
||||||
pxy.readCh = make(chan *msg.UDPPacket, 1024)
|
pxy.readCh = make(chan *msg.UDPPacket, 1024)
|
||||||
pxy.sendCh = make(chan msg.Message, 1024)
|
pxy.sendCh = make(chan msg.Message, 1024)
|
||||||
pxy.closed = false
|
pxy.closed = false
|
||||||
|
|||||||
Reference in New Issue
Block a user