Skip to content

Commit 9198be7

Browse files
authored
Merge pull request #234 from CortexFoundation/dev
torrent/monitor/client | channel close order fix
2 parents 2599cd4 + 6d35b1d commit 9198be7

File tree

2 files changed

+16
-12
lines changed

2 files changed

+16
-12
lines changed

torrentfs/monitor.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type Monitor struct {
8282
//portLock sync.Mutex
8383
//portsWg sync.WaitGroup
8484

85-
taskCh chan *Block
85+
//taskCh chan *Block
8686
newTaskHook func(*Block)
8787
blockCache *lru.Cache
8888
healthPeers *lru.Cache
@@ -121,7 +121,7 @@ func NewMonitor(flag *Config) (m *Monitor, e error) {
121121
terminated: 0,
122122
lastNumber: uint64(0),
123123
dirty: false,
124-
taskCh: make(chan *Block, batch*2),
124+
//taskCh: make(chan *Block, batch*2),
125125
}
126126
m.blockCache, _ = lru.New(6)
127127
m.healthPeers, _ = lru.New(50)
@@ -175,7 +175,7 @@ func NewMonitor(flag *Config) (m *Monitor, e error) {
175175
return m, e
176176
}
177177

178-
func (m *Monitor) taskLoop() {
178+
/*func (m *Monitor) taskLoop() {
179179
defer m.wg.Done()
180180
for {
181181
select {
@@ -186,13 +186,14 @@ func (m *Monitor) taskLoop() {
186186
187187
if err := m.deal(task); err != nil {
188188
log.Warn("Block dealing failed", "err", err)
189+
continue
189190
}
190191
case <-m.exitCh:
191192
log.Info("Monitor task channel closed")
192193
return
193194
}
194195
}
195-
}
196+
}*/
196197

197198
// SetConnection method builds connection to remote or local communicator.
198199
func SetConnection(clientURI string) (*rpc.Client, error) {
@@ -531,7 +532,8 @@ func (m *Monitor) parseBlockTorrentInfo(b *Block, flowCtrl bool) error {
531532
func (m *Monitor) Stop() {
532533
log.Info("Torrent listener closing")
533534
atomic.StoreInt32(&(m.terminated), 1)
534-
close(m.exitCh)
535+
//close(m.exitCh)
536+
//m.wg.Wait()
535537
m.wg.Wait()
536538
/*m.wg.Add(1)
537539
m.closeOnce.Do(func() {
@@ -630,8 +632,8 @@ func (m *Monitor) startWork() error {
630632
//}
631633

632634
//log.Info("Torrent fs validation passed")
633-
m.wg.Add(1)
634-
go m.taskLoop()
635+
//m.wg.Add(1)
636+
//go m.taskLoop()
635637
m.wg.Add(1)
636638
go m.listenLatestBlock()
637639
m.init()
@@ -895,6 +897,7 @@ func (m *Monitor) syncLastBlock() uint64 {
895897
if atomic.LoadInt32(&(m.terminated)) == 1 {
896898
log.Warn("Torrent scan terminated", "number", i)
897899
maxNumber = i - 1
900+
close(m.exitCh)
898901
break
899902
}
900903

@@ -903,7 +906,8 @@ func (m *Monitor) syncLastBlock() uint64 {
903906
log.Error("Sync old block failed", "number", i, "error", rpcErr)
904907
return 0
905908
}
906-
m.taskCh <- rpcBlock
909+
//m.taskCh <- rpcBlock
910+
m.deal(rpcBlock)
907911
}
908912
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
909913
m.lastNumber = maxNumber

torrentfs/torrentClient.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,8 +355,8 @@ func (tm *TorrentManager) Close() error {
355355
}
356356

357357
func (tm *TorrentManager) dropAll() {
358-
tm.lock.Lock()
359-
tm.lock.Unlock()
358+
//tm.lock.Lock()
359+
//tm.lock.Unlock()
360360
defer tm.client.Close()
361361
for _, t := range tm.torrents {
362362
stats := t.Stats()
@@ -371,8 +371,8 @@ func (tm *TorrentManager) dropAll() {
371371
//}
372372

373373
func (tm *TorrentManager) UpdateTorrent(input interface{}) error {
374-
//go func() {tm.updateTorrent <- input}()
375-
tm.updateTorrent <- input
374+
go func() { tm.updateTorrent <- input }()
375+
//tm.updateTorrent <- input
376376
return nil
377377
}
378378

0 commit comments

Comments
 (0)