@@ -141,10 +141,48 @@ func (s *canalTestSuite) TestCanal(c *C) {
141
141
s .execute (c , "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`" )
142
142
s .execute (c , "INSERT INTO test.canal_test (name,age) VALUES (?,?)" , "d" , "18" )
143
143
144
- err := s .c . CatchMasterPos ( 10 * time .Second )
144
+ err := CatchMasterPos ( s .c , 10 * time .Second )
145
145
c .Assert (err , IsNil )
146
146
}
147
147
148
+ func CatchMasterPos (c * Canal , timeout time.Duration ) error {
149
+ pos , err := c .GetMasterPos ()
150
+ if err != nil {
151
+ return errors .Trace (err )
152
+ }
153
+
154
+ return WaitUntilPos (c , pos , timeout )
155
+ }
156
+
157
+ func FlushBinlog (c * Canal ) error {
158
+ _ , err := c .Execute ("FLUSH BINARY LOGS" )
159
+ return errors .Trace (err )
160
+ }
161
+
162
+ func WaitUntilPos (c * Canal , pos mysql.Position , timeout time.Duration ) error {
163
+ timer := time .NewTimer (timeout )
164
+ for {
165
+ select {
166
+ case <- timer .C :
167
+ return errors .Errorf ("wait position %v too long > %s" , pos , timeout )
168
+ default :
169
+ err := FlushBinlog (c )
170
+ if err != nil {
171
+ return errors .Trace (err )
172
+ }
173
+ curPos := c .master .Position ()
174
+ if curPos .Compare (pos ) >= 0 {
175
+ return nil
176
+ } else {
177
+ log .Debugf ("master pos is %v, wait catching %v" , curPos , pos )
178
+ time .Sleep (100 * time .Millisecond )
179
+ }
180
+ }
181
+ }
182
+
183
+ return nil
184
+ }
185
+
148
186
func (s * canalTestSuite ) TestCanalFilter (c * C ) {
149
187
// included
150
188
sch , err := s .c .GetTable ("test" , "canal_test" )
0 commit comments