all: add /api/proxies api for statistics info

This commit is contained in:
fatedier
2016-07-17 21:42:21 +08:00
parent a9bf25f255
commit ac09ba3982
8 changed files with 289 additions and 7 deletions

View File

@@ -24,6 +24,7 @@ import (
"sync"
"frp/models/config"
"frp/models/metric"
"frp/utils/conn"
"frp/utils/log"
"frp/utils/pcrypto"
@@ -52,7 +53,7 @@ func Join(c1 *conn.Conn, c2 *conn.Conn) {
}
// join two connections and do some operations
func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf) {
func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf, needRecord bool) {
var wait sync.WaitGroup
encryptPipe := func(from *conn.Conn, to *conn.Conn) {
defer from.Close()
@@ -60,7 +61,7 @@ func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf) {
defer wait.Done()
// we don't care about errors here
pipeEncrypt(from.TcpConn, to.TcpConn, conf)
pipeEncrypt(from.TcpConn, to.TcpConn, conf, needRecord)
}
decryptPipe := func(to *conn.Conn, from *conn.Conn) {
@@ -69,13 +70,16 @@ func JoinMore(c1 *conn.Conn, c2 *conn.Conn, conf config.BaseConf) {
defer wait.Done()
// we don't care about errors here
pipeDecrypt(to.TcpConn, from.TcpConn, conf)
pipeDecrypt(to.TcpConn, from.TcpConn, conf, needRecord)
}
wait.Add(2)
go encryptPipe(c1, c2)
go decryptPipe(c2, c1)
wait.Wait()
if needRecord {
metric.CloseConnection(conf.Name)
}
log.Debug("ProxyName [%s], One tunnel stopped", conf.Name)
return
}
@@ -102,7 +106,7 @@ func unpkgMsg(data []byte) (int, []byte, []byte) {
}
// decrypt msg from reader, then write into writer
func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) {
func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf, needRecord bool) (err error) {
laes := new(pcrypto.Pcrypto)
key := conf.AuthToken
if conf.PrivilegeMode {
@@ -116,6 +120,15 @@ func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) {
buf := make([]byte, 5*1024+4)
var left, res []byte
var cnt int
// record
var flowBytes int64 = 0
if needRecord {
defer func() {
metric.AddFlowOut(conf.Name, flowBytes)
}()
}
nreader := bufio.NewReader(r)
for {
// there may be more than 1 package in variable
@@ -156,12 +169,20 @@ func pipeDecrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) {
if err != nil {
return err
}
if needRecord {
flowBytes += int64(len(res))
if flowBytes >= 1024*1024 {
metric.AddFlowOut(conf.Name, flowBytes)
flowBytes = 0
}
}
}
return nil
}
// recvive msg from reader, then encrypt msg into writer
func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) {
func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf, needRecord bool) (err error) {
laes := new(pcrypto.Pcrypto)
key := conf.AuthToken
if conf.PrivilegeMode {
@@ -172,6 +193,14 @@ func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) {
return fmt.Errorf("Pcrypto Init error: %v", err)
}
// record
var flowBytes int64 = 0
if needRecord {
defer func() {
metric.AddFlowIn(conf.Name, flowBytes)
}()
}
nreader := bufio.NewReader(r)
buf := make([]byte, 5*1024)
for {
@@ -179,6 +208,13 @@ func pipeEncrypt(r net.Conn, w net.Conn, conf config.BaseConf) (err error) {
if err != nil {
return err
}
if needRecord {
flowBytes += int64(n)
if flowBytes >= 1024*1024 {
metric.AddFlowIn(conf.Name, flowBytes)
flowBytes = 0
}
}
res := buf[0:n]
// gzip