Skip to content

Commit 126e70b

Browse files
authored
feat: grpc: allow configurable keepalive and initial windows size (#3654)
added configuration options: - idle_timeout - health_check_timeout - permit_without_stream - initial_windows_size acknowledge: - https://github.com/XTLS/Xray-core - https://xtls.github.io/en/config/transports/grpc.html Signed-off-by: Leo <i@hardrain980.com> Thanks Leo
1 parent 42a681a commit 126e70b

File tree

4 files changed

+123
-46
lines changed

4 files changed

+123
-46
lines changed

infra/conf/v4/gun.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,28 @@ import (
77
)
88

99
type GunConfig struct {
10-
ServiceName string `json:"serviceName"`
10+
ServiceName string `json:"serviceName"`
11+
IdleTimeout int32 `json:"idle_timeout"`
12+
HealthCheckTimeout int32 `json:"health_check_timeout"`
13+
PermitWithoutStream bool `json:"permit_without_stream"`
14+
InitialWindowsSize int32 `json:"initial_windows_size"`
1115
}
1216

1317
func (g GunConfig) Build() (proto.Message, error) {
14-
return &grpc.Config{ServiceName: g.ServiceName}, nil
18+
if g.IdleTimeout <= 0 {
19+
g.IdleTimeout = 0
20+
}
21+
if g.HealthCheckTimeout <= 0 {
22+
g.HealthCheckTimeout = 0
23+
}
24+
if g.InitialWindowsSize < 0 {
25+
g.InitialWindowsSize = 0
26+
}
27+
return &grpc.Config{
28+
ServiceName: g.ServiceName,
29+
IdleTimeout: g.IdleTimeout,
30+
HealthCheckTimeout: g.HealthCheckTimeout,
31+
PermitWithoutStream: g.PermitWithoutStream,
32+
InitialWindowsSize: g.InitialWindowsSize,
33+
}, nil
1534
}

transport/internet/grpc/config.pb.go

Lines changed: 49 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

transport/internet/grpc/config.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ message Config {
1616

1717
string host = 1;
1818
string service_name = 2;
19+
int32 idle_timeout = 3;
20+
int32 health_check_timeout = 4;
21+
bool permit_without_stream = 5;
22+
int32 initial_windows_size = 6;
1923
}

transport/internet/grpc/dial.go

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/grpc/connectivity"
1515
"google.golang.org/grpc/credentials"
1616
"google.golang.org/grpc/credentials/insecure"
17+
"google.golang.org/grpc/keepalive"
1718

1819
core "github.com/v2fly/v2ray-core/v5"
1920
"github.com/v2fly/v2ray-core/v5/common"
@@ -63,15 +64,7 @@ type dialerCanceller func()
6364
func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) {
6465
grpcSettings := streamSettings.ProtocolSettings.(*Config)
6566

66-
config := tls.ConfigFromStreamSettings(streamSettings)
67-
68-
transportCredentials := insecure.NewCredentials()
69-
if config != nil {
70-
transportCredentials = credentials.NewTLS(config.GetTLSConfig(tls.WithDestination(dest)))
71-
}
72-
dialOption := grpc.WithTransportCredentials(transportCredentials)
73-
74-
conn, canceller, err := getGrpcClient(ctx, dest, dialOption, streamSettings)
67+
conn, canceller, err := getGrpcClient(ctx, dest, streamSettings)
7568
if err != nil {
7669
return nil, newError("Cannot dial grpc").Base(err)
7770
}
@@ -84,9 +77,54 @@ func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *interne
8477
return encoding.NewGunConn(gunService, nil), nil
8578
}
8679

87-
func getGrpcClient(ctx context.Context, dest net.Destination, dialOption grpc.DialOption, streamSettings *internet.MemoryStreamConfig) (*grpc.ClientConn, dialerCanceller, error) {
80+
func getGrpcClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (*grpc.ClientConn, dialerCanceller, error) {
8881
transportEnvironment := envctx.EnvironmentFromContext(ctx).(environment.TransportEnvironment)
8982
state, err := transportEnvironment.TransientStorage().Get(ctx, "grpc-transport-connection-state")
83+
grpcSettings := streamSettings.ProtocolSettings.(*Config)
84+
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
85+
transportCredentials := insecure.NewCredentials()
86+
if tlsConfig != nil {
87+
transportCredentials = credentials.NewTLS(tlsConfig.GetTLSConfig(tls.WithDestination(dest)))
88+
}
89+
dialOptions := []grpc.DialOption{
90+
grpc.WithTransportCredentials(transportCredentials),
91+
grpc.WithConnectParams(grpc.ConnectParams{
92+
Backoff: backoff.Config{
93+
BaseDelay: 500 * time.Millisecond,
94+
Multiplier: 1.5,
95+
Jitter: 0.2,
96+
MaxDelay: 19 * time.Second,
97+
},
98+
MinConnectTimeout: 5 * time.Second,
99+
}),
100+
grpc.WithContextDialer(func(ctxGrpc context.Context, s string) (gonet.Conn, error) {
101+
rawHost, rawPort, err := net.SplitHostPort(s)
102+
if err != nil {
103+
return nil, err
104+
}
105+
if len(rawPort) == 0 {
106+
rawPort = "443"
107+
}
108+
port, err := net.PortFromString(rawPort)
109+
if err != nil {
110+
return nil, err
111+
}
112+
address := net.ParseAddress(rawHost)
113+
detachedContext := core.ToBackgroundDetachedContext(ctx)
114+
return internet.DialSystem(detachedContext, net.TCPDestination(address, port), streamSettings.SocketSettings)
115+
}),
116+
grpc.WithDisableServiceConfig(),
117+
}
118+
if grpcSettings.IdleTimeout > 0 || grpcSettings.HealthCheckTimeout > 0 || grpcSettings.PermitWithoutStream {
119+
dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepalive.ClientParameters{
120+
Time: time.Second * time.Duration(grpcSettings.IdleTimeout),
121+
Timeout: time.Second * time.Duration(grpcSettings.HealthCheckTimeout),
122+
PermitWithoutStream: grpcSettings.PermitWithoutStream,
123+
}))
124+
}
125+
if grpcSettings.InitialWindowsSize > 0 {
126+
dialOptions = append(dialOptions, grpc.WithInitialWindowSize(grpcSettings.InitialWindowsSize))
127+
}
90128
if err != nil {
91129
state = &transportConnectionState{}
92130
transportEnvironment.TransientStorage().Put(ctx, "grpc-transport-connection-state", state)
@@ -116,33 +154,7 @@ func getGrpcClient(ctx context.Context, dest net.Destination, dialOption grpc.Di
116154

117155
conn, err := grpc.NewClient(
118156
dest.Address.String()+":"+dest.Port.String(),
119-
dialOption,
120-
grpc.WithConnectParams(grpc.ConnectParams{
121-
Backoff: backoff.Config{
122-
BaseDelay: 500 * time.Millisecond,
123-
Multiplier: 1.5,
124-
Jitter: 0.2,
125-
MaxDelay: 19 * time.Second,
126-
},
127-
MinConnectTimeout: 5 * time.Second,
128-
}),
129-
grpc.WithContextDialer(func(ctxGrpc context.Context, s string) (gonet.Conn, error) {
130-
rawHost, rawPort, err := net.SplitHostPort(s)
131-
if err != nil {
132-
return nil, err
133-
}
134-
if len(rawPort) == 0 {
135-
rawPort = "443"
136-
}
137-
port, err := net.PortFromString(rawPort)
138-
if err != nil {
139-
return nil, err
140-
}
141-
address := net.ParseAddress(rawHost)
142-
detachedContext := core.ToBackgroundDetachedContext(ctx)
143-
return internet.DialSystem(detachedContext, net.TCPDestination(address, port), streamSettings.SocketSettings)
144-
}),
145-
grpc.WithDisableServiceConfig(),
157+
dialOptions...,
146158
)
147159
canceller = func() {
148160
stateTyped.scopedDialerAccess.Lock()

0 commit comments

Comments
 (0)