@@ -2,82 +2,113 @@ package pulse
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
"net"
6
- "runtime"
7
7
"sync"
8
8
"sync/atomic"
9
9
10
10
"github.com/antlabs/pulse/core"
11
11
)
12
12
13
13
type ClientEventLoop struct {
14
- pollers []core.PollingApi
15
- conns []* core.SafeConns [Conn ]
16
- callback Callback
17
- options Options
18
- next uint32 // 用于轮询分配
19
- ctx context.Context
14
+ * MultiEventLoop
15
+ next uint32 // 轮询计数器
16
+ conns * core.SafeConns [Conn ] // 每个事件循环的连接管理器
17
+ callback Callback // 回调函数
18
+ ctx context.Context // 上下文
20
19
}
21
20
22
21
func NewClientEventLoop (ctx context.Context , opts ... func (* Options )) * ClientEventLoop {
23
- var options Options
24
- for _ , opt := range opts {
25
- opt (& options )
26
- }
27
- n := runtime .NumCPU ()
28
- pollers := make ([]core.PollingApi , n )
29
- conns := make ([]* core.SafeConns [Conn ], n )
30
- for i := 0 ; i < n ; i ++ {
31
- pollers [i ], _ = core .Create (core .TriggerTypeEdge )
32
- conns [i ] = & core.SafeConns [Conn ]{}
33
- conns [i ].Init (core .GetMaxFd ())
22
+ multiLoop , err := NewMultiEventLoop (ctx , opts ... )
23
+ if err != nil {
24
+ panic (err )
34
25
}
26
+
27
+ // 初始化连接管理器
28
+ conns := core.SafeConns [Conn ]{}
29
+ conns .Init (core .GetMaxFd ())
30
+
35
31
return & ClientEventLoop {
36
- pollers : pollers ,
37
- conns : conns ,
38
- callback : options .callback ,
39
- options : options ,
40
- ctx : ctx ,
32
+ MultiEventLoop : multiLoop ,
33
+ conns : & conns ,
34
+ callback : multiLoop .options .callback ,
35
+ ctx : ctx ,
41
36
}
42
37
}
43
38
44
39
func (loop * ClientEventLoop ) RegisterConn (conn net.Conn ) error {
40
+ // 1. 获取文件描述符
45
41
fd , err := core .GetFdFromConn (conn )
46
42
if err != nil {
47
- return err
43
+ return fmt . Errorf ( "failed to get fd from connection: %w" , err )
48
44
}
45
+
46
+ // 2. 关闭原始连接(因为我们要使用文件描述符)
49
47
if err := conn .Close (); err != nil {
50
- return err
48
+ return fmt . Errorf ( "failed to close original connection: %w" , err )
51
49
}
52
- idx := atomic .AddUint32 (& loop .next , 1 ) % uint32 (len (loop .pollers ))
53
- c := newConn (fd , loop .conns [idx ], nil , TaskTypeInEventLoop , loop .pollers [idx ], 4096 , false )
54
- loop .conns [idx ].Add (fd , c )
55
- loop .callback .OnOpen (c )
56
- return loop .pollers [idx ].AddRead (fd )
50
+
51
+ // 3. 选择事件循环(轮询分配)
52
+ eventLoopIndex := loop .selectEventLoop ()
53
+ eventLoop := loop .MultiEventLoop .eventLoops [eventLoopIndex ]
54
+
55
+ // 4. 创建新连接
56
+ connInstance := loop .createConn (fd , loop .conns , eventLoop )
57
+
58
+ // 5. 添加到连接管理器
59
+ loop .conns .Add (fd , connInstance )
60
+
61
+ // 6. 调用回调函数
62
+ if loop .callback != nil {
63
+ loop .callback .OnOpen (connInstance )
64
+ }
65
+
66
+ // 7. 添加到事件循环
67
+ return eventLoop .AddRead (fd )
68
+ }
69
+
70
+ // selectEventLoop 选择事件循环(轮询分配)
71
+ func (loop * ClientEventLoop ) selectEventLoop () int {
72
+ return int (atomic .AddUint32 (& loop .next , 1 ) % uint32 (len (loop .MultiEventLoop .eventLoops )))
73
+ }
74
+
75
+ // createConn 创建连接实例
76
+ func (loop * ClientEventLoop ) createConn (fd int , safeConns * core.SafeConns [Conn ], eventLoop core.PollingApi ) * Conn {
77
+ return newConn (
78
+ fd ,
79
+ safeConns ,
80
+ nil , // 客户端模式不需要任务池
81
+ TaskTypeInEventLoop ,
82
+ eventLoop ,
83
+ loop .MultiEventLoop .options .eventLoopReadBufferSize ,
84
+ loop .MultiEventLoop .options .flowBackPressureRemoveRead ,
85
+ )
57
86
}
58
87
59
88
func (loop * ClientEventLoop ) Serve () {
60
- n := len (loop .pollers )
89
+ n := len (loop .MultiEventLoop . eventLoops )
61
90
var wg sync.WaitGroup
62
91
wg .Add (n )
63
92
defer wg .Wait ()
64
93
65
94
for i := 0 ; i < n ; i ++ {
66
95
go func (idx int ) {
67
96
defer wg .Done ()
68
- buf := make ([]byte , loop .options .eventLoopReadBufferSize )
97
+ buf := make ([]byte , loop .MultiEventLoop . options .eventLoopReadBufferSize )
69
98
for {
70
99
select {
71
100
case <- loop .ctx .Done ():
72
101
return
73
102
default :
74
103
}
75
- _ , err := loop .pollers [idx ].Poll (0 , func (fd int , state core.State , pollErr error ) {
76
- c := loop .conns [ idx ] .GetUnsafe (fd )
104
+ _ , err := loop .MultiEventLoop . eventLoops [idx ].Poll (0 , func (fd int , state core.State , pollErr error ) {
105
+ c := loop .conns .GetUnsafe (fd )
77
106
if pollErr != nil {
78
107
if c != nil {
79
108
c .Close ()
80
- loop .callback .OnClose (c , pollErr )
109
+ if loop .callback != nil {
110
+ loop .callback .OnClose (c , pollErr )
111
+ }
81
112
}
82
113
return
83
114
}
@@ -90,15 +121,21 @@ func (loop *ClientEventLoop) Serve() {
90
121
c .mu .Unlock ()
91
122
if err != nil {
92
123
c .Close ()
93
- loop .callback .OnClose (c , err )
124
+ if loop .callback != nil {
125
+ loop .callback .OnClose (c , err )
126
+ }
94
127
return
95
128
}
96
129
if n == 0 {
97
130
c .Close ()
98
- loop .callback .OnClose (c , nil )
131
+ if loop .callback != nil {
132
+ loop .callback .OnClose (c , nil )
133
+ }
99
134
return
100
135
}
101
- loop .callback .OnData (c , buf [:n ])
136
+ if loop .callback != nil {
137
+ loop .callback .OnData (c , buf [:n ])
138
+ }
102
139
}
103
140
})
104
141
if err != nil {
0 commit comments