support udp type

This commit is contained in:
fatedier
2016-12-19 01:22:21 +08:00
parent adcb2c1ea5
commit f2999e3317
18 changed files with 556 additions and 76 deletions

View File

@@ -16,6 +16,7 @@ package server
import (
"fmt"
"net"
"sync"
"time"
@@ -25,6 +26,7 @@ import (
"github.com/fatedier/frp/src/models/msg"
"github.com/fatedier/frp/src/utils/conn"
"github.com/fatedier/frp/src/utils/log"
"github.com/fatedier/frp/src/utils/pool"
)
type Listener interface {
@@ -38,13 +40,17 @@ type ProxyServer struct {
ListenPort int64
CustomDomains []string
Status int64
CtlConn *conn.Conn // control connection with frpc
listeners []Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
workConnChan chan *conn.Conn // get new work conns from control goroutine
mutex sync.RWMutex
closeChan chan struct{} // for notify other goroutines that the proxy is closed by close this channel
Status int64
CtlConn *conn.Conn // control connection with frpc
WorkConnUdp *conn.Conn // work connection for udp
udpConn *net.UDPConn
listeners []Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
workConnChan chan *conn.Conn // get new work conns from control goroutine
udpSenderChan chan *msg.UdpPacket
mutex sync.RWMutex
closeChan chan struct{} // close this channel for notifying other goroutines that the proxy is closed
}
func NewProxyServer() (p *ProxyServer) {
@@ -83,6 +89,7 @@ func (p *ProxyServer) Init() {
metric.SetStatus(p.Name, p.Status)
p.workConnChan = make(chan *conn.Conn, p.PoolCount+10)
p.ctlMsgChan = make(chan int64, p.PoolCount+10)
p.udpSenderChan = make(chan *msg.UdpPacket, 1024)
p.listeners = make([]Listener, 0)
p.closeChan = make(chan struct{})
p.Unlock()
@@ -150,41 +157,68 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) {
go p.connectionPoolManager(p.closeChan)
}
// start a goroutine for every listener to accept user connection
for _, listener := range p.listeners {
go func(l Listener) {
if p.Type == "udp" {
// udp is special
p.udpConn, err = conn.ListenUDP(p.BindAddr, p.ListenPort)
if err != nil {
log.Warn("ProxyName [%s], listen udp port error: %v", p.Name, err)
return err
}
go func() {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
buf := pool.GetBuf(2048)
n, remoteAddr, err := p.udpConn.ReadFromUDP(buf)
if err != nil {
log.Info("ProxyName [%s], listener is closed", p.Name)
log.Info("ProxyName [%s], udp listener is closed", p.Name)
return
}
log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
if p.Status != consts.Working {
log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
c.Close()
return
localAddr, _ := net.ResolveUDPAddr("udp", p.udpConn.LocalAddr().String())
udpPacket := msg.NewUdpPacket(buf[0:n], remoteAddr, localAddr)
select {
case p.udpSenderChan <- udpPacket:
default:
log.Warn("ProxyName [%s], udp sender channel is full", p.Name)
}
go func(userConn *conn.Conn) {
workConn, err := p.getWorkConn()
pool.PutBuf(buf)
}
}()
} else {
// start a goroutine for every listener to accept user connection
for _, listener := range p.listeners {
go func(l Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
log.Info("ProxyName [%s], listener is closed", p.Name)
return
}
log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
if p.Status != consts.Working {
log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
c.Close()
return
}
// message will be transferred to another without modifying
// l means local, r means remote
log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
userConn.GetLocalAddr(), userConn.GetRemoteAddr())
go func(userConn *conn.Conn) {
workConn, err := p.getWorkConn()
if err != nil {
return
}
needRecord := true
go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord)
}(c)
}
}(listener)
// message will be transferred to another without modifying
// l means local, r means remote
log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
userConn.GetLocalAddr(), userConn.GetRemoteAddr())
needRecord := true
go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord)
}(c)
}
}(listener)
}
}
return nil
}
@@ -200,10 +234,18 @@ func (p *ProxyServer) Close() {
}
close(p.ctlMsgChan)
close(p.workConnChan)
close(p.udpSenderChan)
close(p.closeChan)
if p.CtlConn != nil {
p.CtlConn.Close()
}
if p.WorkConnUdp != nil {
p.WorkConnUdp.Close()
}
if p.udpConn != nil {
p.udpConn.Close()
p.udpConn = nil
}
}
metric.SetStatus(p.Name, p.Status)
// if the proxy created by PrivilegeMode, delete it when closed
@@ -228,9 +270,60 @@ func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) {
case p.workConnChan <- c:
default:
log.Debug("ProxyName [%s], workConnChan is full, so close this work connection", p.Name)
c.Close()
}
}
// create a tcp connection for forwarding udp packages
func (p *ProxyServer) RegisterNewWorkConnUdp(c *conn.Conn) {
if p.WorkConnUdp != nil && !p.WorkConnUdp.IsClosed() {
p.WorkConnUdp.Close()
}
p.WorkConnUdp = c
// read
go func() {
var (
buf string
err error
)
for {
buf, err = c.ReadLine()
if err != nil {
log.Warn("ProxyName [%s], work connection for udp closed", p.Name)
return
}
udpPacket := &msg.UdpPacket{}
err = udpPacket.UnPack([]byte(buf))
if err != nil {
log.Warn("ProxyName [%s], unpack udp packet error: %v", p.Name, err)
continue
}
// send to user
_, err = p.udpConn.WriteToUDP(udpPacket.Content, udpPacket.Dst)
if err != nil {
continue
}
}
}()
// write
go func() {
for {
udpPacket, ok := <-p.udpSenderChan
if !ok {
return
}
err := c.WriteString(string(udpPacket.Pack()) + "\n")
if err != nil {
log.Debug("ProxyName [%s], write to work connection for udp error: %v", p.Name, err)
return
}
}
}()
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.