Compare commits

...

2 Commits

Author SHA1 Message Date
fatedier
c70ceff370 fix: three high-severity bugs across nathole, proxy, and udp modules (#5214)
- pkg/nathole: add RLock when reading clientCfgs map in PreCheck path
  to prevent concurrent map read/write crash
- server/proxy: fix error variable shadowing in GetWorkConnFromPool
  that could return a closed connection with nil error
- pkg/util/net: check ListenUDP error before spawning goroutines
  and assign readConn to struct field so Close() works correctly
2026-03-07 13:36:02 +08:00
fatedier
bb3d0e7140 deduplicate common logic across proxy, visitor, and metrics modules (#5213)
- Replace duplicate parseBasicAuth with existing httppkg.ParseBasicAuth
- Extract buildDomains helper in BaseProxy for HTTP/HTTPS/TCPMux proxies
- Extract toProxyStats helper to deduplicate ProxyStats construction
- Extract startVisitorListener helper in BaseProxy for STCP/SUDP proxies
- Extract acceptLoop helper in BaseVisitor for STCP/XTCP visitors
2026-03-07 12:00:27 +08:00
13 changed files with 82 additions and 178 deletions

View File

@@ -42,10 +42,10 @@ func (sv *STCPVisitor) Run() (err error) {
if err != nil {
return
}
go sv.worker()
go sv.acceptLoop(sv.l, "stcp local", sv.handleConn)
}
go sv.internalConnWorker()
go sv.acceptLoop(sv.internalLn, "stcp internal", sv.handleConn)
if sv.plugin != nil {
sv.plugin.Start()
@@ -57,30 +57,6 @@ func (sv *STCPVisitor) Close() {
sv.BaseVisitor.Close()
}
func (sv *STCPVisitor) worker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.l.Accept()
if err != nil {
xl.Warnf("stcp local listener closed")
return
}
go sv.handleConn(conn)
}
}
func (sv *STCPVisitor) internalConnWorker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.internalLn.Accept()
if err != nil {
xl.Warnf("stcp internal listener closed")
return
}
go sv.handleConn(conn)
}
}
func (sv *STCPVisitor) handleConn(userConn net.Conn) {
xl := xlog.FromContextSafe(sv.ctx)
var tunnelErr error

View File

@@ -119,6 +119,18 @@ func (v *BaseVisitor) AcceptConn(conn net.Conn) error {
return v.internalLn.PutConn(conn)
}
func (v *BaseVisitor) acceptLoop(l net.Listener, name string, handleConn func(net.Conn)) {
xl := xlog.FromContextSafe(v.ctx)
for {
conn, err := l.Accept()
if err != nil {
xl.Warnf("%s listener closed", name)
return
}
go handleConn(conn)
}
}
func (v *BaseVisitor) Close() {
if v.l != nil {
v.l.Close()

View File

@@ -65,10 +65,10 @@ func (sv *XTCPVisitor) Run() (err error) {
if err != nil {
return
}
go sv.worker()
go sv.acceptLoop(sv.l, "xtcp local", sv.handleConn)
}
go sv.internalConnWorker()
go sv.acceptLoop(sv.internalLn, "xtcp internal", sv.handleConn)
go sv.processTunnelStartEvents()
if sv.cfg.KeepTunnelOpen {
sv.retryLimiter = rate.NewLimiter(rate.Every(time.Hour/time.Duration(sv.cfg.MaxRetriesAnHour)), sv.cfg.MaxRetriesAnHour)
@@ -93,30 +93,6 @@ func (sv *XTCPVisitor) Close() {
}
}
func (sv *XTCPVisitor) worker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.l.Accept()
if err != nil {
xl.Warnf("xtcp local listener closed")
return
}
go sv.handleConn(conn)
}
}
func (sv *XTCPVisitor) internalConnWorker() {
xl := xlog.FromContextSafe(sv.ctx)
for {
conn, err := sv.internalLn.Accept()
if err != nil {
xl.Warnf("xtcp internal listener closed")
return
}
go sv.handleConn(conn)
}
}
func (sv *XTCPVisitor) processTunnelStartEvents() {
for {
select {

View File

@@ -203,6 +203,25 @@ func (m *serverMetrics) GetServer() *ServerStats {
return s
}
func toProxyStats(name string, proxyStats *ProxyStatistics) *ProxyStats {
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
User: proxyStats.User,
ClientID: proxyStats.ClientID,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: int64(proxyStats.CurConns.Count()),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
return ps
}
func (m *serverMetrics) GetProxiesByType(proxyType string) []*ProxyStats {
res := make([]*ProxyStats, 0)
m.mu.Lock()
@@ -212,23 +231,7 @@ func (m *serverMetrics) GetProxiesByType(proxyType string) []*ProxyStats {
if proxyStats.ProxyType != proxyType {
continue
}
ps := &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
User: proxyStats.User,
ClientID: proxyStats.ClientID,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: int64(proxyStats.CurConns.Count()),
}
if !proxyStats.LastStartTime.IsZero() {
ps.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
ps.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = append(res, ps)
res = append(res, toProxyStats(name, proxyStats))
}
return res
}
@@ -241,26 +244,10 @@ func (m *serverMetrics) GetProxiesByTypeAndName(proxyType string, proxyName stri
if proxyStats.ProxyType != proxyType {
continue
}
if name != proxyName {
continue
}
res = &ProxyStats{
Name: name,
Type: proxyStats.ProxyType,
User: proxyStats.User,
ClientID: proxyStats.ClientID,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: int64(proxyStats.CurConns.Count()),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = toProxyStats(name, proxyStats)
break
}
return
@@ -272,21 +259,7 @@ func (m *serverMetrics) GetProxyByName(proxyName string) (res *ProxyStats) {
proxyStats, ok := m.info.ProxyStatistics[proxyName]
if ok {
res = &ProxyStats{
Name: proxyName,
Type: proxyStats.ProxyType,
User: proxyStats.User,
ClientID: proxyStats.ClientID,
TodayTrafficIn: proxyStats.TrafficIn.TodayCount(),
TodayTrafficOut: proxyStats.TrafficOut.TodayCount(),
CurConns: int64(proxyStats.CurConns.Count()),
}
if !proxyStats.LastStartTime.IsZero() {
res.LastStartTime = proxyStats.LastStartTime.Format("01-02 15:04:05")
}
if !proxyStats.LastCloseTime.IsZero() {
res.LastCloseTime = proxyStats.LastCloseTime.Format("01-02 15:04:05")
}
res = toProxyStats(proxyName, proxyStats)
}
return
}

View File

@@ -152,7 +152,9 @@ func (c *Controller) GenSid() string {
func (c *Controller) HandleVisitor(m *msg.NatHoleVisitor, transporter transport.MessageTransporter, visitorUser string) {
if m.PreCheck {
c.mu.RLock()
cfg, ok := c.clientCfgs[m.ProxyName]
c.mu.RUnlock()
if !ok {
_ = transporter.Send(c.GenNatHoleResponse(m.TransactionID, nil, fmt.Sprintf("xtcp server for [%s] doesn't exist", m.ProxyName)))
return

View File

@@ -168,11 +168,15 @@ func ListenUDP(bindAddr string, bindPort int) (l *UDPListener, err error) {
return l, err
}
readConn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return l, err
}
l = &UDPListener{
addr: udpAddr,
acceptCh: make(chan net.Conn),
writeCh: make(chan *UDPPacket, 1000),
readConn: readConn,
fakeConns: make(map[string]*FakeUDPConn),
}

View File

@@ -266,31 +266,13 @@ func (rp *HTTPReverseProxy) connectHandler(rw http.ResponseWriter, req *http.Req
go libio.Join(remote, client)
}
func parseBasicAuth(auth string) (username, password string, ok bool) {
const prefix = "Basic "
// Case insensitive prefix match. See Issue 22736.
if len(auth) < len(prefix) || !strings.EqualFold(auth[:len(prefix)], prefix) {
return
}
c, err := base64.StdEncoding.DecodeString(auth[len(prefix):])
if err != nil {
return
}
cs := string(c)
s := strings.IndexByte(cs, ':')
if s < 0 {
return
}
return cs[:s], cs[s+1:], true
}
func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Request {
user := ""
// If url host isn't empty, it's a proxy request. Get http user from Proxy-Authorization header.
if req.URL.Host != "" {
proxyAuth := req.Header.Get("Proxy-Authorization")
if proxyAuth != "" {
user, _, _ = parseBasicAuth(proxyAuth)
user, _, _ = httppkg.ParseBasicAuth(proxyAuth)
}
}
if user == "" {

View File

@@ -75,15 +75,7 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
}
}()
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
for _, d := range pxy.cfg.CustomDomains {
if d != "" {
domains = append(domains, d)
}
}
if pxy.cfg.SubDomain != "" {
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
}
domains := pxy.buildDomains(pxy.cfg.CustomDomains, pxy.cfg.SubDomain)
addrs := make([]string, 0)
for _, domain := range domains {

View File

@@ -53,15 +53,7 @@ func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) {
pxy.Close()
}
}()
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
for _, d := range pxy.cfg.CustomDomains {
if d != "" {
domains = append(domains, d)
}
}
if pxy.cfg.SubDomain != "" {
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
}
domains := pxy.buildDomains(pxy.cfg.CustomDomains, pxy.cfg.SubDomain)
addrs := make([]string, 0)
for _, domain := range domains {

View File

@@ -150,7 +150,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
dstPort, _ = strconv.ParseUint(dstPortStr, 10, 16)
}
err := msg.WriteMsg(workConn, &msg.StartWorkConn{
err = msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(),
SrcAddr: srcAddr,
SrcPort: uint16(srcPort),
@@ -161,6 +161,7 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
if err != nil {
xl.Warnf("failed to send message to work connection from pool: %v, times: %d", err, i)
workConn.Close()
workConn = nil
} else {
break
}
@@ -173,6 +174,36 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
return
}
// startVisitorListener sets up a VisitorManager listener for visitor-based proxies (STCP, SUDP).
func (pxy *BaseProxy) startVisitorListener(secretKey string, allowUsers []string, proxyType string) error {
// if allowUsers is empty, only allow same user from proxy
if len(allowUsers) == 0 {
allowUsers = []string{pxy.GetUserInfo().User}
}
listener, err := pxy.rc.VisitorManager.Listen(pxy.GetName(), secretKey, allowUsers)
if err != nil {
return err
}
pxy.listeners = append(pxy.listeners, listener)
pxy.xl.Infof("%s proxy custom listen success", proxyType)
pxy.startCommonTCPListenersHandler()
return nil
}
// buildDomains constructs a list of domains from custom domains and subdomain configuration.
func (pxy *BaseProxy) buildDomains(customDomains []string, subDomain string) []string {
domains := make([]string, 0, len(customDomains)+1)
for _, d := range customDomains {
if d != "" {
domains = append(domains, d)
}
}
if subDomain != "" {
domains = append(domains, subDomain+"."+pxy.serverCfg.SubDomainHost)
}
return domains
}
// startCommonTCPListenersHandler start a goroutine handler for each listener.
func (pxy *BaseProxy) startCommonTCPListenersHandler() {
xl := xlog.FromContextSafe(pxy.ctx)

View File

@@ -41,21 +41,7 @@ func NewSTCPProxy(baseProxy *BaseProxy) Proxy {
}
func (pxy *STCPProxy) Run() (remoteAddr string, err error) {
xl := pxy.xl
allowUsers := pxy.cfg.AllowUsers
// if allowUsers is empty, only allow same user from proxy
if len(allowUsers) == 0 {
allowUsers = []string{pxy.GetUserInfo().User}
}
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Secretkey, allowUsers)
if errRet != nil {
err = errRet
return
}
pxy.listeners = append(pxy.listeners, listener)
xl.Infof("stcp proxy custom listen success")
pxy.startCommonTCPListenersHandler()
err = pxy.startVisitorListener(pxy.cfg.Secretkey, pxy.cfg.AllowUsers, "stcp")
return
}

View File

@@ -41,21 +41,7 @@ func NewSUDPProxy(baseProxy *BaseProxy) Proxy {
}
func (pxy *SUDPProxy) Run() (remoteAddr string, err error) {
xl := pxy.xl
allowUsers := pxy.cfg.AllowUsers
// if allowUsers is empty, only allow same user from proxy
if len(allowUsers) == 0 {
allowUsers = []string{pxy.GetUserInfo().User}
}
listener, errRet := pxy.rc.VisitorManager.Listen(pxy.GetName(), pxy.cfg.Secretkey, allowUsers)
if errRet != nil {
err = errRet
return
}
pxy.listeners = append(pxy.listeners, listener)
xl.Infof("sudp proxy custom listen success")
pxy.startCommonTCPListenersHandler()
err = pxy.startVisitorListener(pxy.cfg.Secretkey, pxy.cfg.AllowUsers, "sudp")
return
}

View File

@@ -72,15 +72,7 @@ func (pxy *TCPMuxProxy) httpConnectListen(
}
func (pxy *TCPMuxProxy) httpConnectRun() (remoteAddr string, err error) {
domains := make([]string, 0, len(pxy.cfg.CustomDomains)+1)
for _, d := range pxy.cfg.CustomDomains {
if d != "" {
domains = append(domains, d)
}
}
if pxy.cfg.SubDomain != "" {
domains = append(domains, pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost)
}
domains := pxy.buildDomains(pxy.cfg.CustomDomains, pxy.cfg.SubDomain)
addrs := make([]string, 0)
for _, domain := range domains {