Compare commits

...

12 Commits

Author SHA1 Message Date
fatedier
cf396563f8 client/proxy: unify work conn wrapping across all proxy types (#5212)
* client/proxy: extract wrapWorkConn to deduplicate UDP/SUDP connection wrapping

Move the repeated rate-limiting, encryption, and compression wrapping
logic from UDPProxy and SUDPProxy into a shared BaseProxy.wrapWorkConn
method, reducing ~18 lines of duplication in each proxy type.

* client/proxy: unify work conn wrapping with pooled compression for all proxy types

Refactor wrapWorkConn to accept encKey parameter and return
(io.ReadWriteCloser, recycleFn, error), enabling HandleTCPWorkConnection
to reuse the same limiter/encryption/compression pipeline.

Switch all proxy types from WithCompression to WithCompressionFromPool.
TCP non-plugin path calls recycleFn via defer after Join; plugin and
UDP/SUDP paths skip recycle (objects are GC'd safely, per golib contract).
2026-03-07 01:33:37 +08:00
fatedier
0b4f83cd04 pkg/config: use modern Go stdlib for sorting and string operations (#5210)
- slices.SortedFunc + maps.Values + cmp.Compare instead of manual
  map-to-slice collection + sort.Slice (source/aggregator.go)
- strings.CutSuffix instead of HasSuffix+TrimSuffix, and deduplicate
  error handling in BandwidthQuantity.UnmarshalString (types/types.go)
2026-03-06 23:13:29 +08:00
fatedier
e9f7a1a9f2 pkg: use modern Go stdlib functions to simplify code (#5209)
- strings.CutPrefix instead of HasPrefix+TrimPrefix (naming, legacy)
- slices.Contains instead of manual loop (plugin/server)
- min/max builtins instead of manual comparisons (nathole)
2026-03-06 22:14:46 +08:00
fatedier
d644593342 server/proxy: simplify HTTPS and TCPMux proxy domain registration (#5208)
Consolidate the separate custom-domain loop and subdomain block into a
single unified loop, matching the pattern already applied to HTTPProxy
in PR #5207. No behavioral change.
2026-03-06 21:31:29 +08:00
fatedier
427c4ca3ae server/proxy: simplify HTTP proxy domain registration by removing duplicate loop (#5207)
The Run() method had two nearly identical loop blocks for registering
custom domains and subdomain, with the same group/non-group registration
logic copy-pasted (~30 lines of duplication).

Consolidate by collecting all domains into a single slice first, then
iterating once with the shared registration logic. Also fixes a minor
inconsistency where the custom domain block used routeConfig.Domain in
CanonicalAddr but the subdomain block used tmpRouteConfig.Domain.
2026-03-06 21:17:30 +08:00
fatedier
f2d1f3739a pkg/util/xlog: fix AddPrefix not updating existing prefix due to range value copy (#5206)
In AddPrefix, the loop `for _, p := range l.prefixes` creates a copy
of each element. Assignments to p.Value and p.Priority only modify
the local copy, not the original slice element, causing updates to
existing prefixes to be silently lost.

This affects client/service.go where AddPrefix is called with
Name:"runID" on reconnection — the old runID value would persist
in log output instead of being updated to the new one.

Fix by using index-based access `l.prefixes[i]` to modify the
original slice element, and add break since prefix names are unique.
2026-03-06 20:44:40 +08:00
fatedier
c23894f156 fix: validate CA cert parsing and add missing ReadHeaderTimeout (#5205)
- pkg/transport/tls.go: check AppendCertsFromPEM return value and
  return clear error when CA file contains no valid PEM certificates
- pkg/plugin/client/http2http.go: set ReadHeaderTimeout to 60s to
  match other plugins and prevent slow header attacks
- pkg/plugin/client/http2https.go: same ReadHeaderTimeout fix
2026-03-06 17:59:41 +08:00
fatedier
cb459b02b6 fix: WebsocketListener nil panic and OIDC auth data race (#5204)
- pkg/util/net/websocket.go: store ln parameter in struct to prevent
  nil pointer panic when Addr() is called
- pkg/auth/oidc.go: replace unsynchronized []string with map + RWMutex
  for subjectsFromLogin to fix data race across concurrent connections
2026-03-06 16:51:52 +08:00
fatedier
8f633fe363 fix: return buffers to pool on error paths to reduce GC pressure (#5203)
- pkg/nathole/nathole.go: add pool.PutBuf(buf) on ReadFromUDP error
  and DecodeMessageInto error paths in waitDetectMessage
- pkg/proto/udp/udp.go: add defer pool.PutBuf(buf) in writerFn to
  ensure buffer is returned when the goroutine exits
2026-03-06 15:55:22 +08:00
fatedier
c62a1da161 fix: close connections on error paths to prevent resource leaks (#5202)
Fix connection leaks in multiple error paths across client and server:
- server/proxy/http: close tmpConn when WithEncryption fails
- client/proxy: close localConn when ProxyProtocol WriteTo fails
- client/visitor/sudp: close visitorConn on all error paths in getNewVisitorConn
- client/visitor/xtcp: close tunnelConn when WithEncryption fails
- client/visitor/xtcp: close lConn when NewKCPConnFromUDP fails
- pkg/plugin/client/unix_domain_socket: close localConn and connInfo.Conn when WriteTo fails, close connInfo.Conn when DialUnix fails
- pkg/plugin/client/tls2raw: close tlsConn when Handshake or Dial fails
2026-03-06 15:18:38 +08:00
fatedier
f22f7d539c server/group: fix port leak and incorrect Listen port in TCPGroup (#5200)
Fix two bugs in TCPGroup.Listen():
- Release acquired port when net.Listen fails to prevent port leak
- Use realPort instead of port for net.Listen to ensure consistency
  between port manager records and actual listening port
2026-03-06 02:25:47 +08:00
fatedier
462c987f6d pkg/msg: change UDPPacket.Content from string to []byte to avoid redundant base64 encode/decode (#5198) 2026-03-06 01:38:24 +08:00
30 changed files with 206 additions and 213 deletions

View File

@@ -16,6 +16,7 @@ package proxy
import (
"context"
"fmt"
"io"
"net"
"reflect"
@@ -122,6 +123,33 @@ func (pxy *BaseProxy) Close() {
}
}
// wrapWorkConn applies rate limiting, encryption, and compression
// to a work connection based on the proxy's transport configuration.
// The returned recycle function should be called when the stream is no longer in use
// to return compression resources to the pool. It is safe to not call recycle,
// in which case resources will be garbage collected normally.
func (pxy *BaseProxy) wrapWorkConn(conn net.Conn, encKey []byte) (io.ReadWriteCloser, func(), error) {
var rwc io.ReadWriteCloser = conn
if pxy.limiter != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
}
if pxy.baseCfg.Transport.UseEncryption {
var err error
rwc, err = libio.WithEncryption(rwc, encKey)
if err != nil {
conn.Close()
return nil, nil, fmt.Errorf("create encryption stream error: %w", err)
}
}
var recycleFn func()
if pxy.baseCfg.Transport.UseCompression {
rwc, recycleFn = libio.WithCompressionFromPool(rwc)
}
return rwc, recycleFn, nil
}
func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
pxy.inWorkConnCallback = cb
}
@@ -139,30 +167,14 @@ func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
xl := pxy.xl
baseCfg := pxy.baseCfg
var (
remote io.ReadWriteCloser
err error
)
remote = workConn
if pxy.limiter != nil {
remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error {
return workConn.Close()
})
}
xl.Tracef("handle tcp work connection, useEncryption: %t, useCompression: %t",
baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
if baseCfg.Transport.UseEncryption {
remote, err = libio.WithEncryption(remote, encKey)
if err != nil {
workConn.Close()
xl.Errorf("create encryption stream error: %v", err)
return
}
}
var compressionResourceRecycleFn func()
if baseCfg.Transport.UseCompression {
remote, compressionResourceRecycleFn = libio.WithCompressionFromPool(remote)
remote, recycleFn, err := pxy.wrapWorkConn(workConn, encKey)
if err != nil {
xl.Errorf("wrap work connection: %v", err)
return
}
// check if we need to send proxy protocol info
@@ -178,7 +190,6 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
}
if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 {
// Use the common proxy protocol builder function
header := netpkg.BuildProxyProtocolHeaderStruct(connInfo.SrcAddr, connInfo.DstAddr, baseCfg.Transport.ProxyProtocolVersion)
connInfo.ProxyProtocolHeader = header
}
@@ -187,12 +198,18 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
if pxy.proxyPlugin != nil {
// if plugin is set, let plugin handle connection first
// Don't recycle compression resources here because plugins may
// retain the connection after Handle returns.
xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name())
pxy.proxyPlugin.Handle(pxy.ctx, &connInfo)
xl.Debugf("handle by plugin finished")
return
}
if recycleFn != nil {
defer recycleFn()
}
localConn, err := libnet.Dial(
net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
libnet.WithTimeout(10*time.Second),
@@ -209,6 +226,7 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
if connInfo.ProxyProtocolHeader != nil {
if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
workConn.Close()
localConn.Close()
xl.Errorf("write proxy protocol header to local conn error: %v", err)
return
}
@@ -219,7 +237,4 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
if len(errs) > 0 {
xl.Tracef("join connections errors: %v", errs)
}
if compressionResourceRecycleFn != nil {
compressionResourceRecycleFn()
}
}

View File

@@ -17,7 +17,6 @@
package proxy
import (
"io"
"net"
"reflect"
"strconv"
@@ -25,12 +24,10 @@ import (
"time"
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
@@ -83,27 +80,13 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
xl := pxy.xl
xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
var rwc io.ReadWriteCloser = conn
var err error
if pxy.limiter != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey)
if err != nil {
xl.Errorf("wrap work connection: %v", err)
return
}
if pxy.cfg.Transport.UseEncryption {
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
if err != nil {
conn.Close()
xl.Errorf("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
workConn := conn
workConn := netpkg.WrapReadWriteCloserToConn(remote, conn)
readCh := make(chan *msg.UDPPacket, 1024)
sendCh := make(chan msg.Message, 1024)
isClose := false

View File

@@ -17,19 +17,16 @@
package proxy
import (
"io"
"net"
"reflect"
"strconv"
"time"
"github.com/fatedier/golib/errors"
libio "github.com/fatedier/golib/io"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
netpkg "github.com/fatedier/frp/pkg/util/net"
)
@@ -94,28 +91,14 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
// close resources related with old workConn
pxy.Close()
var rwc io.ReadWriteCloser = conn
var err error
if pxy.limiter != nil {
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
return conn.Close()
})
remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey)
if err != nil {
xl.Errorf("wrap work connection: %v", err)
return
}
if pxy.cfg.Transport.UseEncryption {
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
if err != nil {
conn.Close()
xl.Errorf("create encryption stream error: %v", err)
return
}
}
if pxy.cfg.Transport.UseCompression {
rwc = libio.WithCompression(rwc)
}
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
pxy.mu.Lock()
pxy.workConn = conn
pxy.workConn = netpkg.WrapReadWriteCloserToConn(remote, conn)
pxy.readCh = make(chan *msg.UDPPacket, 1024)
pxy.sendCh = make(chan msg.Message, 1024)
pxy.closed = false
@@ -129,7 +112,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
return
}
if errRet := errors.PanicToError(func() {
xl.Tracef("get udp package from workConn: %s", udpMsg.Content)
xl.Tracef("get udp package from workConn, len: %d", len(udpMsg.Content))
readCh <- &udpMsg
}); errRet != nil {
xl.Infof("reader goroutine for udp work connection closed: %v", errRet)
@@ -145,7 +128,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
for rawMsg := range sendCh {
switch m := rawMsg.(type) {
case *msg.UDPPacket:
xl.Tracef("send udp package to workConn: %s", m.Content)
xl.Tracef("send udp package to workConn, len: %d", len(m.Content))
case *msg.Ping:
xl.Tracef("send ping message to udp workConn")
}

View File

@@ -147,7 +147,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
case *msg.UDPPacket:
if errRet := errors.PanicToError(func() {
sv.readCh <- m
xl.Tracef("frpc visitor get udp packet from workConn: %s", m.Content)
xl.Tracef("frpc visitor get udp packet from workConn, len: %d", len(m.Content))
}); errRet != nil {
xl.Infof("reader goroutine for udp work connection closed")
return
@@ -169,7 +169,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
return
}
xl.Tracef("send udp package to workConn: %s", firstPacket.Content)
xl.Tracef("send udp package to workConn, len: %d", len(firstPacket.Content))
}
for {
@@ -184,7 +184,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
return
}
xl.Tracef("send udp package to workConn: %s", udpMsg.Content)
xl.Tracef("send udp package to workConn, len: %d", len(udpMsg.Content))
case <-closeCh:
return
}
@@ -217,6 +217,7 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
}
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
if err != nil {
visitorConn.Close()
return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err)
}
@@ -224,11 +225,13 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
_ = visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
if err != nil {
visitorConn.Close()
return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err)
}
_ = visitorConn.SetReadDeadline(time.Time{})
if newVisitorConnRespMsg.Error != "" {
visitorConn.Close()
return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
}
@@ -238,6 +241,7 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
if err != nil {
xl.Errorf("create encryption stream error: %v", err)
visitorConn.Close()
return nil, err
}
}

View File

@@ -211,6 +211,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey))
if err != nil {
xl.Errorf("create encryption stream error: %v", err)
tunnelConn.Close()
tunnelErr = err
return
}
@@ -373,6 +374,7 @@ func (ks *KCPTunnelSession) Init(listenConn *net.UDPConn, raddr *net.UDPAddr) er
}
remote, err := netpkg.NewKCPConnFromUDP(lConn, true, raddr.String())
if err != nil {
lConn.Close()
return fmt.Errorf("create kcp connection from udp connection error: %v", err)
}

View File

@@ -23,6 +23,7 @@ import (
"net/url"
"os"
"slices"
"sync"
"github.com/coreos/go-oidc/v3/oidc"
"golang.org/x/oauth2"
@@ -205,7 +206,8 @@ type OidcAuthConsumer struct {
additionalAuthScopes []v1.AuthScope
verifier TokenVerifier
subjectsFromLogin []string
mu sync.RWMutex
subjectsFromLogin map[string]struct{}
}
func NewTokenVerifier(cfg v1.AuthOIDCServerConfig) TokenVerifier {
@@ -226,7 +228,7 @@ func NewOidcAuthVerifier(additionalAuthScopes []v1.AuthScope, verifier TokenVeri
return &OidcAuthConsumer{
additionalAuthScopes: additionalAuthScopes,
verifier: verifier,
subjectsFromLogin: []string{},
subjectsFromLogin: make(map[string]struct{}),
}
}
@@ -235,9 +237,9 @@ func (auth *OidcAuthConsumer) VerifyLogin(loginMsg *msg.Login) (err error) {
if err != nil {
return fmt.Errorf("invalid OIDC token in login: %v", err)
}
if !slices.Contains(auth.subjectsFromLogin, token.Subject) {
auth.subjectsFromLogin = append(auth.subjectsFromLogin, token.Subject)
}
auth.mu.Lock()
auth.subjectsFromLogin[token.Subject] = struct{}{}
auth.mu.Unlock()
return nil
}
@@ -246,11 +248,13 @@ func (auth *OidcAuthConsumer) verifyPostLoginToken(privilegeKey string) (err err
if err != nil {
return fmt.Errorf("invalid OIDC token in ping: %v", err)
}
if !slices.Contains(auth.subjectsFromLogin, token.Subject) {
auth.mu.RLock()
_, ok := auth.subjectsFromLogin[token.Subject]
auth.mu.RUnlock()
if !ok {
return fmt.Errorf("received different OIDC subject in login and ping. "+
"original subjects: %s, "+
"new subject: %s",
auth.subjectsFromLogin, token.Subject)
token.Subject)
}
return nil
}

View File

@@ -171,15 +171,14 @@ func Convert_ServerCommonConf_To_v1(conf *ServerCommonConf) *v1.ServerConfig {
func transformHeadersFromPluginParams(params map[string]string) v1.HeaderOperations {
out := v1.HeaderOperations{}
for k, v := range params {
if !strings.HasPrefix(k, "plugin_header_") {
k, ok := strings.CutPrefix(k, "plugin_header_")
if !ok || k == "" {
continue
}
if k = strings.TrimPrefix(k, "plugin_header_"); k != "" {
if out.Set == nil {
out.Set = make(map[string]string)
}
out.Set[k] = v
if out.Set == nil {
out.Set = make(map[string]string)
}
out.Set[k] = v
}
return out
}

View File

@@ -22,8 +22,8 @@ func GetMapWithoutPrefix(set map[string]string, prefix string) map[string]string
m := make(map[string]string)
for key, value := range set {
if strings.HasPrefix(key, prefix) {
m[strings.TrimPrefix(key, prefix)] = value
if trimmed, ok := strings.CutPrefix(key, prefix); ok {
m[trimmed] = value
}
}

View File

@@ -15,9 +15,11 @@
package source
import (
"cmp"
"errors"
"fmt"
"sort"
"maps"
"slices"
"sync"
v1 "github.com/fatedier/frp/pkg/config/v1"
@@ -97,21 +99,11 @@ func (a *Aggregator) mapsToSortedSlices(
proxyMap map[string]v1.ProxyConfigurer,
visitorMap map[string]v1.VisitorConfigurer,
) ([]v1.ProxyConfigurer, []v1.VisitorConfigurer) {
proxies := make([]v1.ProxyConfigurer, 0, len(proxyMap))
for _, p := range proxyMap {
proxies = append(proxies, p)
}
sort.Slice(proxies, func(i, j int) bool {
return proxies[i].GetBaseConfig().Name < proxies[j].GetBaseConfig().Name
proxies := slices.SortedFunc(maps.Values(proxyMap), func(x, y v1.ProxyConfigurer) int {
return cmp.Compare(x.GetBaseConfig().Name, y.GetBaseConfig().Name)
})
visitors := make([]v1.VisitorConfigurer, 0, len(visitorMap))
for _, v := range visitorMap {
visitors = append(visitors, v)
}
sort.Slice(visitors, func(i, j int) bool {
return visitors[i].GetBaseConfig().Name < visitors[j].GetBaseConfig().Name
visitors := slices.SortedFunc(maps.Values(visitorMap), func(x, y v1.VisitorConfigurer) int {
return cmp.Compare(x.GetBaseConfig().Name, y.GetBaseConfig().Name)
})
return proxies, visitors
}

View File

@@ -196,6 +196,27 @@ func TestAggregator_VisitorMerge(t *testing.T) {
require.Len(visitors, 2)
}
func TestAggregator_Load_ReturnsSortedByName(t *testing.T) {
require := require.New(t)
agg := newTestAggregator(t, nil)
err := agg.ConfigSource().ReplaceAll(
[]v1.ProxyConfigurer{mockProxy("charlie"), mockProxy("alice"), mockProxy("bob")},
[]v1.VisitorConfigurer{mockVisitor("zulu"), mockVisitor("alpha")},
)
require.NoError(err)
proxies, visitors, err := agg.Load()
require.NoError(err)
require.Len(proxies, 3)
require.Equal("alice", proxies[0].GetBaseConfig().Name)
require.Equal("bob", proxies[1].GetBaseConfig().Name)
require.Equal("charlie", proxies[2].GetBaseConfig().Name)
require.Len(visitors, 2)
require.Equal("alpha", visitors[0].GetBaseConfig().Name)
require.Equal("zulu", visitors[1].GetBaseConfig().Name)
}
func TestAggregator_Load_ReturnsDefensiveCopies(t *testing.T) {
require := require.New(t)

View File

@@ -70,24 +70,18 @@ func (q *BandwidthQuantity) UnmarshalString(s string) error {
f float64
err error
)
switch {
case strings.HasSuffix(s, "MB"):
if fstr, ok := strings.CutSuffix(s, "MB"); ok {
base = MB
fstr := strings.TrimSuffix(s, "MB")
f, err = strconv.ParseFloat(fstr, 64)
if err != nil {
return err
}
case strings.HasSuffix(s, "KB"):
} else if fstr, ok := strings.CutSuffix(s, "KB"); ok {
base = KB
fstr := strings.TrimSuffix(s, "KB")
f, err = strconv.ParseFloat(fstr, 64)
if err != nil {
return err
}
default:
} else {
return errors.New("unit not support")
}
if err != nil {
return err
}
q.s = s
q.i = int64(f * float64(base))

View File

@@ -39,6 +39,31 @@ func TestBandwidthQuantity(t *testing.T) {
require.Equal(`{"b":"1KB","int":5}`, string(buf))
}
func TestBandwidthQuantity_MB(t *testing.T) {
require := require.New(t)
var w Wrap
err := json.Unmarshal([]byte(`{"b":"2MB","int":1}`), &w)
require.NoError(err)
require.EqualValues(2*MB, w.B.Bytes())
buf, err := json.Marshal(&w)
require.NoError(err)
require.Equal(`{"b":"2MB","int":1}`, string(buf))
}
func TestBandwidthQuantity_InvalidUnit(t *testing.T) {
var w Wrap
err := json.Unmarshal([]byte(`{"b":"1GB","int":1}`), &w)
require.Error(t, err)
}
func TestBandwidthQuantity_InvalidNumber(t *testing.T) {
var w Wrap
err := json.Unmarshal([]byte(`{"b":"abcKB","int":1}`), &w)
require.Error(t, err)
}
func TestPortsRangeSlice2String(t *testing.T) {
require := require.New(t)

View File

@@ -184,7 +184,7 @@ type Pong struct {
}
type UDPPacket struct {
Content string `json:"c,omitempty"`
Content []byte `json:"c,omitempty"`
LocalAddr *net.UDPAddr `json:"l,omitempty"`
RemoteAddr *net.UDPAddr `json:"r,omitempty"`
}

View File

@@ -16,9 +16,8 @@ func StripUserPrefix(user, name string) string {
if user == "" {
return name
}
prefix := user + "."
if strings.HasPrefix(name, prefix) {
return strings.TrimPrefix(name, prefix)
if trimmed, ok := strings.CutPrefix(name, user+"."); ok {
return trimmed
}
return name
}

View File

@@ -70,12 +70,8 @@ func ClassifyNATFeature(addresses []string, localIPs []string) (*NatFeature, err
continue
}
if portNum > portMax {
portMax = portNum
}
if portNum < portMin {
portMin = portNum
}
portMax = max(portMax, portNum)
portMin = min(portMin, portNum)
if baseIP != ip {
ipChanged = true
}

View File

@@ -298,11 +298,13 @@ func waitDetectMessage(
n, raddr, err := conn.ReadFromUDP(buf)
_ = conn.SetReadDeadline(time.Time{})
if err != nil {
pool.PutBuf(buf)
return nil, err
}
xl.Debugf("get udp message local %s, from %s", conn.LocalAddr(), raddr)
var m msg.NatHoleSid
if err := DecodeMessageInto(buf[:n], key, &m); err != nil {
pool.PutBuf(buf)
xl.Warnf("decode sid message error: %v", err)
continue
}

View File

@@ -21,6 +21,7 @@ import (
stdlog "log"
"net/http"
"net/http/httputil"
"time"
"github.com/fatedier/golib/pool"
@@ -68,7 +69,7 @@ func NewHTTP2HTTPPlugin(_ PluginContext, options v1.ClientPluginOptions) (Plugin
p.s = &http.Server{
Handler: rp,
ReadHeaderTimeout: 0,
ReadHeaderTimeout: 60 * time.Second,
}
go func() {

View File

@@ -22,6 +22,7 @@ import (
stdlog "log"
"net/http"
"net/http/httputil"
"time"
"github.com/fatedier/golib/pool"
@@ -77,7 +78,7 @@ func NewHTTP2HTTPSPlugin(_ PluginContext, options v1.ClientPluginOptions) (Plugi
p.s = &http.Server{
Handler: rp,
ReadHeaderTimeout: 0,
ReadHeaderTimeout: 60 * time.Second,
}
go func() {

View File

@@ -62,11 +62,13 @@ func (p *TLS2RawPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) {
if err := tlsConn.Handshake(); err != nil {
xl.Warnf("tls handshake error: %v", err)
tlsConn.Close()
return
}
rawConn, err := net.Dial("tcp", p.opts.LocalAddr)
if err != nil {
xl.Warnf("dial to local addr error: %v", err)
tlsConn.Close()
return
}

View File

@@ -54,10 +54,13 @@ func (uds *UnixDomainSocketPlugin) Handle(ctx context.Context, connInfo *Connect
localConn, err := net.DialUnix("unix", nil, uds.UnixAddr)
if err != nil {
xl.Warnf("dial to uds %s error: %v", uds.UnixAddr, err)
connInfo.Conn.Close()
return
}
if connInfo.ProxyProtocolHeader != nil {
if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
localConn.Close()
connInfo.Conn.Close()
return
}
}

View File

@@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"reflect"
"slices"
"strings"
v1 "github.com/fatedier/frp/pkg/config/v1"
@@ -64,12 +65,7 @@ func (p *httpPlugin) Name() string {
}
func (p *httpPlugin) IsSupport(op string) bool {
for _, v := range p.options.Ops {
if v == op {
return true
}
}
return false
return slices.Contains(p.options.Ops, op)
}
func (p *httpPlugin) Handle(ctx context.Context, op string, content any) (*Response, any, error) {

View File

@@ -15,7 +15,6 @@
package udp
import (
"encoding/base64"
"net"
"sync"
"time"
@@ -28,16 +27,17 @@ import (
)
func NewUDPPacket(buf []byte, laddr, raddr *net.UDPAddr) *msg.UDPPacket {
content := make([]byte, len(buf))
copy(content, buf)
return &msg.UDPPacket{
Content: base64.StdEncoding.EncodeToString(buf),
Content: content,
LocalAddr: laddr,
RemoteAddr: raddr,
}
}
func GetContent(m *msg.UDPPacket) (buf []byte, err error) {
buf, err = base64.StdEncoding.DecodeString(m.Content)
return
return m.Content, nil
}
func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh chan<- *msg.UDPPacket, bufSize int) {
@@ -60,7 +60,7 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh
if err != nil {
return
}
// buf[:n] will be encoded to string, so the bytes can be reused
// NewUDPPacket copies buf[:n], so the read buffer can be reused
udpMsg := NewUDPPacket(buf[:n], nil, remoteAddr)
select {
@@ -85,6 +85,7 @@ func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UDPPacket, sendCh chan<-
}()
buf := pool.GetBuf(bufSize)
defer pool.PutBuf(buf)
for {
_ = udpConn.SetReadDeadline(time.Now().Add(30 * time.Second))
n, _, err := udpConn.ReadFromUDP(buf)

View File

@@ -20,6 +20,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"math/big"
"os"
"time"
@@ -85,7 +86,9 @@ func newCertPool(caPath string) (*x509.CertPool, error) {
return nil, err
}
pool.AppendCertsFromPEM(caCrt)
if !pool.AppendCertsFromPEM(caCrt) {
return nil, fmt.Errorf("failed to parse CA certificate from file %q: no valid PEM certificates found", caPath)
}
return pool, nil
}

View File

@@ -26,6 +26,7 @@ type WebsocketListener struct {
// ln: tcp listener for websocket connections
func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) {
wl = &WebsocketListener{
ln: ln,
acceptCh: make(chan net.Conn),
}

View File

@@ -63,11 +63,12 @@ func (l *Logger) AddPrefix(prefix LogPrefix) *Logger {
if prefix.Priority <= 0 {
prefix.Priority = 10
}
for _, p := range l.prefixes {
for i, p := range l.prefixes {
if p.Name == prefix.Name {
found = true
p.Value = prefix.Value
p.Priority = prefix.Priority
l.prefixes[i].Value = prefix.Value
l.prefixes[i].Priority = prefix.Priority
break
}
}
if !found {

View File

@@ -100,8 +100,9 @@ func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr
if err != nil {
return
}
tcpLn, errRet := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(port)))
tcpLn, errRet := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(realPort)))
if errRet != nil {
tg.ctl.portManager.Release(realPort)
err = errRet
return
}

View File

@@ -75,16 +75,21 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
}
}()
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
if domain == "" {
continue
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
for _, d := range pxy.cfg.CustomDomains {
if d != "" {
domains = append(domains, d)
}
}
if pxy.cfg.SubDomain != "" {
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
}
addrs := make([]string, 0)
for _, domain := range domains {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
tmpRouteConfig := routeConfig
// handle group
@@ -93,12 +98,10 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
if err != nil {
return
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.LoadBalancer.Group, tmpRouteConfig)
})
} else {
// no group
err = pxy.rc.HTTPReverseProxy.Register(routeConfig)
if err != nil {
return
@@ -112,39 +115,6 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
routeConfig.Domain, routeConfig.Location, pxy.cfg.LoadBalancer.Group, pxy.cfg.RouteByHTTPUser)
}
}
if pxy.cfg.SubDomain != "" {
routeConfig.Domain = pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
tmpRouteConfig := routeConfig
// handle group
if pxy.cfg.LoadBalancer.Group != "" {
err = pxy.rc.HTTPGroupCtl.Register(pxy.name, pxy.cfg.LoadBalancer.Group, pxy.cfg.LoadBalancer.GroupKey, routeConfig)
if err != nil {
return
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.LoadBalancer.Group, tmpRouteConfig)
})
} else {
err = pxy.rc.HTTPReverseProxy.Register(routeConfig)
if err != nil {
return
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPReverseProxy.UnRegister(tmpRouteConfig)
})
}
addrs = append(addrs, util.CanonicalAddr(tmpRouteConfig.Domain, pxy.serverCfg.VhostHTTPPort))
xl.Infof("http proxy listen for host [%s] location [%s] group [%s], routeByHTTPUser [%s]",
routeConfig.Domain, routeConfig.Location, pxy.cfg.LoadBalancer.Group, pxy.cfg.RouteByHTTPUser)
}
}
remoteAddr = strings.Join(addrs, ",")
return
}
@@ -168,6 +138,7 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
if err != nil {
xl.Errorf("create encryption stream error: %v", err)
tmpConn.Close()
return
}
}

View File

@@ -53,23 +53,18 @@ func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) {
pxy.Close()
}
}()
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
if domain == "" {
continue
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
for _, d := range pxy.cfg.CustomDomains {
if d != "" {
domains = append(domains, d)
}
l, err := pxy.listenForDomain(routeConfig, domain)
if err != nil {
return "", err
}
pxy.listeners = append(pxy.listeners, l)
addrs = append(addrs, util.CanonicalAddr(domain, pxy.serverCfg.VhostHTTPSPort))
xl.Infof("https proxy listen for host [%s] group [%s]", domain, pxy.cfg.LoadBalancer.Group)
}
if pxy.cfg.SubDomain != "" {
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
}
if pxy.cfg.SubDomain != "" {
domain := pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
addrs := make([]string, 0)
for _, domain := range domains {
l, err := pxy.listenForDomain(routeConfig, domain)
if err != nil {
return "", err

View File

@@ -72,21 +72,19 @@ func (pxy *TCPMuxProxy) httpConnectListen(
}
func (pxy *TCPMuxProxy) httpConnectRun() (remoteAddr string, err error) {
addrs := make([]string, 0)
for _, domain := range pxy.cfg.CustomDomains {
if domain == "" {
continue
}
addrs, err = pxy.httpConnectListen(domain, pxy.cfg.RouteByHTTPUser, pxy.cfg.HTTPUser, pxy.cfg.HTTPPassword, addrs)
if err != nil {
return "", err
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
for _, d := range pxy.cfg.CustomDomains {
if d != "" {
domains = append(domains, d)
}
}
if pxy.cfg.SubDomain != "" {
addrs, err = pxy.httpConnectListen(pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost,
pxy.cfg.RouteByHTTPUser, pxy.cfg.HTTPUser, pxy.cfg.HTTPPassword, addrs)
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
}
addrs := make([]string, 0)
for _, domain := range domains {
addrs, err = pxy.httpConnectListen(domain, pxy.cfg.RouteByHTTPUser, pxy.cfg.HTTPUser, pxy.cfg.HTTPPassword, addrs)
if err != nil {
return "", err
}

View File

@@ -136,7 +136,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
continue
case *msg.UDPPacket:
if errRet := errors.PanicToError(func() {
xl.Tracef("get udp message from workConn: %s", m.Content)
xl.Tracef("get udp message from workConn, len: %d", len(m.Content))
pxy.readCh <- m
metrics.Server.AddTrafficOut(
pxy.GetName(),
@@ -167,7 +167,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
conn.Close()
return
}
xl.Tracef("send message to udp workConn: %s", udpMsg.Content)
xl.Tracef("send message to udp workConn, len: %d", len(udpMsg.Content))
metrics.Server.AddTrafficIn(
pxy.GetName(),
pxy.GetConfigurer().GetBaseConfig().Type,