Files
frp/client/visitor/sudp.go
fatedier f948c595f4 client/visitor: deduplicate visitor connection handshake and wrapping
Extract two shared helpers to eliminate duplicated code across STCP,
SUDP, and XTCP visitors:

- dialRawVisitorConn: handles ConnectServer + NewVisitorConn handshake
  (auth, sign key, 10s read deadline, error check)
- wrapVisitorConn: handles encryption + pooled compression wrapping,
  returning a recycleFn for pool resource cleanup

SUDP is upgraded from WithCompression to WithCompressionFromPool,
aligning with the pooled compression used by STCP and XTCP.
2026-03-08 00:58:30 +08:00

231 lines
5.3 KiB
Go

// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package visitor
import (
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/fatedier/golib/errors"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
)
type SUDPVisitor struct {
*BaseVisitor
checkCloseCh chan struct{}
// udpConn is the listener of udp packet
udpConn *net.UDPConn
readCh chan *msg.UDPPacket
sendCh chan *msg.UDPPacket
cfg *v1.SUDPVisitorConfig
}
// SUDP Run start listen a udp port
func (sv *SUDPVisitor) Run() (err error) {
xl := xlog.FromContextSafe(sv.ctx)
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(sv.cfg.BindAddr, strconv.Itoa(sv.cfg.BindPort)))
if err != nil {
return fmt.Errorf("sudp ResolveUDPAddr error: %v", err)
}
sv.udpConn, err = net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("listen udp port %s error: %v", addr.String(), err)
}
sv.sendCh = make(chan *msg.UDPPacket, 1024)
sv.readCh = make(chan *msg.UDPPacket, 1024)
xl.Infof("sudp start to work, listen on %s", addr)
go sv.dispatcher()
go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh, int(sv.clientCfg.UDPPacketSize))
return
}
func (sv *SUDPVisitor) dispatcher() {
xl := xlog.FromContextSafe(sv.ctx)
var (
visitorConn net.Conn
recycleFn func()
err error
firstPacket *msg.UDPPacket
)
for {
select {
case firstPacket = <-sv.sendCh:
if firstPacket == nil {
xl.Infof("frpc sudp visitor proxy is closed")
return
}
case <-sv.checkCloseCh:
xl.Infof("frpc sudp visitor proxy is closed")
return
}
visitorConn, recycleFn, err = sv.getNewVisitorConn()
if err != nil {
xl.Warnf("newVisitorConn to frps error: %v, try to reconnect", err)
continue
}
// visitorConn always be closed when worker done.
func() {
defer recycleFn()
sv.worker(visitorConn, firstPacket)
}()
select {
case <-sv.checkCloseCh:
return
default:
}
}
}
func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
xl := xlog.FromContextSafe(sv.ctx)
xl.Debugf("starting sudp proxy worker")
wg := &sync.WaitGroup{}
wg.Add(2)
closeCh := make(chan struct{})
// udp service -> frpc -> frps -> frpc visitor -> user
workConnReaderFn := func(conn net.Conn) {
defer func() {
conn.Close()
close(closeCh)
wg.Done()
}()
for {
var (
rawMsg msg.Message
errRet error
)
// frpc will send heartbeat in workConn to frpc visitor for keeping alive
_ = conn.SetReadDeadline(time.Now().Add(60 * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
xl.Warnf("read from workconn for user udp conn error: %v", errRet)
return
}
_ = conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
xl.Debugf("frpc visitor get ping message from frpc")
continue
case *msg.UDPPacket:
if errRet := errors.PanicToError(func() {
sv.readCh <- m
xl.Tracef("frpc visitor get udp packet from workConn, len: %d", len(m.Content))
}); errRet != nil {
xl.Infof("reader goroutine for udp work connection closed")
return
}
}
}
}
// udp service <- frpc <- frps <- frpc visitor <- user
workConnSenderFn := func(conn net.Conn) {
defer func() {
conn.Close()
wg.Done()
}()
var errRet error
if firstPacket != nil {
if errRet = msg.WriteMsg(conn, firstPacket); errRet != nil {
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
return
}
xl.Tracef("send udp package to workConn, len: %d", len(firstPacket.Content))
}
for {
select {
case udpMsg, ok := <-sv.sendCh:
if !ok {
xl.Infof("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
return
}
xl.Tracef("send udp package to workConn, len: %d", len(udpMsg.Content))
case <-closeCh:
return
}
}
}
go workConnReaderFn(workConn)
go workConnSenderFn(workConn)
wg.Wait()
xl.Infof("sudp worker is closed")
}
func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, func(), error) {
rawConn, err := sv.dialRawVisitorConn(sv.cfg.GetBaseConfig())
if err != nil {
return nil, func() {}, err
}
rwc, recycleFn, err := wrapVisitorConn(rawConn, sv.cfg.GetBaseConfig())
if err != nil {
rawConn.Close()
return nil, func() {}, err
}
return netpkg.WrapReadWriteCloserToConn(rwc, rawConn), recycleFn, nil
}
func (sv *SUDPVisitor) Close() {
sv.mu.Lock()
defer sv.mu.Unlock()
select {
case <-sv.checkCloseCh:
return
default:
close(sv.checkCloseCh)
}
sv.BaseVisitor.Close()
if sv.udpConn != nil {
sv.udpConn.Close()
}
close(sv.readCh)
close(sv.sendCh)
}