@@ -2,7 +2,6 @@ package pulse
2
2
3
3
import (
4
4
"errors"
5
- "fmt"
6
5
"log/slog"
7
6
"net"
8
7
"sync"
@@ -15,15 +14,16 @@ import (
15
14
)
16
15
17
16
type Conn struct {
18
- fd int64
19
- wbufList []* []byte // write buffer, 为了理精细控制内存使用量
20
- mu sync.Mutex
21
- safeConns * safeConns [Conn ]
22
- task driver.TaskExecutor
23
- eventLoop core.PollingApi
24
- readTimer * time.Timer
25
- writeTimer * time.Timer
26
- session any // 会话数据
17
+ fd int64
18
+ wbufList []* []byte // write buffer, 为了理精细控制内存使用量
19
+ mu sync.Mutex
20
+ safeConns * safeConns [Conn ]
21
+ task driver.TaskExecutor
22
+ eventLoop core.PollingApi
23
+ readTimer * time.Timer
24
+ writeTimer * time.Timer
25
+ session any // 会话数据
26
+ readBufferSize int // 读缓冲区大小
27
27
}
28
28
29
29
func (c * Conn ) SetNoDelay (nodelay bool ) error {
@@ -33,7 +33,9 @@ func (c *Conn) SetNoDelay(nodelay bool) error {
33
33
func (c * Conn ) getFd () int {
34
34
return int (atomic .LoadInt64 (& c .fd ))
35
35
}
36
- func newConn (fd int , safeConns * safeConns [Conn ], task selectTasks , taskType TaskType , eventLoop core.PollingApi ) * Conn {
36
+ func newConn (fd int , safeConns * safeConns [Conn ],
37
+ task selectTasks , taskType TaskType ,
38
+ eventLoop core.PollingApi , readBufferSize int ) * Conn {
37
39
var taskExecutor driver.TaskExecutor
38
40
switch taskType {
39
41
case TaskTypeInConnectionGoroutine :
@@ -47,10 +49,11 @@ func newConn(fd int, safeConns *safeConns[Conn], task selectTasks, taskType Task
47
49
}
48
50
49
51
return & Conn {
50
- fd : int64 (fd ),
51
- safeConns : safeConns ,
52
- task : taskExecutor ,
53
- eventLoop : eventLoop ,
52
+ fd : int64 (fd ),
53
+ safeConns : safeConns ,
54
+ task : taskExecutor ,
55
+ eventLoop : eventLoop ,
56
+ readBufferSize : readBufferSize ,
54
57
}
55
58
}
56
59
@@ -78,6 +81,7 @@ func (c *Conn) close() {
78
81
oldFd := atomic .SwapInt64 (& c .fd , - 1 )
79
82
if oldFd != - 1 {
80
83
c .safeConns .Del (int (oldFd ))
84
+
81
85
if err := core .Close (int (oldFd )); err != nil {
82
86
// Log the error but don't panic as this is a cleanup function
83
87
slog .Error ("failed to close fd" , "fd" , oldFd , "error" , err )
@@ -108,6 +112,53 @@ func (c *Conn) writeToSocket(data []byte) (int, error) {
108
112
109
113
}
110
114
115
+ // appendToWbufList 将数据添加到写缓冲区列表
116
+ // 先检查最后一个缓冲区是否有足够空间,如果有就直接append
117
+ // 如果没有,将部分数据append到最后一个缓冲区,剩余部分创建新的readBufferSize大小的缓冲区
118
+ func (c * Conn ) appendToWbufList (data []byte ) {
119
+ if len (data ) == 0 {
120
+ return
121
+ }
122
+
123
+ // 如果wbufList为空,直接创建新的缓冲区
124
+ if len (c .wbufList ) == 0 {
125
+ newBuf := getBytesWithSize (len (data ), c .readBufferSize )
126
+ copy (* newBuf , data )
127
+ * newBuf = (* newBuf )[:len (data )]
128
+ c .wbufList = append (c .wbufList , newBuf )
129
+ return
130
+ }
131
+
132
+ // 获取最后一个缓冲区
133
+ lastBuf := c .wbufList [len (c .wbufList )- 1 ]
134
+ remainingSpace := cap (* lastBuf ) - len (* lastBuf )
135
+
136
+ // 如果最后一个缓冲区有足够空间,直接append
137
+ if remainingSpace >= len (data ) {
138
+ * lastBuf = append (* lastBuf , data ... )
139
+ return
140
+ }
141
+
142
+ // 如果空间不够,先填满最后一个缓冲区
143
+ if remainingSpace > 0 {
144
+ * lastBuf = append (* lastBuf , data [:remainingSpace ]... )
145
+ data = data [remainingSpace :] // 剩余的数据
146
+ }
147
+
148
+ // 为剩余数据创建新的缓冲区(使用readBufferSize大小)
149
+ for len (data ) > 0 {
150
+ newBuf := getBytesWithSize (len (data ), c .readBufferSize )
151
+ copySize := len (data )
152
+ if copySize > cap (* newBuf ) {
153
+ copySize = cap (* newBuf )
154
+ }
155
+ copy (* newBuf , data [:copySize ])
156
+ * newBuf = (* newBuf )[:copySize ]
157
+ c .wbufList = append (c .wbufList , newBuf )
158
+ data = data [copySize :]
159
+ }
160
+ }
161
+
111
162
// handlePartialWrite 处理部分写入的情况,创建新缓冲区存储剩余数据
112
163
func (c * Conn ) handlePartialWrite (data []byte , n int , needAppend bool ) error {
113
164
if n < 0 {
@@ -119,22 +170,12 @@ func (c *Conn) handlePartialWrite(data []byte, n int, needAppend bool) error {
119
170
return nil
120
171
}
121
172
122
- remainingSize := len (data ) - n
123
- newBuf := getBytes (remainingSize )
124
- if cap (* newBuf ) < remainingSize {
125
- putBytes (newBuf )
126
- return fmt .Errorf ("getBytes allocated insufficient capacity: %d < %d" , cap (* newBuf ), remainingSize )
127
- }
128
-
129
- copy (* newBuf , data [n :])
130
- * newBuf = (* newBuf )[:remainingSize ]
131
-
173
+ remainingData := data [n :]
132
174
if needAppend {
133
- c .wbufList = append ( c . wbufList , newBuf )
175
+ c .appendToWbufList ( remainingData )
134
176
}
177
+
135
178
if err := c .eventLoop .AddWrite (c .getFd ()); err != nil {
136
- // 如果事件注册失败,释放缓冲区
137
- putBytes (newBuf )
138
179
slog .Error ("failed to add write event" , "error" , err )
139
180
return err
140
181
}
@@ -171,13 +212,7 @@ func (c *Conn) Write(data []byte) (int, error) {
171
212
}
172
213
173
214
if len (data ) > 0 {
174
- newBuf := getBytes (len (data ))
175
- if cap (* newBuf ) < len (data ) {
176
- panic ("newBuf cap is less than data" )
177
- }
178
- copy (* newBuf , data )
179
- * newBuf = (* newBuf )[:len (data )]
180
- c .wbufList = append (c .wbufList , newBuf )
215
+ c .appendToWbufList (data )
181
216
}
182
217
183
218
i := 0
@@ -209,7 +244,7 @@ func (c *Conn) Write(data []byte) (int, error) {
209
244
}
210
245
211
246
// 所有数据都已写入
212
- c .wbufList = nil
247
+ c .wbufList = c . wbufList [: 0 ]
213
248
if err := c .eventLoop .ResetRead (c .getFd ()); err != nil {
214
249
slog .Error ("failed to reset read event" , "error" , err )
215
250
}
@@ -241,7 +276,7 @@ func handleData(c *Conn, options *Options, rawData []byte) {
241
276
}
242
277
243
278
var newBytes * []byte
244
- newBytes = getBytes (len (rawData ))
279
+ newBytes = getBytesWithSize (len (rawData ), c . readBufferSize )
245
280
copy (* newBytes , rawData )
246
281
* newBytes = (* newBytes )[:len (rawData )]
247
282
0 commit comments