@@ -2,6 +2,7 @@ package bizangela
22
33import (
44 "context"
5+ "errors"
56 "net/url"
67 "strconv"
78 "time"
@@ -12,9 +13,11 @@ import (
1213 "github.com/tuihub/librarian/app/sephirah/internal/model/converter"
1314 "github.com/tuihub/librarian/app/sephirah/internal/model/modelangela"
1415 "github.com/tuihub/librarian/app/sephirah/internal/model/modelgebura"
16+ "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach"
1517 "github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth"
1618 "github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod"
1719 "github.com/tuihub/librarian/internal/lib/libapp"
20+ "github.com/tuihub/librarian/internal/lib/libcache"
1821 "github.com/tuihub/librarian/internal/lib/libmq"
1922 "github.com/tuihub/librarian/internal/model"
2023 "github.com/tuihub/librarian/internal/model/modelfeed"
@@ -24,6 +27,7 @@ import (
2427 librarian "github.com/tuihub/protos/pkg/librarian/v1"
2528
2629 "github.com/google/wire"
30+ "golang.org/x/exp/slices"
2731)
2832
2933var ProviderSet = wire .NewSet (
@@ -33,6 +37,11 @@ var ProviderSet = wire.NewSet(
3337 NewPullSteamAccountAppRelationTopic ,
3438 NewPullSteamAppTopic ,
3539 NewPullFeedTopic ,
40+ NewNotifyRouterTopic ,
41+ NewNotifyPushTopic ,
42+ NewFeedToNotifyFlowMap ,
43+ NewNotifyFlowCache ,
44+ NewNotifyTargetCache ,
3645)
3746
3847type Angela struct {
@@ -71,6 +80,8 @@ func NewAngela(
7180 pullSteamAccountAppRelation * libmq.Topic [modelangela.PullSteamAccountAppRelation ],
7281 pullSteamApp * libmq.Topic [modelangela.PullSteamApp ],
7382 pullFeed * libmq.Topic [modelyesod.PullFeed ],
83+ notifyRouter * libmq.Topic [modelangela.NotifyRouter ],
84+ notifyPush * libmq.Topic [modelangela.NotifyPush ],
7485) (* Angela , error ) {
7586 if err := mq .RegisterTopic (pullAccount ); err != nil {
7687 return nil , err
@@ -84,6 +95,12 @@ func NewAngela(
8495 if err := mq .RegisterTopic (pullFeed ); err != nil {
8596 return nil , err
8697 }
98+ if err := mq .RegisterTopic (notifyRouter ); err != nil {
99+ return nil , err
100+ }
101+ if err := mq .RegisterTopic (notifyPush ); err != nil {
102+ return nil , err
103+ }
87104 return & Angela {
88105 mq : mq ,
89106 }, nil
@@ -279,8 +296,9 @@ func NewPullSteamAppTopic(
279296 )
280297}
281298
282- func NewPullFeedTopic (
299+ func NewPullFeedTopic ( //nolint:gocognit // TODO
283300 a * AngelaBase ,
301+ notify * libmq.Topic [modelangela.NotifyRouter ],
284302) * libmq.Topic [modelyesod.PullFeed ] {
285303 return libmq .NewTopic [modelyesod.PullFeed ](
286304 "PullFeed" ,
@@ -321,7 +339,89 @@ func NewPullFeedTopic(
321339 item .PublishedParsed = & t
322340 }
323341 }
324- return a .y .UpsertFeedItems (ctx , feed .Items , feed .ID )
342+ newItemGUIDs , err := a .y .UpsertFeedItems (ctx , feed .Items , feed .ID )
343+ if err != nil {
344+ return err
345+ }
346+ newItems := make ([]* modelfeed.Item , 0 , len (newItemGUIDs ))
347+ for _ , item := range feed .Items {
348+ if slices .Contains (newItemGUIDs , item .GUID ) {
349+ newItems = append (newItems , item )
350+ }
351+ }
352+ err = notify .Publish (ctx , modelangela.NotifyRouter {
353+ FeedID : feed .ID ,
354+ Messages : newItems ,
355+ })
356+ if err != nil {
357+ return err
358+ }
359+ return nil
360+ },
361+ )
362+ }
363+
364+ func NewNotifyRouterTopic (
365+ a * AngelaBase ,
366+ flowMap * libcache.Map [model.InternalID , modelnetzach.NotifyFlow ],
367+ feedToFlowMap * libcache.Map [model.InternalID , modelangela.FeedToNotifyFlowValue ],
368+ push * libmq.Topic [modelangela.NotifyPush ],
369+ ) * libmq.Topic [modelangela.NotifyRouter ] {
370+ return libmq .NewTopic [modelangela.NotifyRouter ](
371+ "NotifyRouter" ,
372+ func (ctx context.Context , r * modelangela.NotifyRouter ) error {
373+ flowIDs , err := feedToFlowMap .GetWithFallBack (ctx , r .FeedID , nil )
374+ if err != nil {
375+ return err
376+ }
377+ if flowIDs == nil {
378+ return errors .New ("nil result from feedToFlowMap" )
379+ }
380+ for _ , flowID := range * flowIDs {
381+ var flow * modelnetzach.NotifyFlow
382+ flow , err = flowMap .GetWithFallBack (ctx , flowID , nil )
383+ if err != nil {
384+ return err
385+ }
386+ for _ , target := range flow .Targets {
387+ if target == nil {
388+ continue
389+ }
390+ err = push .Publish (ctx , modelangela.NotifyPush {
391+ Target : * target ,
392+ Messages : r .Messages ,
393+ })
394+ if err != nil {
395+ return err
396+ }
397+ }
398+ }
399+ return nil
400+ },
401+ )
402+ }
403+
404+ func NewNotifyPushTopic (
405+ a * AngelaBase ,
406+ targetMap * libcache.Map [model.InternalID , modelnetzach.NotifyTarget ],
407+ ) * libmq.Topic [modelangela.NotifyPush ] {
408+ return libmq .NewTopic [modelangela.NotifyPush ](
409+ "NotifyPush" ,
410+ func (ctx context.Context , p * modelangela.NotifyPush ) error {
411+ target , err := targetMap .GetWithFallBack (ctx , p .Target .TargetID , nil )
412+ if err != nil {
413+ return err
414+ }
415+ _ , err = a .porter .PushFeedItems (ctx , & porter.PushFeedItemsRequest {
416+ Destination : converter .ToPBFeedDestination (target .Type ),
417+ ChannelId : p .Target .ChannelID ,
418+ Items : converter .ToPBFeedItemList (p .Messages ),
419+ Token : target .Token ,
420+ })
421+ if err != nil {
422+ return err
423+ }
424+ return nil
325425 },
326426 )
327427}
0 commit comments