forked from Mxmilu666/frp
merge upstream
This commit is contained in:
@@ -32,9 +32,13 @@ func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter, err error) {
|
||||
case v1.AuthMethodToken:
|
||||
authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)
|
||||
case v1.AuthMethodOIDC:
|
||||
authProvider, err = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if cfg.OIDC.TokenSource != nil {
|
||||
authProvider = NewOidcTokenSourceAuthSetter(cfg.AdditionalScopes, cfg.OIDC.TokenSource)
|
||||
} else {
|
||||
authProvider, err = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported auth method: %s", cfg.Method)
|
||||
|
||||
@@ -152,6 +152,51 @@ func (auth *OidcAuthProvider) SetNewWorkConn(newWorkConnMsg *msg.NewWorkConn) (e
|
||||
return err
|
||||
}
|
||||
|
||||
type OidcTokenSourceAuthProvider struct {
|
||||
additionalAuthScopes []v1.AuthScope
|
||||
|
||||
valueSource *v1.ValueSource
|
||||
}
|
||||
|
||||
func NewOidcTokenSourceAuthSetter(additionalAuthScopes []v1.AuthScope, valueSource *v1.ValueSource) *OidcTokenSourceAuthProvider {
|
||||
return &OidcTokenSourceAuthProvider{
|
||||
additionalAuthScopes: additionalAuthScopes,
|
||||
valueSource: valueSource,
|
||||
}
|
||||
}
|
||||
|
||||
func (auth *OidcTokenSourceAuthProvider) generateAccessToken() (accessToken string, err error) {
|
||||
ctx := context.Background()
|
||||
accessToken, err = auth.valueSource.Resolve(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("couldn't acquire OIDC token for login: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (auth *OidcTokenSourceAuthProvider) SetLogin(loginMsg *msg.Login) (err error) {
|
||||
loginMsg.PrivilegeKey, err = auth.generateAccessToken()
|
||||
return err
|
||||
}
|
||||
|
||||
func (auth *OidcTokenSourceAuthProvider) SetPing(pingMsg *msg.Ping) (err error) {
|
||||
if !slices.Contains(auth.additionalAuthScopes, v1.AuthScopeHeartBeats) {
|
||||
return nil
|
||||
}
|
||||
|
||||
pingMsg.PrivilegeKey, err = auth.generateAccessToken()
|
||||
return err
|
||||
}
|
||||
|
||||
func (auth *OidcTokenSourceAuthProvider) SetNewWorkConn(newWorkConnMsg *msg.NewWorkConn) (err error) {
|
||||
if !slices.Contains(auth.additionalAuthScopes, v1.AuthScopeNewWorkConns) {
|
||||
return nil
|
||||
}
|
||||
|
||||
newWorkConnMsg.PrivilegeKey, err = auth.generateAccessToken()
|
||||
return err
|
||||
}
|
||||
|
||||
type TokenVerifier interface {
|
||||
Verify(context.Context, string) (*oidc.IDToken, error)
|
||||
}
|
||||
|
||||
@@ -281,6 +281,17 @@ func LoadClientConfig(path string, strict bool) (
|
||||
})
|
||||
}
|
||||
|
||||
// Filter by enabled field in each proxy
|
||||
// nil or true means enabled, false means disabled
|
||||
proxyCfgs = lo.Filter(proxyCfgs, func(c v1.ProxyConfigurer, _ int) bool {
|
||||
enabled := c.GetBaseConfig().Enabled
|
||||
return enabled == nil || *enabled
|
||||
})
|
||||
visitorCfgs = lo.Filter(visitorCfgs, func(c v1.VisitorConfigurer, _ int) bool {
|
||||
enabled := c.GetBaseConfig().Enabled
|
||||
return enabled == nil || *enabled
|
||||
})
|
||||
|
||||
if cliCfg != nil {
|
||||
if err := cliCfg.Complete(); err != nil {
|
||||
return nil, nil, nil, isLegacyFormat, err
|
||||
|
||||
@@ -239,6 +239,10 @@ type AuthOIDCClientConfig struct {
|
||||
// Supports http, https, socks5, and socks5h proxy protocols.
|
||||
// If empty, no proxy is used for OIDC connections.
|
||||
ProxyURL string `json:"proxyURL,omitempty"`
|
||||
|
||||
// TokenSource specifies a custom dynamic source for the authorization token.
|
||||
// This is mutually exclusive with every other field of this structure.
|
||||
TokenSource *ValueSource `json:"tokenSource,omitempty"`
|
||||
}
|
||||
|
||||
type VirtualNetConfig struct {
|
||||
|
||||
@@ -108,8 +108,11 @@ type DomainConfig struct {
|
||||
}
|
||||
|
||||
type ProxyBaseConfig struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
// Enabled controls whether this proxy is enabled. nil or true means enabled, false means disabled.
|
||||
// This allows individual control over each proxy, complementing the global "start" field.
|
||||
Enabled *bool `json:"enabled,omitempty"`
|
||||
Annotations map[string]string `json:"annotations,omitempty"`
|
||||
Transport ProxyTransport `json:"transport,omitempty"`
|
||||
// metadata info for each proxy
|
||||
|
||||
@@ -23,55 +23,111 @@ import (
|
||||
"github.com/samber/lo"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/featuregate"
|
||||
"github.com/fatedier/frp/pkg/policy/featuregate"
|
||||
"github.com/fatedier/frp/pkg/policy/security"
|
||||
)
|
||||
|
||||
func ValidateClientCommonConfig(c *v1.ClientCommonConfig) (Warning, error) {
|
||||
func ValidateClientCommonConfig(c *v1.ClientCommonConfig, unsafeFeatures *security.UnsafeFeatures) (Warning, error) {
|
||||
var (
|
||||
warnings Warning
|
||||
errs error
|
||||
)
|
||||
// validate feature gates
|
||||
if c.VirtualNet.Address != "" {
|
||||
if !featuregate.Enabled(featuregate.VirtualNet) {
|
||||
return warnings, fmt.Errorf("VirtualNet feature is not enabled; enable it by setting the appropriate feature gate flag")
|
||||
}
|
||||
|
||||
validators := []func() (Warning, error){
|
||||
func() (Warning, error) { return validateFeatureGates(c) },
|
||||
func() (Warning, error) { return validateAuthConfig(&c.Auth, unsafeFeatures) },
|
||||
func() (Warning, error) { return nil, validateLogConfig(&c.Log) },
|
||||
func() (Warning, error) { return nil, validateWebServerConfig(&c.WebServer) },
|
||||
func() (Warning, error) { return validateTransportConfig(&c.Transport) },
|
||||
func() (Warning, error) { return validateIncludeFiles(c.IncludeConfigFiles) },
|
||||
}
|
||||
|
||||
if !slices.Contains(SupportedAuthMethods, c.Auth.Method) {
|
||||
for _, v := range validators {
|
||||
w, err := v()
|
||||
warnings = AppendError(warnings, w)
|
||||
errs = AppendError(errs, err)
|
||||
}
|
||||
return warnings, errs
|
||||
}
|
||||
|
||||
func validateFeatureGates(c *v1.ClientCommonConfig) (Warning, error) {
|
||||
if c.VirtualNet.Address != "" {
|
||||
if !featuregate.Enabled(featuregate.VirtualNet) {
|
||||
return nil, fmt.Errorf("VirtualNet feature is not enabled; enable it by setting the appropriate feature gate flag")
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func validateAuthConfig(c *v1.AuthClientConfig, unsafeFeatures *security.UnsafeFeatures) (Warning, error) {
|
||||
var errs error
|
||||
if !slices.Contains(SupportedAuthMethods, c.Method) {
|
||||
errs = AppendError(errs, fmt.Errorf("invalid auth method, optional values are %v", SupportedAuthMethods))
|
||||
}
|
||||
if !lo.Every(SupportedAuthAdditionalScopes, c.Auth.AdditionalScopes) {
|
||||
if !lo.Every(SupportedAuthAdditionalScopes, c.AdditionalScopes) {
|
||||
errs = AppendError(errs, fmt.Errorf("invalid auth additional scopes, optional values are %v", SupportedAuthAdditionalScopes))
|
||||
}
|
||||
|
||||
// Validate token/tokenSource mutual exclusivity
|
||||
if c.Auth.Token != "" && c.Auth.TokenSource != nil {
|
||||
if c.Token != "" && c.TokenSource != nil {
|
||||
errs = AppendError(errs, fmt.Errorf("cannot specify both auth.token and auth.tokenSource"))
|
||||
}
|
||||
|
||||
// Validate tokenSource if specified
|
||||
if c.Auth.TokenSource != nil {
|
||||
if err := c.Auth.TokenSource.Validate(); err != nil {
|
||||
if c.TokenSource != nil {
|
||||
if c.TokenSource.Type == "exec" {
|
||||
if !unsafeFeatures.IsEnabled(security.TokenSourceExec) {
|
||||
errs = AppendError(errs, fmt.Errorf("unsafe feature %q is not enabled. "+
|
||||
"To enable it, start frpc with '--allow-unsafe %s'", security.TokenSourceExec, security.TokenSourceExec))
|
||||
}
|
||||
}
|
||||
if err := c.TokenSource.Validate(); err != nil {
|
||||
errs = AppendError(errs, fmt.Errorf("invalid auth.tokenSource: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateLogConfig(&c.Log); err != nil {
|
||||
if err := validateOIDCConfig(&c.OIDC, unsafeFeatures); err != nil {
|
||||
errs = AppendError(errs, err)
|
||||
}
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
if err := validateWebServerConfig(&c.WebServer); err != nil {
|
||||
errs = AppendError(errs, err)
|
||||
func validateOIDCConfig(c *v1.AuthOIDCClientConfig, unsafeFeatures *security.UnsafeFeatures) error {
|
||||
if c.TokenSource == nil {
|
||||
return nil
|
||||
}
|
||||
var errs error
|
||||
// Validate oidc.tokenSource mutual exclusivity with other fields of oidc
|
||||
if c.ClientID != "" || c.ClientSecret != "" || c.Audience != "" ||
|
||||
c.Scope != "" || c.TokenEndpointURL != "" || len(c.AdditionalEndpointParams) > 0 ||
|
||||
c.TrustedCaFile != "" || c.InsecureSkipVerify || c.ProxyURL != "" {
|
||||
errs = AppendError(errs, fmt.Errorf("cannot specify both auth.oidc.tokenSource and any other field of auth.oidc"))
|
||||
}
|
||||
if c.TokenSource.Type == "exec" {
|
||||
if !unsafeFeatures.IsEnabled(security.TokenSourceExec) {
|
||||
errs = AppendError(errs, fmt.Errorf("unsafe feature %q is not enabled. "+
|
||||
"To enable it, start frpc with '--allow-unsafe %s'", security.TokenSourceExec, security.TokenSourceExec))
|
||||
}
|
||||
}
|
||||
if err := c.TokenSource.Validate(); err != nil {
|
||||
errs = AppendError(errs, fmt.Errorf("invalid auth.oidc.tokenSource: %v", err))
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
if c.Transport.HeartbeatTimeout > 0 && c.Transport.HeartbeatInterval > 0 {
|
||||
if c.Transport.HeartbeatTimeout < c.Transport.HeartbeatInterval {
|
||||
func validateTransportConfig(c *v1.ClientTransportConfig) (Warning, error) {
|
||||
var (
|
||||
warnings Warning
|
||||
errs error
|
||||
)
|
||||
|
||||
if c.HeartbeatTimeout > 0 && c.HeartbeatInterval > 0 {
|
||||
if c.HeartbeatTimeout < c.HeartbeatInterval {
|
||||
errs = AppendError(errs, fmt.Errorf("invalid transport.heartbeatTimeout, heartbeat timeout should not less than heartbeat interval"))
|
||||
}
|
||||
}
|
||||
|
||||
if !lo.FromPtr(c.Transport.TLS.Enable) {
|
||||
if !lo.FromPtr(c.TLS.Enable) {
|
||||
checkTLSConfig := func(name string, value string) Warning {
|
||||
if value != "" {
|
||||
return fmt.Errorf("%s is invalid when transport.tls.enable is false", name)
|
||||
@@ -79,16 +135,20 @@ func ValidateClientCommonConfig(c *v1.ClientCommonConfig) (Warning, error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
warnings = AppendError(warnings, checkTLSConfig("transport.tls.certFile", c.Transport.TLS.CertFile))
|
||||
warnings = AppendError(warnings, checkTLSConfig("transport.tls.keyFile", c.Transport.TLS.KeyFile))
|
||||
warnings = AppendError(warnings, checkTLSConfig("transport.tls.trustedCaFile", c.Transport.TLS.TrustedCaFile))
|
||||
warnings = AppendError(warnings, checkTLSConfig("transport.tls.certFile", c.TLS.CertFile))
|
||||
warnings = AppendError(warnings, checkTLSConfig("transport.tls.keyFile", c.TLS.KeyFile))
|
||||
warnings = AppendError(warnings, checkTLSConfig("transport.tls.trustedCaFile", c.TLS.TrustedCaFile))
|
||||
}
|
||||
|
||||
if !slices.Contains(SupportedTransportProtocols, c.Transport.Protocol) {
|
||||
if !slices.Contains(SupportedTransportProtocols, c.Protocol) {
|
||||
errs = AppendError(errs, fmt.Errorf("invalid transport.protocol, optional values are %v", SupportedTransportProtocols))
|
||||
}
|
||||
return warnings, errs
|
||||
}
|
||||
|
||||
for _, f := range c.IncludeConfigFiles {
|
||||
func validateIncludeFiles(files []string) (Warning, error) {
|
||||
var errs error
|
||||
for _, f := range files {
|
||||
absDir, err := filepath.Abs(filepath.Dir(f))
|
||||
if err != nil {
|
||||
errs = AppendError(errs, fmt.Errorf("include: parse directory of %s failed: %v", f, err))
|
||||
@@ -98,13 +158,18 @@ func ValidateClientCommonConfig(c *v1.ClientCommonConfig) (Warning, error) {
|
||||
errs = AppendError(errs, fmt.Errorf("include: directory of %s not exist", f))
|
||||
}
|
||||
}
|
||||
return warnings, errs
|
||||
return nil, errs
|
||||
}
|
||||
|
||||
func ValidateAllClientConfig(c *v1.ClientCommonConfig, proxyCfgs []v1.ProxyConfigurer, visitorCfgs []v1.VisitorConfigurer) (Warning, error) {
|
||||
func ValidateAllClientConfig(
|
||||
c *v1.ClientCommonConfig,
|
||||
proxyCfgs []v1.ProxyConfigurer,
|
||||
visitorCfgs []v1.VisitorConfigurer,
|
||||
unsafeFeatures *security.UnsafeFeatures,
|
||||
) (Warning, error) {
|
||||
var warnings Warning
|
||||
if c != nil {
|
||||
warning, err := ValidateClientCommonConfig(c)
|
||||
warning, err := ValidateClientCommonConfig(c, unsafeFeatures)
|
||||
warnings = AppendError(warnings, warning)
|
||||
if err != nil {
|
||||
return warnings, err
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -27,6 +28,7 @@ import (
|
||||
type ValueSource struct {
|
||||
Type string `json:"type"`
|
||||
File *FileSource `json:"file,omitempty"`
|
||||
Exec *ExecSource `json:"exec,omitempty"`
|
||||
}
|
||||
|
||||
// FileSource specifies how to load a value from a file.
|
||||
@@ -34,6 +36,18 @@ type FileSource struct {
|
||||
Path string `json:"path"`
|
||||
}
|
||||
|
||||
// ExecSource specifies how to get a value from another program launched as subprocess.
|
||||
type ExecSource struct {
|
||||
Command string `json:"command"`
|
||||
Args []string `json:"args,omitempty"`
|
||||
Env []ExecEnvVar `json:"env,omitempty"`
|
||||
}
|
||||
|
||||
type ExecEnvVar struct {
|
||||
Name string `json:"name"`
|
||||
Value string `json:"value"`
|
||||
}
|
||||
|
||||
// Validate validates the ValueSource configuration.
|
||||
func (v *ValueSource) Validate() error {
|
||||
if v == nil {
|
||||
@@ -46,8 +60,13 @@ func (v *ValueSource) Validate() error {
|
||||
return errors.New("file configuration is required when type is 'file'")
|
||||
}
|
||||
return v.File.Validate()
|
||||
case "exec":
|
||||
if v.Exec == nil {
|
||||
return errors.New("exec configuration is required when type is 'exec'")
|
||||
}
|
||||
return v.Exec.Validate()
|
||||
default:
|
||||
return fmt.Errorf("unsupported value source type: %s (only 'file' is supported)", v.Type)
|
||||
return fmt.Errorf("unsupported value source type: %s (only 'file' and 'exec' are supported)", v.Type)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +79,8 @@ func (v *ValueSource) Resolve(ctx context.Context) (string, error) {
|
||||
switch v.Type {
|
||||
case "file":
|
||||
return v.File.Resolve(ctx)
|
||||
case "exec":
|
||||
return v.Exec.Resolve(ctx)
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported value source type: %s", v.Type)
|
||||
}
|
||||
@@ -91,3 +112,47 @@ func (f *FileSource) Resolve(_ context.Context) (string, error) {
|
||||
// Trim whitespace, which is important for file-based tokens
|
||||
return strings.TrimSpace(string(content)), nil
|
||||
}
|
||||
|
||||
// Validate validates the ExecSource configuration.
|
||||
func (e *ExecSource) Validate() error {
|
||||
if e == nil {
|
||||
return errors.New("execSource cannot be nil")
|
||||
}
|
||||
|
||||
if e.Command == "" {
|
||||
return errors.New("exec command cannot be empty")
|
||||
}
|
||||
|
||||
for _, env := range e.Env {
|
||||
if env.Name == "" {
|
||||
return errors.New("exec env name cannot be empty")
|
||||
}
|
||||
if strings.Contains(env.Name, "=") {
|
||||
return errors.New("exec env name cannot contain '='")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Resolve reads and returns the content captured from stdout of launched subprocess.
|
||||
func (e *ExecSource) Resolve(ctx context.Context) (string, error) {
|
||||
if err := e.Validate(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, e.Command, e.Args...)
|
||||
if len(e.Env) != 0 {
|
||||
cmd.Env = os.Environ()
|
||||
for _, env := range e.Env {
|
||||
cmd.Env = append(cmd.Env, env.Name+"="+env.Value)
|
||||
}
|
||||
}
|
||||
|
||||
content, err := cmd.Output()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to execute command %v: %v", e.Command, err)
|
||||
}
|
||||
|
||||
// Trim whitespace, which is important for exec-based tokens
|
||||
return strings.TrimSpace(string(content)), nil
|
||||
}
|
||||
|
||||
@@ -32,8 +32,11 @@ type VisitorTransport struct {
|
||||
}
|
||||
|
||||
type VisitorBaseConfig struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
// Enabled controls whether this visitor is enabled. nil or true means enabled, false means disabled.
|
||||
// This allows individual control over each visitor, complementing the global "start" field.
|
||||
Enabled *bool `json:"enabled,omitempty"`
|
||||
Transport VisitorTransport `json:"transport,omitempty"`
|
||||
SecretKey string `json:"secretKey,omitempty"`
|
||||
// if the server user is not set, it defaults to the current user
|
||||
|
||||
@@ -86,10 +86,6 @@ func (d *Dispatcher) Send(m Message) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) SendChannel() chan Message {
|
||||
return d.sendCh
|
||||
}
|
||||
|
||||
func (d *Dispatcher) RegisterHandler(msg Message, handler func(Message)) {
|
||||
d.msgHandlers[reflect.TypeOf(msg)] = handler
|
||||
}
|
||||
|
||||
@@ -23,11 +23,20 @@ import (
|
||||
"github.com/fatedier/frp/pkg/vnet"
|
||||
)
|
||||
|
||||
// PluginContext provides the necessary context and callbacks for visitor plugins.
|
||||
type PluginContext struct {
|
||||
Name string
|
||||
Ctx context.Context
|
||||
// Name is the unique identifier for this visitor, used for logging and routing.
|
||||
Name string
|
||||
|
||||
// Ctx manages the plugin's lifecycle and carries the logger for structured logging.
|
||||
Ctx context.Context
|
||||
|
||||
// VnetController manages TUN device routing. May be nil if virtual networking is disabled.
|
||||
VnetController *vnet.Controller
|
||||
HandleConn func(net.Conn)
|
||||
|
||||
// SendConnToVisitor sends a connection to the visitor's internal processing queue.
|
||||
// Does not return error; failures are handled by closing the connection.
|
||||
SendConnToVisitor func(net.Conn)
|
||||
}
|
||||
|
||||
// Creators is used for create plugins to handle connections.
|
||||
|
||||
@@ -42,6 +42,8 @@ type VirtualNetPlugin struct {
|
||||
controllerConn net.Conn
|
||||
closeSignal chan struct{}
|
||||
|
||||
consecutiveErrors int // Tracks consecutive connection errors for exponential backoff
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
@@ -98,7 +100,6 @@ func (p *VirtualNetPlugin) Start() {
|
||||
|
||||
func (p *VirtualNetPlugin) run() {
|
||||
xl := xlog.FromContextSafe(p.ctx)
|
||||
reconnectDelay := 10 * time.Second
|
||||
|
||||
for {
|
||||
currentCloseSignal := make(chan struct{})
|
||||
@@ -121,7 +122,10 @@ func (p *VirtualNetPlugin) run() {
|
||||
p.controllerConn = controllerConn
|
||||
p.mu.Unlock()
|
||||
|
||||
pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() {
|
||||
// Wrap with CloseNotifyConn which supports both close notification and error recording
|
||||
var closeErr error
|
||||
pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func(err error) {
|
||||
closeErr = err
|
||||
close(currentCloseSignal) // Signal the run loop on close.
|
||||
})
|
||||
|
||||
@@ -129,9 +133,9 @@ func (p *VirtualNetPlugin) run() {
|
||||
p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
|
||||
xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
|
||||
|
||||
// Pass the CloseNotifyConn to HandleConn.
|
||||
// HandleConn is responsible for calling Close() on pluginNotifyConn.
|
||||
p.pluginCtx.HandleConn(pluginNotifyConn)
|
||||
// Pass the CloseNotifyConn to the visitor for handling.
|
||||
// The visitor can call CloseWithError to record the failure reason.
|
||||
p.pluginCtx.SendConnToVisitor(pluginNotifyConn)
|
||||
|
||||
// Wait for context cancellation or connection close.
|
||||
select {
|
||||
@@ -140,8 +144,32 @@ func (p *VirtualNetPlugin) run() {
|
||||
p.cleanupControllerConn(xl)
|
||||
return
|
||||
case <-currentCloseSignal:
|
||||
xl.Infof("detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name)
|
||||
// HandleConn closed the plugin side. Close the controller side.
|
||||
// Determine reconnect delay based on error with exponential backoff
|
||||
var reconnectDelay time.Duration
|
||||
if closeErr != nil {
|
||||
p.consecutiveErrors++
|
||||
xl.Warnf("connection closed with error for visitor [%s] (consecutive errors: %d): %v",
|
||||
p.pluginCtx.Name, p.consecutiveErrors, closeErr)
|
||||
|
||||
// Exponential backoff: 60s, 120s, 240s, 300s (capped)
|
||||
baseDelay := 60 * time.Second
|
||||
reconnectDelay = baseDelay * time.Duration(1<<uint(p.consecutiveErrors-1))
|
||||
if reconnectDelay > 300*time.Second {
|
||||
reconnectDelay = 300 * time.Second
|
||||
}
|
||||
} else {
|
||||
// Reset consecutive errors on successful connection
|
||||
if p.consecutiveErrors > 0 {
|
||||
xl.Infof("connection closed normally for visitor [%s], resetting error counter (was %d)",
|
||||
p.pluginCtx.Name, p.consecutiveErrors)
|
||||
p.consecutiveErrors = 0
|
||||
} else {
|
||||
xl.Infof("connection closed normally for visitor [%s]", p.pluginCtx.Name)
|
||||
}
|
||||
reconnectDelay = 10 * time.Second
|
||||
}
|
||||
|
||||
// The visitor closed the plugin side. Close the controller side.
|
||||
p.cleanupControllerConn(xl)
|
||||
|
||||
xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
|
||||
@@ -184,7 +212,7 @@ func (p *VirtualNetPlugin) Close() error {
|
||||
}
|
||||
|
||||
// Explicitly close the controller side of the pipe.
|
||||
// This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end.
|
||||
// This ensures the pipe is broken even if the run loop is stuck or the visitor hasn't closed its end.
|
||||
p.cleanupControllerConn(xl)
|
||||
xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
|
||||
|
||||
|
||||
34
pkg/policy/security/unsafe.go
Normal file
34
pkg/policy/security/unsafe.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package security
|
||||
|
||||
const (
|
||||
TokenSourceExec = "TokenSourceExec"
|
||||
)
|
||||
|
||||
var (
|
||||
ClientUnsafeFeatures = []string{
|
||||
TokenSourceExec,
|
||||
}
|
||||
|
||||
ServerUnsafeFeatures = []string{
|
||||
TokenSourceExec,
|
||||
}
|
||||
)
|
||||
|
||||
type UnsafeFeatures struct {
|
||||
features map[string]bool
|
||||
}
|
||||
|
||||
func NewUnsafeFeatures(allowed []string) *UnsafeFeatures {
|
||||
features := make(map[string]bool)
|
||||
for _, f := range allowed {
|
||||
features[f] = true
|
||||
}
|
||||
return &UnsafeFeatures{features: features}
|
||||
}
|
||||
|
||||
func (u *UnsafeFeatures) IsEnabled(feature string) bool {
|
||||
if u == nil {
|
||||
return false
|
||||
}
|
||||
return u.features[feature]
|
||||
}
|
||||
@@ -35,15 +35,19 @@ type MessageTransporter interface {
|
||||
DispatchWithType(m msg.Message, msgType, laneKey string) bool
|
||||
}
|
||||
|
||||
func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter {
|
||||
type MessageSender interface {
|
||||
Send(msg.Message) error
|
||||
}
|
||||
|
||||
func NewMessageTransporter(sender MessageSender) MessageTransporter {
|
||||
return &transporterImpl{
|
||||
sendCh: sendCh,
|
||||
sender: sender,
|
||||
registry: make(map[string]map[string]chan msg.Message),
|
||||
}
|
||||
}
|
||||
|
||||
type transporterImpl struct {
|
||||
sendCh chan msg.Message
|
||||
sender MessageSender
|
||||
|
||||
// First key is message type and second key is lane key.
|
||||
// Dispatch will dispatch message to related channel by its message type
|
||||
@@ -53,9 +57,7 @@ type transporterImpl struct {
|
||||
}
|
||||
|
||||
func (impl *transporterImpl) Send(m msg.Message) error {
|
||||
return errors.PanicToError(func() {
|
||||
impl.sendCh <- m
|
||||
})
|
||||
return impl.sender.Send(m)
|
||||
}
|
||||
|
||||
func (impl *transporterImpl) Do(ctx context.Context, req msg.Message, laneKey, recvMsgType string) (msg.Message, error) {
|
||||
|
||||
@@ -135,11 +135,11 @@ type CloseNotifyConn struct {
|
||||
// 1 means closed
|
||||
closeFlag int32
|
||||
|
||||
closeFn func()
|
||||
closeFn func(error)
|
||||
}
|
||||
|
||||
// closeFn will be only called once
|
||||
func WrapCloseNotifyConn(c net.Conn, closeFn func()) net.Conn {
|
||||
// closeFn will be only called once with the error (nil if Close() was called, non-nil if CloseWithError() was called)
|
||||
func WrapCloseNotifyConn(c net.Conn, closeFn func(error)) *CloseNotifyConn {
|
||||
return &CloseNotifyConn{
|
||||
Conn: c,
|
||||
closeFn: closeFn,
|
||||
@@ -151,12 +151,25 @@ func (cc *CloseNotifyConn) Close() (err error) {
|
||||
if pflag == 0 {
|
||||
err = cc.Conn.Close()
|
||||
if cc.closeFn != nil {
|
||||
cc.closeFn()
|
||||
cc.closeFn(nil)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// CloseWithError closes the connection and passes the error to the close callback.
|
||||
func (cc *CloseNotifyConn) CloseWithError(err error) error {
|
||||
pflag := atomic.SwapInt32(&cc.closeFlag, 1)
|
||||
if pflag == 0 {
|
||||
closeErr := cc.Conn.Close()
|
||||
if cc.closeFn != nil {
|
||||
cc.closeFn(err)
|
||||
}
|
||||
return closeErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type StatsConn struct {
|
||||
net.Conn
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) {
|
||||
muxer := http.NewServeMux()
|
||||
muxer.Handle(FrpWebsocketPath, websocket.Handler(func(c *websocket.Conn) {
|
||||
notifyCh := make(chan struct{})
|
||||
conn := WrapCloseNotifyConn(c, func() {
|
||||
conn := WrapCloseNotifyConn(c, func(_ error) {
|
||||
close(notifyCh)
|
||||
})
|
||||
wl.acceptCh <- conn
|
||||
|
||||
Reference in New Issue
Block a user