feat: support sudp proxy (#1730)

This commit is contained in:
Tank
2020-04-22 21:37:45 +08:00
committed by GitHub
parent 6d78af6144
commit 4797136965
15 changed files with 669 additions and 1 deletions

View File

@@ -26,10 +26,12 @@ import (
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/udp"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/xlog"
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
"github.com/fatedier/golib/pool"
fmux "github.com/hashicorp/yamux"
@@ -58,6 +60,12 @@ func NewVisitor(ctx context.Context, ctl *Control, cfg config.VisitorConf) (visi
BaseVisitor: &baseVisitor,
cfg: cfg,
}
case *config.SudpVisitorConf:
visitor = &SudpVisitor{
BaseVisitor: &baseVisitor,
cfg: cfg,
checkCloseCh: make(chan struct{}),
}
}
return
}
@@ -328,3 +336,204 @@ func (sv *XtcpVisitor) handleConn(userConn net.Conn) {
frpIo.Join(userConn, muxConnRWCloser)
xl.Debug("join connections closed")
}
type SudpVisitor struct {
*BaseVisitor
checkCloseCh chan struct{}
// udpConn is the listener of udp packet
udpConn *net.UDPConn
readCh chan *msg.UdpPacket
sendCh chan *msg.UdpPacket
cfg *config.SudpVisitorConf
}
// SUDP Run start listen a udp port
func (sv *SudpVisitor) Run() (err error) {
xl := xlog.FromContextSafe(sv.ctx)
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", sv.cfg.BindAddr, sv.cfg.BindPort))
if err != nil {
return fmt.Errorf("sudp ResolveUDPAddr error: %v", err)
}
sv.udpConn, err = net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("listen udp port %s error: %v", addr.String(), err)
}
sv.sendCh = make(chan *msg.UdpPacket, 1024)
sv.readCh = make(chan *msg.UdpPacket, 1024)
xl.Info("sudp start to work")
go sv.dispatcher()
go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh)
return
}
func (sv *SudpVisitor) dispatcher() {
xl := xlog.FromContextSafe(sv.ctx)
for {
// loop for get frpc to frps tcp conn
// setup worker
// wait worker to finished
// retry or exit
visitorConn, err := sv.getNewVisitorConn()
if err != nil {
// check if proxy is closed
// if checkCloseCh is close, we will return, other case we will continue to reconnect
select {
case <-sv.checkCloseCh:
xl.Info("frpc sudp visitor proxy is closed")
return
default:
}
time.Sleep(3 * time.Second)
xl.Warn("newVisitorConn to frps error: %v, try to reconnect", err)
continue
}
sv.worker(visitorConn)
select {
case <-sv.checkCloseCh:
return
default:
}
}
}
func (sv *SudpVisitor) worker(workConn net.Conn) {
xl := xlog.FromContextSafe(sv.ctx)
xl.Debug("starting sudp proxy worker")
wg := &sync.WaitGroup{}
wg.Add(2)
closeCh := make(chan struct{})
// udp service -> frpc -> frps -> frpc visitor -> user
workConnReaderFn := func(conn net.Conn) {
defer func() {
conn.Close()
close(closeCh)
wg.Done()
}()
for {
var (
rawMsg msg.Message
errRet error
)
// frpc will send heartbeat in workConn to frpc visitor for keeping alive
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
xl.Warn("read from workconn for user udp conn error: %v", errRet)
return
}
conn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
xl.Debug("frpc visitor get ping message from frpc")
continue
case *msg.UdpPacket:
if errRet := errors.PanicToError(func() {
sv.readCh <- m
xl.Trace("frpc visitor get udp packet from frpc")
}); errRet != nil {
xl.Info("reader goroutine for udp work connection closed")
return
}
}
}
}
// udp service <- frpc <- frps <- frpc visitor <- user
workConnSenderFn := func(conn net.Conn) {
defer func() {
conn.Close()
wg.Done()
}()
var errRet error
for {
select {
case udpMsg, ok := <-sv.sendCh:
if !ok {
xl.Info("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
xl.Warn("sender goroutine for udp work connection closed: %v", errRet)
return
}
case <-closeCh:
return
}
}
}
go workConnReaderFn(workConn)
go workConnSenderFn(workConn)
wg.Wait()
xl.Info("sudp worker is closed")
}
func (sv *SudpVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) {
visitorConn, err = sv.ctl.connectServer()
if err != nil {
return nil, fmt.Errorf("frpc connect frps error: %v", err)
}
now := time.Now().Unix()
newVisitorConnMsg := &msg.NewVisitorConn{
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
UseEncryption: sv.cfg.UseEncryption,
UseCompression: sv.cfg.UseCompression,
}
err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
if err != nil {
return nil, fmt.Errorf("frpc send newVisitorConnMsg to frps error: %v", err)
}
var newVisitorConnRespMsg msg.NewVisitorConnResp
visitorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
if err != nil {
return nil, fmt.Errorf("frpc read newVisitorConnRespMsg error: %v", err)
}
visitorConn.SetReadDeadline(time.Time{})
if newVisitorConnRespMsg.Error != "" {
return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
}
return
}
func (sv *SudpVisitor) Close() {
sv.mu.Lock()
defer sv.mu.Unlock()
select {
case <-sv.checkCloseCh:
return
default:
close(sv.checkCloseCh)
}
if sv.udpConn != nil {
sv.udpConn.Close()
}
close(sv.readCh)
close(sv.sendCh)
}