diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 148e0e90..84ff49a1 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -125,7 +125,10 @@ func (pxy *BaseProxy) Close() { // wrapWorkConn applies rate limiting, encryption, and compression // to a work connection based on the proxy's transport configuration. -func (pxy *BaseProxy) wrapWorkConn(conn net.Conn) (net.Conn, error) { +// The returned recycle function should be called when the stream is no longer in use +// to return compression resources to the pool. It is safe to not call recycle, +// in which case resources will be garbage collected normally. +func (pxy *BaseProxy) wrapWorkConn(conn net.Conn, encKey []byte) (io.ReadWriteCloser, func(), error) { var rwc io.ReadWriteCloser = conn if pxy.limiter != nil { rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { @@ -134,16 +137,17 @@ func (pxy *BaseProxy) wrapWorkConn(conn net.Conn) (net.Conn, error) { } if pxy.baseCfg.Transport.UseEncryption { var err error - rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey) + rwc, err = libio.WithEncryption(rwc, encKey) if err != nil { conn.Close() - return nil, fmt.Errorf("create encryption stream error: %w", err) + return nil, nil, fmt.Errorf("create encryption stream error: %w", err) } } + var recycleFn func() if pxy.baseCfg.Transport.UseCompression { - rwc = libio.WithCompression(rwc) + rwc, recycleFn = libio.WithCompressionFromPool(rwc) } - return netpkg.WrapReadWriteCloserToConn(rwc, conn), nil + return rwc, recycleFn, nil } func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) { @@ -163,30 +167,14 @@ func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) { xl := pxy.xl baseCfg := pxy.baseCfg - var ( - remote io.ReadWriteCloser - err error - ) - remote = workConn - if pxy.limiter != nil { - remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error { - return workConn.Close() - }) - } xl.Tracef("handle tcp work connection, useEncryption: %t, useCompression: %t", baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression) - if baseCfg.Transport.UseEncryption { - remote, err = libio.WithEncryption(remote, encKey) - if err != nil { - workConn.Close() - xl.Errorf("create encryption stream error: %v", err) - return - } - } - var compressionResourceRecycleFn func() - if baseCfg.Transport.UseCompression { - remote, compressionResourceRecycleFn = libio.WithCompressionFromPool(remote) + + remote, recycleFn, err := pxy.wrapWorkConn(workConn, encKey) + if err != nil { + xl.Errorf("wrap work connection: %v", err) + return } // check if we need to send proxy protocol info @@ -202,7 +190,6 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor } if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 { - // Use the common proxy protocol builder function header := netpkg.BuildProxyProtocolHeaderStruct(connInfo.SrcAddr, connInfo.DstAddr, baseCfg.Transport.ProxyProtocolVersion) connInfo.ProxyProtocolHeader = header } @@ -211,12 +198,18 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor if pxy.proxyPlugin != nil { // if plugin is set, let plugin handle connection first + // Don't recycle compression resources here because plugins may + // retain the connection after Handle returns. xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name()) pxy.proxyPlugin.Handle(pxy.ctx, &connInfo) xl.Debugf("handle by plugin finished") return } + if recycleFn != nil { + defer recycleFn() + } + localConn, err := libnet.Dial( net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)), libnet.WithTimeout(10*time.Second), @@ -244,7 +237,4 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor if len(errs) > 0 { xl.Tracef("join connections errors: %v", errs) } - if compressionResourceRecycleFn != nil { - compressionResourceRecycleFn() - } } diff --git a/client/proxy/sudp.go b/client/proxy/sudp.go index 8b8c4985..da941446 100644 --- a/client/proxy/sudp.go +++ b/client/proxy/sudp.go @@ -28,6 +28,7 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/proto/udp" + netpkg "github.com/fatedier/frp/pkg/util/net" ) func init() { @@ -79,13 +80,13 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { xl := pxy.xl xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String()) - var err error - if conn, err = pxy.wrapWorkConn(conn); err != nil { - xl.Errorf("wrap work conn error: %v", err) + remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey) + if err != nil { + xl.Errorf("wrap work connection: %v", err) return } - workConn := conn + workConn := netpkg.WrapReadWriteCloserToConn(remote, conn) readCh := make(chan *msg.UDPPacket, 1024) sendCh := make(chan msg.Message, 1024) isClose := false diff --git a/client/proxy/udp.go b/client/proxy/udp.go index 8848abc8..570da476 100644 --- a/client/proxy/udp.go +++ b/client/proxy/udp.go @@ -27,6 +27,7 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/proto/udp" + netpkg "github.com/fatedier/frp/pkg/util/net" ) func init() { @@ -90,14 +91,14 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { // close resources related with old workConn pxy.Close() - var err error - if conn, err = pxy.wrapWorkConn(conn); err != nil { - xl.Errorf("wrap work conn error: %v", err) + remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey) + if err != nil { + xl.Errorf("wrap work connection: %v", err) return } pxy.mu.Lock() - pxy.workConn = conn + pxy.workConn = netpkg.WrapReadWriteCloserToConn(remote, conn) pxy.readCh = make(chan *msg.UDPPacket, 1024) pxy.sendCh = make(chan msg.Message, 1024) pxy.closed = false