@@ -54,6 +54,8 @@ const (
5454 // LogUnexpectedResultId is logged when response with unknown id was received.
5555 // Most probably it is due to request timeout.
5656 LogUnexpectedResultId
57+ // LogReadWatchEventFailed is logged when failed to read a watch event.
58+ LogReadWatchEventFailed
5759)
5860
5961// ConnEvent is sent throw Notify channel specified in Opts.
@@ -63,6 +65,12 @@ type ConnEvent struct {
6365 When time.Time
6466}
6567
68+ // A raw watch event.
69+ type connWatchEvent struct {
70+ key string
71+ value interface {}
72+ }
73+
6674var epoch = time .Now ()
6775
6876// Logger is logger type expected to be passed in options.
@@ -84,6 +92,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
8492 case LogUnexpectedResultId :
8593 resp := v [0 ].(* Response )
8694 log .Printf ("tarantool: connection %s got unexpected resultId (%d) in response" , conn .addr , resp .RequestId )
95+ case LogReadWatchEventFailed :
96+ err := v [0 ].(error )
97+ log .Printf ("tarantool: unable to parse watch event: %s" , err )
8798 default :
8899 args := append ([]interface {}{"tarantool: unexpected event " , event , conn }, v ... )
89100 log .Print (args ... )
@@ -149,6 +160,8 @@ type Connection struct {
149160 lastStreamId uint64
150161
151162 serverProtocolInfo ProtocolInfo
163+ // watchMap is a map of key -> watchSharedData.
164+ watchMap sync.Map
152165}
153166
154167var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -531,7 +544,7 @@ func (conn *Connection) dial() (err error) {
531544 return fmt .Errorf ("identify: %w" , err )
532545 }
533546
534- // Auth
547+ // Auth.
535548 if opts .User != "" {
536549 scr , err := scramble (conn .Greeting .auth , opts .Pass )
537550 if err != nil {
@@ -549,7 +562,38 @@ func (conn *Connection) dial() (err error) {
549562 }
550563 }
551564
552- // Only if connected and authenticated.
565+ // Watchers.
566+ watchersChecked := false
567+ conn .watchMap .Range (func (key , value interface {}) bool {
568+ if ! watchersChecked {
569+ required := ProtocolInfo {Features : []ProtocolFeature {WatchersFeature }}
570+ err = checkProtocolInfo (required , conn .ServerProtocolInfo ())
571+ if err != nil {
572+ return false
573+ }
574+ watchersChecked = true
575+ }
576+
577+ st := value .(chan watchState )
578+ state := <- st
579+ if state .cnt > 0 {
580+ req := newWatchRequest (key .(string ))
581+ if err = conn .writeRequest (w , req ); err != nil {
582+ st <- state
583+ return false
584+ }
585+ state .init = true
586+ state .ack = true
587+ }
588+ st <- state
589+ return true
590+ })
591+
592+ if err != nil {
593+ return fmt .Errorf ("unable to register watch: %w" , err )
594+ }
595+
596+ // Only if connected and fully initialized.
553597 conn .lockShards ()
554598 conn .c = connection
555599 atomic .StoreUint32 (& conn .state , connConnected )
@@ -843,7 +887,52 @@ func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
843887 }
844888}
845889
890+ func readWatchEvent (reader io.Reader ) (connWatchEvent , error ) {
891+ keyExist := false
892+ event := connWatchEvent {}
893+ d := newDecoder (reader )
894+
895+ l , err := d .DecodeMapLen ()
896+ if err != nil {
897+ return event , err
898+ }
899+
900+ for ; l > 0 ; l -- {
901+ cd , err := d .DecodeInt ()
902+ if err != nil {
903+ return event , err
904+ }
905+
906+ switch cd {
907+ case KeyEvent :
908+ if event .key , err = d .DecodeString (); err != nil {
909+ return event , err
910+ }
911+ keyExist = true
912+ case KeyEventData :
913+ if event .value , err = d .DecodeInterface (); err != nil {
914+ return event , err
915+ }
916+ default :
917+ if err = d .Skip (); err != nil {
918+ return event , err
919+ }
920+ }
921+ }
922+
923+ if ! keyExist {
924+ return event , errors .New ("watch event does not have a key" )
925+ }
926+
927+ return event , nil
928+ }
929+
846930func (conn * Connection ) reader (r * bufio.Reader , c net.Conn ) {
931+ events := make (chan connWatchEvent , 1024 )
932+ defer close (events )
933+
934+ go conn .eventer (events )
935+
847936 for atomic .LoadUint32 (& conn .state ) != connClosed {
848937 respBytes , err := conn .read (r )
849938 if err != nil {
@@ -858,7 +947,14 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
858947 }
859948
860949 var fut * Future = nil
861- if resp .Code == PushCode {
950+ if resp .Code == EventCode {
951+ if event , err := readWatchEvent (& resp .buf ); err == nil {
952+ events <- event
953+ } else {
954+ conn .opts .Logger .Report (LogReadWatchEventFailed , conn , err )
955+ }
956+ continue
957+ } else if resp .Code == PushCode {
862958 if fut = conn .peekFuture (resp .RequestId ); fut != nil {
863959 fut .AppendPush (resp )
864960 }
@@ -868,12 +964,37 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
868964 conn .markDone (fut )
869965 }
870966 }
967+
871968 if fut == nil {
872969 conn .opts .Logger .Report (LogUnexpectedResultId , conn , resp )
873970 }
874971 }
875972}
876973
974+ // eventer goroutine gets watch events and updates values for watchers.
975+ func (conn * Connection ) eventer (events <- chan connWatchEvent ) {
976+ for {
977+ event , ok := <- events
978+ if ! ok {
979+ // The channel is closed.
980+ break
981+ }
982+
983+ if value , ok := conn .watchMap .Load (event .key ); ok {
984+ st := value .(chan watchState )
985+ state := <- st
986+ state .value = event .value
987+ state .init = false
988+ state .ack = false
989+ if state .changed != nil {
990+ close (state .changed )
991+ state .changed = nil
992+ }
993+ st <- state
994+ }
995+ }
996+ }
997+
877998func (conn * Connection ) newFuture (ctx context.Context ) (fut * Future ) {
878999 fut = NewFuture ()
8791000 if conn .rlimit != nil && conn .opts .RLimitAction == RLimitDrop {
@@ -1029,6 +1150,18 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10291150 return
10301151 }
10311152 shard .bufmut .Unlock ()
1153+
1154+ if req .Async () {
1155+ if fut = conn .fetchFuture (reqid ); fut != nil {
1156+ resp := & Response {
1157+ RequestId : reqid ,
1158+ Code : OkCode ,
1159+ }
1160+ fut .SetResponse (resp )
1161+ conn .markDone (fut )
1162+ }
1163+ }
1164+
10321165 if firstWritten {
10331166 conn .dirtyShard <- shardn
10341167 }
@@ -1233,6 +1366,161 @@ func (conn *Connection) NewStream() (*Stream, error) {
12331366 }, nil
12341367}
12351368
1369+ // watchState is the current state of the watcher. See the idea at p. 70, 105:
1370+ // https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
1371+ type watchState struct {
1372+ // value is a current value.
1373+ value interface {}
1374+ // init is true if it is an initial state (no events received).
1375+ init bool
1376+ // ack true if the acknowledge is already sended.
1377+ ack bool
1378+ // cnt is a count of active watchers for the key.
1379+ cnt int
1380+ // changed is a channel for broadcast the value changes.
1381+ changed chan struct {}
1382+ }
1383+
1384+ // connWatcher is an internal implementation of the Watcher interface.
1385+ type connWatcher struct {
1386+ unregister sync.Once
1387+ done chan struct {}
1388+ finished chan struct {}
1389+ }
1390+
1391+ // Unregister unregisters the connection watcher.
1392+ func (w * connWatcher ) Unregister () {
1393+ w .unregister .Do (func () {
1394+ close (w .done )
1395+ })
1396+ <- w .finished
1397+ }
1398+
1399+ // NewWatcher creates a new Watcher object for the connection.
1400+ //
1401+ // After watcher creation, the watcher callback is invoked for the first time.
1402+ // In this case, the callback is triggered whether or not the key has already
1403+ // been broadcast. All subsequent invocations are triggered with
1404+ // box.broadcast() called on the remote host. If a watcher is subscribed for a
1405+ // key that has not been broadcast yet, the callback is triggered only once,
1406+ // after the registration of the watcher.
1407+ //
1408+ // The watcher callbacks are always invoked in a separate goroutine. A watcher
1409+ // callback is never executed in parallel with itself, but they can be executed
1410+ // in parallel to other watchers.
1411+ //
1412+ // If the key is updated while the watcher callback is running, the callback
1413+ // will be invoked again with the latest value as soon as it returns.
1414+ //
1415+ // Watchers survive reconnection. All registered watchers are automatically
1416+ // resubscribed when the connection is reestablished.
1417+ //
1418+ // Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1419+ // watcher’s destruction. In this case, the watcher remains registered. You
1420+ // need to call Unregister() directly.
1421+ //
1422+ // Unregister() guarantees that there will be no the watcher's callback calls
1423+ // after it, but Unregister() call from the callback leads to a deadlock.
1424+ //
1425+ // See:
1426+ // https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1427+ //
1428+ // Since 1.10.0
1429+ func (conn * Connection ) NewWatcher (key string , callback WatchCallback ) (Watcher , error ) {
1430+ required := ProtocolInfo {Features : []ProtocolFeature {WatchersFeature }}
1431+ if err := checkProtocolInfo (required , conn .ServerProtocolInfo ()); err != nil {
1432+ return nil , err
1433+ }
1434+
1435+ var st chan watchState
1436+ // Get or create a shared data for the key.
1437+ if val , ok := conn .watchMap .Load (key ); ! ok {
1438+ st = make (chan watchState , 1 )
1439+ st <- watchState {
1440+ value : nil ,
1441+ init : true ,
1442+ ack : false ,
1443+ cnt : 0 ,
1444+ changed : nil ,
1445+ }
1446+
1447+ if val , ok := conn .watchMap .LoadOrStore (key , st ); ok {
1448+ close (st )
1449+ st = val .(chan watchState )
1450+ }
1451+ } else {
1452+ st = val .(chan watchState )
1453+ }
1454+
1455+ state := <- st
1456+ // Send an initial watch request if needed.
1457+ if state .cnt == 0 {
1458+ if _ , err := conn .Do (newWatchRequest (key )).Get (); err != nil {
1459+ st <- state
1460+ return nil , err
1461+ }
1462+ state .init = true
1463+ state .ack = true
1464+ }
1465+ state .cnt += 1
1466+ st <- state
1467+
1468+ // Start the watcher goroutine.
1469+ done := make (chan struct {})
1470+ finished := make (chan struct {})
1471+
1472+ go func () {
1473+ for {
1474+ state := <- st
1475+ if state .changed == nil {
1476+ state .changed = make (chan struct {})
1477+ }
1478+ st <- state
1479+
1480+ if ! state .init {
1481+ callback (WatchEvent {
1482+ Conn : conn ,
1483+ Key : key ,
1484+ Value : state .value ,
1485+ })
1486+
1487+ // Acknowledge the notification.
1488+ state = <- st
1489+ ack := state .ack
1490+ state .ack = true
1491+ st <- state
1492+
1493+ if ! ack {
1494+ conn .Do (newWatchRequest (key )).Get ()
1495+ // We expect a reconnect and re-subscribe if it fails to
1496+ // send the watch request. So it looks ok do not check a
1497+ // result.
1498+ }
1499+ }
1500+
1501+ select {
1502+ case <- done :
1503+ state := <- st
1504+ state .cnt -= 1
1505+ if state .cnt == 0 {
1506+ // The last one sends IPROTO_UNWATCH.
1507+ conn .Do (newUnwatchRequest (key )).Get ()
1508+ }
1509+ st <- state
1510+
1511+ close (finished )
1512+ return
1513+ case <- state .changed :
1514+ }
1515+ }
1516+ }()
1517+
1518+ return & connWatcher {
1519+ done : done ,
1520+ finished : finished ,
1521+ }, nil
1522+ }
1523+
12361524// checkProtocolInfo checks that expected protocol version is
12371525// and protocol features are supported.
12381526func checkProtocolInfo (expected ProtocolInfo , actual ProtocolInfo ) error {
0 commit comments