mirror of
https://github.com/fatedier/frp.git
synced 2026-03-08 10:59:11 +08:00
Compare commits
12 Commits
copilot/re
...
cf396563f8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cf396563f8 | ||
|
|
0b4f83cd04 | ||
|
|
e9f7a1a9f2 | ||
|
|
d644593342 | ||
|
|
427c4ca3ae | ||
|
|
f2d1f3739a | ||
|
|
c23894f156 | ||
|
|
cb459b02b6 | ||
|
|
8f633fe363 | ||
|
|
c62a1da161 | ||
|
|
f22f7d539c | ||
|
|
462c987f6d |
@@ -16,6 +16,7 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
@@ -122,6 +123,33 @@ func (pxy *BaseProxy) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
// wrapWorkConn applies rate limiting, encryption, and compression
|
||||
// to a work connection based on the proxy's transport configuration.
|
||||
// The returned recycle function should be called when the stream is no longer in use
|
||||
// to return compression resources to the pool. It is safe to not call recycle,
|
||||
// in which case resources will be garbage collected normally.
|
||||
func (pxy *BaseProxy) wrapWorkConn(conn net.Conn, encKey []byte) (io.ReadWriteCloser, func(), error) {
|
||||
var rwc io.ReadWriteCloser = conn
|
||||
if pxy.limiter != nil {
|
||||
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
|
||||
return conn.Close()
|
||||
})
|
||||
}
|
||||
if pxy.baseCfg.Transport.UseEncryption {
|
||||
var err error
|
||||
rwc, err = libio.WithEncryption(rwc, encKey)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, nil, fmt.Errorf("create encryption stream error: %w", err)
|
||||
}
|
||||
}
|
||||
var recycleFn func()
|
||||
if pxy.baseCfg.Transport.UseCompression {
|
||||
rwc, recycleFn = libio.WithCompressionFromPool(rwc)
|
||||
}
|
||||
return rwc, recycleFn, nil
|
||||
}
|
||||
|
||||
func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
|
||||
pxy.inWorkConnCallback = cb
|
||||
}
|
||||
@@ -139,30 +167,14 @@ func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
|
||||
func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
|
||||
xl := pxy.xl
|
||||
baseCfg := pxy.baseCfg
|
||||
var (
|
||||
remote io.ReadWriteCloser
|
||||
err error
|
||||
)
|
||||
remote = workConn
|
||||
if pxy.limiter != nil {
|
||||
remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error {
|
||||
return workConn.Close()
|
||||
})
|
||||
}
|
||||
|
||||
xl.Tracef("handle tcp work connection, useEncryption: %t, useCompression: %t",
|
||||
baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
|
||||
if baseCfg.Transport.UseEncryption {
|
||||
remote, err = libio.WithEncryption(remote, encKey)
|
||||
if err != nil {
|
||||
workConn.Close()
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
var compressionResourceRecycleFn func()
|
||||
if baseCfg.Transport.UseCompression {
|
||||
remote, compressionResourceRecycleFn = libio.WithCompressionFromPool(remote)
|
||||
|
||||
remote, recycleFn, err := pxy.wrapWorkConn(workConn, encKey)
|
||||
if err != nil {
|
||||
xl.Errorf("wrap work connection: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// check if we need to send proxy protocol info
|
||||
@@ -178,7 +190,6 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
||||
}
|
||||
|
||||
if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 {
|
||||
// Use the common proxy protocol builder function
|
||||
header := netpkg.BuildProxyProtocolHeaderStruct(connInfo.SrcAddr, connInfo.DstAddr, baseCfg.Transport.ProxyProtocolVersion)
|
||||
connInfo.ProxyProtocolHeader = header
|
||||
}
|
||||
@@ -187,12 +198,18 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
||||
|
||||
if pxy.proxyPlugin != nil {
|
||||
// if plugin is set, let plugin handle connection first
|
||||
// Don't recycle compression resources here because plugins may
|
||||
// retain the connection after Handle returns.
|
||||
xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name())
|
||||
pxy.proxyPlugin.Handle(pxy.ctx, &connInfo)
|
||||
xl.Debugf("handle by plugin finished")
|
||||
return
|
||||
}
|
||||
|
||||
if recycleFn != nil {
|
||||
defer recycleFn()
|
||||
}
|
||||
|
||||
localConn, err := libnet.Dial(
|
||||
net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
|
||||
libnet.WithTimeout(10*time.Second),
|
||||
@@ -209,6 +226,7 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
||||
if connInfo.ProxyProtocolHeader != nil {
|
||||
if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
|
||||
workConn.Close()
|
||||
localConn.Close()
|
||||
xl.Errorf("write proxy protocol header to local conn error: %v", err)
|
||||
return
|
||||
}
|
||||
@@ -219,7 +237,4 @@ func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWor
|
||||
if len(errs) > 0 {
|
||||
xl.Tracef("join connections errors: %v", errs)
|
||||
}
|
||||
if compressionResourceRecycleFn != nil {
|
||||
compressionResourceRecycleFn()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
@@ -25,12 +24,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/fatedier/golib/errors"
|
||||
libio "github.com/fatedier/golib/io"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
"github.com/fatedier/frp/pkg/proto/udp"
|
||||
"github.com/fatedier/frp/pkg/util/limit"
|
||||
netpkg "github.com/fatedier/frp/pkg/util/net"
|
||||
)
|
||||
|
||||
@@ -83,27 +80,13 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
xl := pxy.xl
|
||||
xl.Infof("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
|
||||
|
||||
var rwc io.ReadWriteCloser = conn
|
||||
var err error
|
||||
if pxy.limiter != nil {
|
||||
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
|
||||
return conn.Close()
|
||||
})
|
||||
remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey)
|
||||
if err != nil {
|
||||
xl.Errorf("wrap work connection: %v", err)
|
||||
return
|
||||
}
|
||||
if pxy.cfg.Transport.UseEncryption {
|
||||
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if pxy.cfg.Transport.UseCompression {
|
||||
rwc = libio.WithCompression(rwc)
|
||||
}
|
||||
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
|
||||
|
||||
workConn := conn
|
||||
workConn := netpkg.WrapReadWriteCloserToConn(remote, conn)
|
||||
readCh := make(chan *msg.UDPPacket, 1024)
|
||||
sendCh := make(chan msg.Message, 1024)
|
||||
isClose := false
|
||||
|
||||
@@ -17,19 +17,16 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/fatedier/golib/errors"
|
||||
libio "github.com/fatedier/golib/io"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
"github.com/fatedier/frp/pkg/proto/udp"
|
||||
"github.com/fatedier/frp/pkg/util/limit"
|
||||
netpkg "github.com/fatedier/frp/pkg/util/net"
|
||||
)
|
||||
|
||||
@@ -94,28 +91,14 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
// close resources related with old workConn
|
||||
pxy.Close()
|
||||
|
||||
var rwc io.ReadWriteCloser = conn
|
||||
var err error
|
||||
if pxy.limiter != nil {
|
||||
rwc = libio.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
|
||||
return conn.Close()
|
||||
})
|
||||
remote, _, err := pxy.wrapWorkConn(conn, pxy.encryptionKey)
|
||||
if err != nil {
|
||||
xl.Errorf("wrap work connection: %v", err)
|
||||
return
|
||||
}
|
||||
if pxy.cfg.Transport.UseEncryption {
|
||||
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if pxy.cfg.Transport.UseCompression {
|
||||
rwc = libio.WithCompression(rwc)
|
||||
}
|
||||
conn = netpkg.WrapReadWriteCloserToConn(rwc, conn)
|
||||
|
||||
pxy.mu.Lock()
|
||||
pxy.workConn = conn
|
||||
pxy.workConn = netpkg.WrapReadWriteCloserToConn(remote, conn)
|
||||
pxy.readCh = make(chan *msg.UDPPacket, 1024)
|
||||
pxy.sendCh = make(chan msg.Message, 1024)
|
||||
pxy.closed = false
|
||||
@@ -129,7 +112,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
return
|
||||
}
|
||||
if errRet := errors.PanicToError(func() {
|
||||
xl.Tracef("get udp package from workConn: %s", udpMsg.Content)
|
||||
xl.Tracef("get udp package from workConn, len: %d", len(udpMsg.Content))
|
||||
readCh <- &udpMsg
|
||||
}); errRet != nil {
|
||||
xl.Infof("reader goroutine for udp work connection closed: %v", errRet)
|
||||
@@ -145,7 +128,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
for rawMsg := range sendCh {
|
||||
switch m := rawMsg.(type) {
|
||||
case *msg.UDPPacket:
|
||||
xl.Tracef("send udp package to workConn: %s", m.Content)
|
||||
xl.Tracef("send udp package to workConn, len: %d", len(m.Content))
|
||||
case *msg.Ping:
|
||||
xl.Tracef("send ping message to udp workConn")
|
||||
}
|
||||
|
||||
@@ -147,7 +147,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
case *msg.UDPPacket:
|
||||
if errRet := errors.PanicToError(func() {
|
||||
sv.readCh <- m
|
||||
xl.Tracef("frpc visitor get udp packet from workConn: %s", m.Content)
|
||||
xl.Tracef("frpc visitor get udp packet from workConn, len: %d", len(m.Content))
|
||||
}); errRet != nil {
|
||||
xl.Infof("reader goroutine for udp work connection closed")
|
||||
return
|
||||
@@ -169,7 +169,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
|
||||
return
|
||||
}
|
||||
xl.Tracef("send udp package to workConn: %s", firstPacket.Content)
|
||||
xl.Tracef("send udp package to workConn, len: %d", len(firstPacket.Content))
|
||||
}
|
||||
|
||||
for {
|
||||
@@ -184,7 +184,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
|
||||
return
|
||||
}
|
||||
xl.Tracef("send udp package to workConn: %s", udpMsg.Content)
|
||||
xl.Tracef("send udp package to workConn, len: %d", len(udpMsg.Content))
|
||||
case <-closeCh:
|
||||
return
|
||||
}
|
||||
@@ -217,6 +217,7 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
|
||||
}
|
||||
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
|
||||
if err != nil {
|
||||
visitorConn.Close()
|
||||
return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err)
|
||||
}
|
||||
|
||||
@@ -224,11 +225,13 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
|
||||
_ = visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
|
||||
if err != nil {
|
||||
visitorConn.Close()
|
||||
return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err)
|
||||
}
|
||||
_ = visitorConn.SetReadDeadline(time.Time{})
|
||||
|
||||
if newVisitorConnRespMsg.Error != "" {
|
||||
visitorConn.Close()
|
||||
return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
|
||||
}
|
||||
|
||||
@@ -238,6 +241,7 @@ func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
|
||||
remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
|
||||
if err != nil {
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
visitorConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,6 +211,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
|
||||
muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey))
|
||||
if err != nil {
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
tunnelConn.Close()
|
||||
tunnelErr = err
|
||||
return
|
||||
}
|
||||
@@ -373,6 +374,7 @@ func (ks *KCPTunnelSession) Init(listenConn *net.UDPConn, raddr *net.UDPAddr) er
|
||||
}
|
||||
remote, err := netpkg.NewKCPConnFromUDP(lConn, true, raddr.String())
|
||||
if err != nil {
|
||||
lConn.Close()
|
||||
return fmt.Errorf("create kcp connection from udp connection error: %v", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/go-oidc/v3/oidc"
|
||||
"golang.org/x/oauth2"
|
||||
@@ -205,7 +206,8 @@ type OidcAuthConsumer struct {
|
||||
additionalAuthScopes []v1.AuthScope
|
||||
|
||||
verifier TokenVerifier
|
||||
subjectsFromLogin []string
|
||||
mu sync.RWMutex
|
||||
subjectsFromLogin map[string]struct{}
|
||||
}
|
||||
|
||||
func NewTokenVerifier(cfg v1.AuthOIDCServerConfig) TokenVerifier {
|
||||
@@ -226,7 +228,7 @@ func NewOidcAuthVerifier(additionalAuthScopes []v1.AuthScope, verifier TokenVeri
|
||||
return &OidcAuthConsumer{
|
||||
additionalAuthScopes: additionalAuthScopes,
|
||||
verifier: verifier,
|
||||
subjectsFromLogin: []string{},
|
||||
subjectsFromLogin: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,9 +237,9 @@ func (auth *OidcAuthConsumer) VerifyLogin(loginMsg *msg.Login) (err error) {
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid OIDC token in login: %v", err)
|
||||
}
|
||||
if !slices.Contains(auth.subjectsFromLogin, token.Subject) {
|
||||
auth.subjectsFromLogin = append(auth.subjectsFromLogin, token.Subject)
|
||||
}
|
||||
auth.mu.Lock()
|
||||
auth.subjectsFromLogin[token.Subject] = struct{}{}
|
||||
auth.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -246,11 +248,13 @@ func (auth *OidcAuthConsumer) verifyPostLoginToken(privilegeKey string) (err err
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid OIDC token in ping: %v", err)
|
||||
}
|
||||
if !slices.Contains(auth.subjectsFromLogin, token.Subject) {
|
||||
auth.mu.RLock()
|
||||
_, ok := auth.subjectsFromLogin[token.Subject]
|
||||
auth.mu.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("received different OIDC subject in login and ping. "+
|
||||
"original subjects: %s, "+
|
||||
"new subject: %s",
|
||||
auth.subjectsFromLogin, token.Subject)
|
||||
token.Subject)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -171,15 +171,14 @@ func Convert_ServerCommonConf_To_v1(conf *ServerCommonConf) *v1.ServerConfig {
|
||||
func transformHeadersFromPluginParams(params map[string]string) v1.HeaderOperations {
|
||||
out := v1.HeaderOperations{}
|
||||
for k, v := range params {
|
||||
if !strings.HasPrefix(k, "plugin_header_") {
|
||||
k, ok := strings.CutPrefix(k, "plugin_header_")
|
||||
if !ok || k == "" {
|
||||
continue
|
||||
}
|
||||
if k = strings.TrimPrefix(k, "plugin_header_"); k != "" {
|
||||
if out.Set == nil {
|
||||
out.Set = make(map[string]string)
|
||||
}
|
||||
out.Set[k] = v
|
||||
if out.Set == nil {
|
||||
out.Set = make(map[string]string)
|
||||
}
|
||||
out.Set[k] = v
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -22,8 +22,8 @@ func GetMapWithoutPrefix(set map[string]string, prefix string) map[string]string
|
||||
m := make(map[string]string)
|
||||
|
||||
for key, value := range set {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
m[strings.TrimPrefix(key, prefix)] = value
|
||||
if trimmed, ok := strings.CutPrefix(key, prefix); ok {
|
||||
m[trimmed] = value
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,9 +15,11 @@
|
||||
package source
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"maps"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
@@ -97,21 +99,11 @@ func (a *Aggregator) mapsToSortedSlices(
|
||||
proxyMap map[string]v1.ProxyConfigurer,
|
||||
visitorMap map[string]v1.VisitorConfigurer,
|
||||
) ([]v1.ProxyConfigurer, []v1.VisitorConfigurer) {
|
||||
proxies := make([]v1.ProxyConfigurer, 0, len(proxyMap))
|
||||
for _, p := range proxyMap {
|
||||
proxies = append(proxies, p)
|
||||
}
|
||||
sort.Slice(proxies, func(i, j int) bool {
|
||||
return proxies[i].GetBaseConfig().Name < proxies[j].GetBaseConfig().Name
|
||||
proxies := slices.SortedFunc(maps.Values(proxyMap), func(x, y v1.ProxyConfigurer) int {
|
||||
return cmp.Compare(x.GetBaseConfig().Name, y.GetBaseConfig().Name)
|
||||
})
|
||||
|
||||
visitors := make([]v1.VisitorConfigurer, 0, len(visitorMap))
|
||||
for _, v := range visitorMap {
|
||||
visitors = append(visitors, v)
|
||||
}
|
||||
sort.Slice(visitors, func(i, j int) bool {
|
||||
return visitors[i].GetBaseConfig().Name < visitors[j].GetBaseConfig().Name
|
||||
visitors := slices.SortedFunc(maps.Values(visitorMap), func(x, y v1.VisitorConfigurer) int {
|
||||
return cmp.Compare(x.GetBaseConfig().Name, y.GetBaseConfig().Name)
|
||||
})
|
||||
|
||||
return proxies, visitors
|
||||
}
|
||||
|
||||
@@ -196,6 +196,27 @@ func TestAggregator_VisitorMerge(t *testing.T) {
|
||||
require.Len(visitors, 2)
|
||||
}
|
||||
|
||||
func TestAggregator_Load_ReturnsSortedByName(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
agg := newTestAggregator(t, nil)
|
||||
err := agg.ConfigSource().ReplaceAll(
|
||||
[]v1.ProxyConfigurer{mockProxy("charlie"), mockProxy("alice"), mockProxy("bob")},
|
||||
[]v1.VisitorConfigurer{mockVisitor("zulu"), mockVisitor("alpha")},
|
||||
)
|
||||
require.NoError(err)
|
||||
|
||||
proxies, visitors, err := agg.Load()
|
||||
require.NoError(err)
|
||||
require.Len(proxies, 3)
|
||||
require.Equal("alice", proxies[0].GetBaseConfig().Name)
|
||||
require.Equal("bob", proxies[1].GetBaseConfig().Name)
|
||||
require.Equal("charlie", proxies[2].GetBaseConfig().Name)
|
||||
require.Len(visitors, 2)
|
||||
require.Equal("alpha", visitors[0].GetBaseConfig().Name)
|
||||
require.Equal("zulu", visitors[1].GetBaseConfig().Name)
|
||||
}
|
||||
|
||||
func TestAggregator_Load_ReturnsDefensiveCopies(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
|
||||
@@ -70,24 +70,18 @@ func (q *BandwidthQuantity) UnmarshalString(s string) error {
|
||||
f float64
|
||||
err error
|
||||
)
|
||||
switch {
|
||||
case strings.HasSuffix(s, "MB"):
|
||||
if fstr, ok := strings.CutSuffix(s, "MB"); ok {
|
||||
base = MB
|
||||
fstr := strings.TrimSuffix(s, "MB")
|
||||
f, err = strconv.ParseFloat(fstr, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case strings.HasSuffix(s, "KB"):
|
||||
} else if fstr, ok := strings.CutSuffix(s, "KB"); ok {
|
||||
base = KB
|
||||
fstr := strings.TrimSuffix(s, "KB")
|
||||
f, err = strconv.ParseFloat(fstr, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
} else {
|
||||
return errors.New("unit not support")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
q.s = s
|
||||
q.i = int64(f * float64(base))
|
||||
|
||||
@@ -39,6 +39,31 @@ func TestBandwidthQuantity(t *testing.T) {
|
||||
require.Equal(`{"b":"1KB","int":5}`, string(buf))
|
||||
}
|
||||
|
||||
func TestBandwidthQuantity_MB(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
var w Wrap
|
||||
err := json.Unmarshal([]byte(`{"b":"2MB","int":1}`), &w)
|
||||
require.NoError(err)
|
||||
require.EqualValues(2*MB, w.B.Bytes())
|
||||
|
||||
buf, err := json.Marshal(&w)
|
||||
require.NoError(err)
|
||||
require.Equal(`{"b":"2MB","int":1}`, string(buf))
|
||||
}
|
||||
|
||||
func TestBandwidthQuantity_InvalidUnit(t *testing.T) {
|
||||
var w Wrap
|
||||
err := json.Unmarshal([]byte(`{"b":"1GB","int":1}`), &w)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestBandwidthQuantity_InvalidNumber(t *testing.T) {
|
||||
var w Wrap
|
||||
err := json.Unmarshal([]byte(`{"b":"abcKB","int":1}`), &w)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestPortsRangeSlice2String(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
|
||||
@@ -184,7 +184,7 @@ type Pong struct {
|
||||
}
|
||||
|
||||
type UDPPacket struct {
|
||||
Content string `json:"c,omitempty"`
|
||||
Content []byte `json:"c,omitempty"`
|
||||
LocalAddr *net.UDPAddr `json:"l,omitempty"`
|
||||
RemoteAddr *net.UDPAddr `json:"r,omitempty"`
|
||||
}
|
||||
|
||||
@@ -16,9 +16,8 @@ func StripUserPrefix(user, name string) string {
|
||||
if user == "" {
|
||||
return name
|
||||
}
|
||||
prefix := user + "."
|
||||
if strings.HasPrefix(name, prefix) {
|
||||
return strings.TrimPrefix(name, prefix)
|
||||
if trimmed, ok := strings.CutPrefix(name, user+"."); ok {
|
||||
return trimmed
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
@@ -70,12 +70,8 @@ func ClassifyNATFeature(addresses []string, localIPs []string) (*NatFeature, err
|
||||
continue
|
||||
}
|
||||
|
||||
if portNum > portMax {
|
||||
portMax = portNum
|
||||
}
|
||||
if portNum < portMin {
|
||||
portMin = portNum
|
||||
}
|
||||
portMax = max(portMax, portNum)
|
||||
portMin = min(portMin, portNum)
|
||||
if baseIP != ip {
|
||||
ipChanged = true
|
||||
}
|
||||
|
||||
@@ -298,11 +298,13 @@ func waitDetectMessage(
|
||||
n, raddr, err := conn.ReadFromUDP(buf)
|
||||
_ = conn.SetReadDeadline(time.Time{})
|
||||
if err != nil {
|
||||
pool.PutBuf(buf)
|
||||
return nil, err
|
||||
}
|
||||
xl.Debugf("get udp message local %s, from %s", conn.LocalAddr(), raddr)
|
||||
var m msg.NatHoleSid
|
||||
if err := DecodeMessageInto(buf[:n], key, &m); err != nil {
|
||||
pool.PutBuf(buf)
|
||||
xl.Warnf("decode sid message error: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
stdlog "log"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"time"
|
||||
|
||||
"github.com/fatedier/golib/pool"
|
||||
|
||||
@@ -68,7 +69,7 @@ func NewHTTP2HTTPPlugin(_ PluginContext, options v1.ClientPluginOptions) (Plugin
|
||||
|
||||
p.s = &http.Server{
|
||||
Handler: rp,
|
||||
ReadHeaderTimeout: 0,
|
||||
ReadHeaderTimeout: 60 * time.Second,
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
stdlog "log"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"time"
|
||||
|
||||
"github.com/fatedier/golib/pool"
|
||||
|
||||
@@ -77,7 +78,7 @@ func NewHTTP2HTTPSPlugin(_ PluginContext, options v1.ClientPluginOptions) (Plugi
|
||||
|
||||
p.s = &http.Server{
|
||||
Handler: rp,
|
||||
ReadHeaderTimeout: 0,
|
||||
ReadHeaderTimeout: 60 * time.Second,
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
@@ -62,11 +62,13 @@ func (p *TLS2RawPlugin) Handle(ctx context.Context, connInfo *ConnectionInfo) {
|
||||
|
||||
if err := tlsConn.Handshake(); err != nil {
|
||||
xl.Warnf("tls handshake error: %v", err)
|
||||
tlsConn.Close()
|
||||
return
|
||||
}
|
||||
rawConn, err := net.Dial("tcp", p.opts.LocalAddr)
|
||||
if err != nil {
|
||||
xl.Warnf("dial to local addr error: %v", err)
|
||||
tlsConn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -54,10 +54,13 @@ func (uds *UnixDomainSocketPlugin) Handle(ctx context.Context, connInfo *Connect
|
||||
localConn, err := net.DialUnix("unix", nil, uds.UnixAddr)
|
||||
if err != nil {
|
||||
xl.Warnf("dial to uds %s error: %v", uds.UnixAddr, err)
|
||||
connInfo.Conn.Close()
|
||||
return
|
||||
}
|
||||
if connInfo.ProxyProtocolHeader != nil {
|
||||
if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
|
||||
localConn.Close()
|
||||
connInfo.Conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
@@ -64,12 +65,7 @@ func (p *httpPlugin) Name() string {
|
||||
}
|
||||
|
||||
func (p *httpPlugin) IsSupport(op string) bool {
|
||||
for _, v := range p.options.Ops {
|
||||
if v == op {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return slices.Contains(p.options.Ops, op)
|
||||
}
|
||||
|
||||
func (p *httpPlugin) Handle(ctx context.Context, op string, content any) (*Response, any, error) {
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
package udp
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -28,16 +27,17 @@ import (
|
||||
)
|
||||
|
||||
func NewUDPPacket(buf []byte, laddr, raddr *net.UDPAddr) *msg.UDPPacket {
|
||||
content := make([]byte, len(buf))
|
||||
copy(content, buf)
|
||||
return &msg.UDPPacket{
|
||||
Content: base64.StdEncoding.EncodeToString(buf),
|
||||
Content: content,
|
||||
LocalAddr: laddr,
|
||||
RemoteAddr: raddr,
|
||||
}
|
||||
}
|
||||
|
||||
func GetContent(m *msg.UDPPacket) (buf []byte, err error) {
|
||||
buf, err = base64.StdEncoding.DecodeString(m.Content)
|
||||
return
|
||||
return m.Content, nil
|
||||
}
|
||||
|
||||
func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh chan<- *msg.UDPPacket, bufSize int) {
|
||||
@@ -60,7 +60,7 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UDPPacket, sendCh
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// buf[:n] will be encoded to string, so the bytes can be reused
|
||||
// NewUDPPacket copies buf[:n], so the read buffer can be reused
|
||||
udpMsg := NewUDPPacket(buf[:n], nil, remoteAddr)
|
||||
|
||||
select {
|
||||
@@ -85,6 +85,7 @@ func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UDPPacket, sendCh chan<-
|
||||
}()
|
||||
|
||||
buf := pool.GetBuf(bufSize)
|
||||
defer pool.PutBuf(buf)
|
||||
for {
|
||||
_ = udpConn.SetReadDeadline(time.Now().Add(30 * time.Second))
|
||||
n, _, err := udpConn.ReadFromUDP(buf)
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"time"
|
||||
@@ -85,7 +86,9 @@ func newCertPool(caPath string) (*x509.CertPool, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pool.AppendCertsFromPEM(caCrt)
|
||||
if !pool.AppendCertsFromPEM(caCrt) {
|
||||
return nil, fmt.Errorf("failed to parse CA certificate from file %q: no valid PEM certificates found", caPath)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ type WebsocketListener struct {
|
||||
// ln: tcp listener for websocket connections
|
||||
func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) {
|
||||
wl = &WebsocketListener{
|
||||
ln: ln,
|
||||
acceptCh: make(chan net.Conn),
|
||||
}
|
||||
|
||||
|
||||
@@ -63,11 +63,12 @@ func (l *Logger) AddPrefix(prefix LogPrefix) *Logger {
|
||||
if prefix.Priority <= 0 {
|
||||
prefix.Priority = 10
|
||||
}
|
||||
for _, p := range l.prefixes {
|
||||
for i, p := range l.prefixes {
|
||||
if p.Name == prefix.Name {
|
||||
found = true
|
||||
p.Value = prefix.Value
|
||||
p.Priority = prefix.Priority
|
||||
l.prefixes[i].Value = prefix.Value
|
||||
l.prefixes[i].Priority = prefix.Priority
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
|
||||
@@ -100,8 +100,9 @@ func (tg *TCPGroup) Listen(proxyName string, group string, groupKey string, addr
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tcpLn, errRet := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(port)))
|
||||
tcpLn, errRet := net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(realPort)))
|
||||
if errRet != nil {
|
||||
tg.ctl.portManager.Release(realPort)
|
||||
err = errRet
|
||||
return
|
||||
}
|
||||
|
||||
@@ -75,16 +75,21 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
addrs := make([]string, 0)
|
||||
for _, domain := range pxy.cfg.CustomDomains {
|
||||
if domain == "" {
|
||||
continue
|
||||
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
|
||||
for _, d := range pxy.cfg.CustomDomains {
|
||||
if d != "" {
|
||||
domains = append(domains, d)
|
||||
}
|
||||
}
|
||||
if pxy.cfg.SubDomain != "" {
|
||||
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
|
||||
}
|
||||
|
||||
addrs := make([]string, 0)
|
||||
for _, domain := range domains {
|
||||
routeConfig.Domain = domain
|
||||
for _, location := range locations {
|
||||
routeConfig.Location = location
|
||||
|
||||
tmpRouteConfig := routeConfig
|
||||
|
||||
// handle group
|
||||
@@ -93,12 +98,10 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
pxy.closeFuncs = append(pxy.closeFuncs, func() {
|
||||
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.LoadBalancer.Group, tmpRouteConfig)
|
||||
})
|
||||
} else {
|
||||
// no group
|
||||
err = pxy.rc.HTTPReverseProxy.Register(routeConfig)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -112,39 +115,6 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
|
||||
routeConfig.Domain, routeConfig.Location, pxy.cfg.LoadBalancer.Group, pxy.cfg.RouteByHTTPUser)
|
||||
}
|
||||
}
|
||||
|
||||
if pxy.cfg.SubDomain != "" {
|
||||
routeConfig.Domain = pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
|
||||
for _, location := range locations {
|
||||
routeConfig.Location = location
|
||||
|
||||
tmpRouteConfig := routeConfig
|
||||
|
||||
// handle group
|
||||
if pxy.cfg.LoadBalancer.Group != "" {
|
||||
err = pxy.rc.HTTPGroupCtl.Register(pxy.name, pxy.cfg.LoadBalancer.Group, pxy.cfg.LoadBalancer.GroupKey, routeConfig)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
pxy.closeFuncs = append(pxy.closeFuncs, func() {
|
||||
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.LoadBalancer.Group, tmpRouteConfig)
|
||||
})
|
||||
} else {
|
||||
err = pxy.rc.HTTPReverseProxy.Register(routeConfig)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
pxy.closeFuncs = append(pxy.closeFuncs, func() {
|
||||
pxy.rc.HTTPReverseProxy.UnRegister(tmpRouteConfig)
|
||||
})
|
||||
}
|
||||
addrs = append(addrs, util.CanonicalAddr(tmpRouteConfig.Domain, pxy.serverCfg.VhostHTTPPort))
|
||||
|
||||
xl.Infof("http proxy listen for host [%s] location [%s] group [%s], routeByHTTPUser [%s]",
|
||||
routeConfig.Domain, routeConfig.Location, pxy.cfg.LoadBalancer.Group, pxy.cfg.RouteByHTTPUser)
|
||||
}
|
||||
}
|
||||
remoteAddr = strings.Join(addrs, ",")
|
||||
return
|
||||
}
|
||||
@@ -168,6 +138,7 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
|
||||
rwc, err = libio.WithEncryption(rwc, pxy.encryptionKey)
|
||||
if err != nil {
|
||||
xl.Errorf("create encryption stream error: %v", err)
|
||||
tmpConn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,23 +53,18 @@ func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) {
|
||||
pxy.Close()
|
||||
}
|
||||
}()
|
||||
addrs := make([]string, 0)
|
||||
for _, domain := range pxy.cfg.CustomDomains {
|
||||
if domain == "" {
|
||||
continue
|
||||
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
|
||||
for _, d := range pxy.cfg.CustomDomains {
|
||||
if d != "" {
|
||||
domains = append(domains, d)
|
||||
}
|
||||
|
||||
l, err := pxy.listenForDomain(routeConfig, domain)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
pxy.listeners = append(pxy.listeners, l)
|
||||
addrs = append(addrs, util.CanonicalAddr(domain, pxy.serverCfg.VhostHTTPSPort))
|
||||
xl.Infof("https proxy listen for host [%s] group [%s]", domain, pxy.cfg.LoadBalancer.Group)
|
||||
}
|
||||
if pxy.cfg.SubDomain != "" {
|
||||
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
|
||||
}
|
||||
|
||||
if pxy.cfg.SubDomain != "" {
|
||||
domain := pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
|
||||
addrs := make([]string, 0)
|
||||
for _, domain := range domains {
|
||||
l, err := pxy.listenForDomain(routeConfig, domain)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
||||
@@ -72,21 +72,19 @@ func (pxy *TCPMuxProxy) httpConnectListen(
|
||||
}
|
||||
|
||||
func (pxy *TCPMuxProxy) httpConnectRun() (remoteAddr string, err error) {
|
||||
addrs := make([]string, 0)
|
||||
for _, domain := range pxy.cfg.CustomDomains {
|
||||
if domain == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
addrs, err = pxy.httpConnectListen(domain, pxy.cfg.RouteByHTTPUser, pxy.cfg.HTTPUser, pxy.cfg.HTTPPassword, addrs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
|
||||
for _, d := range pxy.cfg.CustomDomains {
|
||||
if d != "" {
|
||||
domains = append(domains, d)
|
||||
}
|
||||
}
|
||||
|
||||
if pxy.cfg.SubDomain != "" {
|
||||
addrs, err = pxy.httpConnectListen(pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost,
|
||||
pxy.cfg.RouteByHTTPUser, pxy.cfg.HTTPUser, pxy.cfg.HTTPPassword, addrs)
|
||||
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
|
||||
}
|
||||
|
||||
addrs := make([]string, 0)
|
||||
for _, domain := range domains {
|
||||
addrs, err = pxy.httpConnectListen(domain, pxy.cfg.RouteByHTTPUser, pxy.cfg.HTTPUser, pxy.cfg.HTTPPassword, addrs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
continue
|
||||
case *msg.UDPPacket:
|
||||
if errRet := errors.PanicToError(func() {
|
||||
xl.Tracef("get udp message from workConn: %s", m.Content)
|
||||
xl.Tracef("get udp message from workConn, len: %d", len(m.Content))
|
||||
pxy.readCh <- m
|
||||
metrics.Server.AddTrafficOut(
|
||||
pxy.GetName(),
|
||||
@@ -167,7 +167,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
xl.Tracef("send message to udp workConn: %s", udpMsg.Content)
|
||||
xl.Tracef("send message to udp workConn, len: %d", len(udpMsg.Content))
|
||||
metrics.Server.AddTrafficIn(
|
||||
pxy.GetName(),
|
||||
pxy.GetConfigurer().GetBaseConfig().Type,
|
||||
|
||||
Reference in New Issue
Block a user