Skip to content

Filter events get logs post notification#122

Merged
anxolin merged 1 commit intopushfrom
filter-events-get-logs-post-notification
Apr 18, 2025
Merged

Filter events get logs post notification#122
anxolin merged 1 commit intopushfrom
filter-events-get-logs-post-notification

Conversation

@anxolin
Copy link
Copy Markdown
Contributor

@anxolin anxolin commented Apr 18, 2025

This PR:

Get the addresses of users that are subscribed

Use it later for filtering logs

Get logs of trades

Get logs of events for the settlement contract

filters by subscribed users

Get logs of cancelations

Get logs of events for the settlement contract

filters by subscribed users

Convert logs to notifications

Converts both the cancelation and the trade into a notification

Push notification to queue

Send the notifications to the queue

Summary by CodeRabbit

  • New Features

    • Introduced modular notification producers for handling CMS and trade notifications with improved resilience and lifecycle management.
    • Added persistent state tracking for notifications and indexing using a database-backed repository.
    • Implemented batch processing and sending of multiple notifications per message for Telegram integration.
  • Bug Fixes

    • Enhanced retry and error handling for notification delivery, including capped retries and robust acknowledgment logic.
  • Refactor

    • Unified and improved pagination logic for fetching subscribed accounts and Telegram subscriptions.
    • Updated notification handling to support arrays of notifications throughout the system.
    • Refined Redis enablement logic for more flexible configuration.
  • Tests

    • Added new automated tests for notification queue posting.
  • Documentation

    • Updated README with revised development setup instructions.
  • Chores

    • Upgraded dependencies and TypeScript target version for improved compatibility and performance.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 18, 2025

Walkthrough

This update introduces a modular, database-backed notification producer system with a focus on improved lifecycle management, batch notification handling, and robust retry logic. The notification producer is refactored to use producer classes for different notification types (CMS and trade events), each running concurrently and managed via a standardized Runnable interface. State persistence for indexing is implemented using a new PostgreSQL table and repository. The Telegram consumer is updated to process multiple notifications per message and introduces capped retry logic. Supporting libraries are refactored for batch notification processing, improved pagination, and enhanced Redis enablement logic. Documentation and configuration files are updated accordingly.

Changes

File(s) Change Summary
README.md, package.json, tsconfig.base.json Updated documentation for starting DB container; updated "pg" dependency and added "@types/pg"; changed TypeScript target to ES2020.
apps/notification-producer/db.sql Added notifications_indexer_state table with trigger for updated_at timestamp.
apps/notification-producer/src/main.ts Refactored to use modular producer classes, database-backed state, and graceful shutdown handling.
apps/notification-producer/src/postToQueueTest.test.ts, apps/notification-producer/src/postToQueueTest.ts Added new test for posting to queue; removed old test script.
apps/notification-producer/src/producers/CmsNotificationProducer.ts Added new producer class for CMS notifications with in-memory pending state and queue integration.
apps/notification-producer/src/producers/TradeNotificationProducer.ts Added new producer class for trade/blockchain event notifications with state tracking and notification sending.
apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts Added repository for persisting and retrieving indexer state in PostgreSQL.
apps/notification-producer/src/repositories/NotificationsRepository.ts Added repository for managing queue connections and sending notifications.
apps/notification-producer/src/repositories/SubscriptionsRepository.ts Added repository for fetching and caching subscribed accounts.
apps/notification-producer/src/utils.ts Added utility for running async loops with error handling and stoppable logic.
apps/notification-producer/types.ts Added Runnable interface for standardized producer lifecycle management.
apps/telegram/src/main.ts Refactored to handle multiple notifications per message, added retry logic, and improved message acknowledgment.
libs/cms-api/src/index.ts Refactored to use generic pagination, updated endpoints, and improved error handling for subscription queries.
libs/notifications/src/index.ts Refactored to support batch notification parsing, stringification, and queue sending.
libs/repositories/src/datasources/redis.ts Updated Redis enablement logic to prioritize REDIS_ENABLED env variable.

Sequence Diagram(s)

sequenceDiagram
    participant Main as mainLoop
    participant DB as PostgreSQL
    participant CMS as CmsNotificationProducer
    participant Trade as TradeNotificationProducer
    participant Repo as NotificationsRepository
    participant Queue as Notification Queue

    Main->>DB: Initialize connection pool
    Main->>CMS: Start CMS producer
    Main->>Trade: Start Trade producers (per chain)
    CMS->>Repo: Connect to queue
    CMS->>Queue: Send batch notifications
    Trade->>Repo: Connect to queue
    Trade->>Queue: Send batch notifications
    Trade->>DB: Persist last indexed block state
    Main-->>CMS: On shutdown, stop producer
    Main-->>Trade: On shutdown, stop producers
    Main->>DB: Close connection pool
Loading
sequenceDiagram
    participant Telegram as Telegram Consumer
    participant Queue as Notification Queue

    Telegram->>Queue: Receive message (batch of notifications)
    Telegram->>Telegram: Parse notifications[]
    loop For each notification
        Telegram->>Telegram: Send notification to all subscribers
    end
    Telegram->>Telegram: Track retries
    Telegram->>Queue: Ack/Nack message based on success or retry count
Loading

Possibly related PRs

Suggested reviewers

  • shoom3301
  • alfetopito

Poem

In the warren of code, where the messages hop,
Producers now run, never planning to stop.
With state in the database and queues in the sky,
Notifications are batch-sent as time passes by.
Telegram listens, retries just thrice—
While rabbits refactor, the system feels nice!
🐇✨

Tip

⚡💬 Agentic Chat (Pro Plan, General Availability)
  • We're introducing multi-step agentic chat in review comments and issue comments, within and outside of PR's. This feature enhances review and issue discussions with the CodeRabbit agentic chat by enabling advanced interactions, including the ability to create pull requests directly from comments and add commits to existing pull requests.

📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a50c15 and 53570f9.

⛔ Files ignored due to path filters (1)
  • yarn.lock is excluded by !**/yarn.lock, !**/*.lock
📒 Files selected for processing (18)
  • README.md (1 hunks)
  • apps/notification-producer/db.sql (1 hunks)
  • apps/notification-producer/src/main.ts (1 hunks)
  • apps/notification-producer/src/postToQueueTest.test.ts (1 hunks)
  • apps/notification-producer/src/postToQueueTest.ts (0 hunks)
  • apps/notification-producer/src/producers/CmsNotificationProducer.ts (1 hunks)
  • apps/notification-producer/src/producers/TradeNotificationProducer.ts (1 hunks)
  • apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts (1 hunks)
  • apps/notification-producer/src/repositories/NotificationsRepository.ts (1 hunks)
  • apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1 hunks)
  • apps/notification-producer/src/utils.ts (1 hunks)
  • apps/notification-producer/types.ts (1 hunks)
  • apps/telegram/src/main.ts (5 hunks)
  • libs/cms-api/src/index.ts (4 hunks)
  • libs/notifications/src/index.ts (2 hunks)
  • libs/repositories/src/datasources/redis.ts (1 hunks)
  • package.json (2 hunks)
  • tsconfig.base.json (1 hunks)
💤 Files with no reviewable changes (1)
  • apps/notification-producer/src/postToQueueTest.ts
🧰 Additional context used
🧬 Code Graph Analysis (7)
apps/notification-producer/src/postToQueueTest.test.ts (1)
libs/notifications/src/index.ts (3)
  • connectToChannel (47-70)
  • NOTIFICATIONS_QUEUE (4-4)
  • sendNotificationsToQueue (78-82)
apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)
libs/cms-api/src/index.ts (1)
  • getAllSubscribedAccounts (143-148)
apps/notification-producer/src/utils.ts (3)
apps/notification-producer/src/producers/CmsNotificationProducer.ts (1)
  • stop (54-57)
apps/notification-producer/src/producers/TradeNotificationProducer.ts (1)
  • stop (82-87)
libs/notifications/src/index.ts (1)
  • sleep (34-36)
apps/notification-producer/src/producers/CmsNotificationProducer.ts (7)
apps/notification-producer/src/repositories/NotificationsRepository.ts (1)
  • NotificationsRepository (10-66)
apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)
  • SubscriptionRepository (6-23)
apps/notification-producer/types.ts (1)
  • Runnable (4-14)
libs/notifications/src/index.ts (1)
  • Notification (15-21)
apps/notification-producer/src/utils.ts (1)
  • doForever (3-28)
apps/notification-producer/src/producers/TradeNotificationProducer.ts (1)
  • stop (82-87)
libs/cms-api/src/index.ts (2)
  • getPushNotifications (260-272)
  • CmsPushNotification (16-30)
apps/notification-producer/src/repositories/NotificationsRepository.ts (1)
libs/notifications/src/index.ts (5)
  • ConnectToChannelResponse (42-45)
  • NOTIFICATIONS_QUEUE (4-4)
  • connectToChannel (47-70)
  • Notification (15-21)
  • sendNotificationsToQueue (78-82)
apps/telegram/src/main.ts (2)
libs/cms-api/src/index.ts (1)
  • CmsTelegramSubscription (6-9)
libs/notifications/src/index.ts (2)
  • Notification (15-21)
  • parseNotifications (23-27)
libs/cms-api/src/index.ts (1)
apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)
  • getAllSubscribedAccounts (10-22)
🪛 Biome (1.9.4)
apps/notification-producer/src/main.ts

[error] 89-89: void is confusing outside a return type or a type parameter.

Unsafe fix: Use undefined instead.

(lint/suspicious/noConfusingVoidType)

🔇 Additional comments (13)
package.json (2)

58-58: PostgreSQL package update looks good

The update from pg ^8.13.1 to ^8.14.1 is a minor version update that should maintain backward compatibility while incorporating bug fixes and improvements.


82-82: Good addition of type definitions

Adding @types/pg to devDependencies is a good practice for improving type safety when working with the PostgreSQL library.

tsconfig.base.json (1)

11-11:

Details

✅ Verification successful

Verify runtime compatibility with ES2020 target

Updating from ES2015 to ES2020 enables modern JavaScript features like optional chaining, nullish coalescing, and improved Promise handling that will benefit the codebase. However, ensure your runtime environment (Node.js version) fully supports ES2020 features.


🏁 Script executed:

#!/bin/bash
# Check Node.js version in package.json and verify it supports ES2020
if [ -f "package.json" ]; then
  NODE_ENGINE=$(cat package.json | grep '"node":' || echo "Not specified")
  echo "Node.js engine requirement: $NODE_ENGINE"
fi

# Check actual Node.js version in environment
NODE_VERSION=$(node -v)
echo "Current Node.js version: $NODE_VERSION"

# ES2020 is fully supported in Node.js 14+
echo "Note: ES2020 features are fully supported in Node.js 14 and later"

Length of output: 460


ES2020 Target Compatibility Verified

  • package.json does not specify an engines.node field, but the current environment is Node v22.9.0
  • Node 14+ fully supports all ES2020 features, so targeting ES2020 is safe with your runtime

No further changes required.

apps/notification-producer/db.sql (3)

1-8: Well-structured table design for notification indexer state.

The notifications_indexer_state table provides a robust foundation for persisting indexer state with flexible JSON storage. The unique constraint on (key, chainId) ensures proper data integrity for different indexers across multiple chains.


10-17: Good implementation of timestamp management function.

The set_updated_at() trigger function follows PostgreSQL best practices for automatic timestamp updates.


19-22: Proper trigger setup for automatic timestamp updates.

The trigger correctly attaches to the table and executes before updates, ensuring the updated_at column remains accurate.

README.md (1)

36-37: Clear documentation update for the new database dependency.

The added instruction to start the database container is clear and appropriately placed in the sequence before starting the notification producer.

libs/repositories/src/datasources/redis.ts (2)

3-7: Improved Redis enablement logic with proper precedence.

This change adds more control over Redis enablement by prioritizing the explicit REDIS_ENABLED flag over the presence of REDIS_HOST. This allows explicitly disabling Redis even when a host is configured, which is a good practice for configuration management.


9-9: Consistent application of the updated enablement logic.

The export logic correctly uses the refined isRedisEnabled flag.

apps/notification-producer/src/postToQueueTest.test.ts (1)

1-5: Appropriate imports for notification queue testing.

The imports correctly reference the necessary components from the notifications package.

apps/notification-producer/src/producers/TradeNotificationProducer.ts (3)

55-60: Out‑of‑date JSDoc refers to “CMS notification producer”

The comment was copy‑pasted from the CMS producer and is misleading here.

[ suggest_nitpick ]


49-50: pendingNotifications is never populated – dead code

Unlike the CMS producer, this class never calls .set() on pendingNotifications, yet it is cleared (line 193).
Either add the resilience logic or remove the field entirely to avoid confusion.

[ suggest_optional_refactor ]


121-129:

Details

❌ Incorrect review comment

Case‑sensitive owner filter – same pitfall as CMS producer

accounts is lower‑case, while getLogs will return checksum‑cased addresses.
viem accepts an array of addresses for indexed topic filtering, but they must match exactly.
Convert accounts to checksum or, simpler, pass them lower‑cased and set strict: false.


🏁 Script executed:

#!/bin/bash
grep -R "getLogs" -n apps/notification-producer/src/producers/TradeNotificationProducer.ts || true

Length of output: 223


🌐 Web query:

Does viem getLogs support a `strict` boolean option for indexed topic filtering, and how is it used?

💡 Result:

Viem getLogs and the strict Boolean Option for Indexed Topic Filtering

Does getLogs support a strict boolean option?
Yes, Viem's getLogs supports a strict option that controls how logs are filtered based on whether they fully conform to the event's ABI, specifically regarding indexed and non-indexed arguments[1].


How is the strict option used?

By default, getLogs will return all logs that match the filter, even if some logs do not perfectly match the ABI's indexed/non-indexed arguments. In these cases, the resulting log’s args fields may be undefined for arguments that cannot be decoded against the ABI[1].

When you enable strict: true, Viem will only return logs that exactly match the ABI—that is, logs where all indexed and non-indexed arguments can be decoded. This ensures all fields in args are always present and not undefined. The trade-off is that non-conforming logs will be filtered out and not included in the result[1].


Example Usage:

import { parseAbiItem } from 'viem'

const logs = await publicClient.getLogs({
  address: '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
  event: parseAbiItem('event Transfer(address indexed from, address indexed to, uint256 value)'),
  args: {
    from: '0xd8da6bf26964af9d7eed9e03e53415d37aa96045',
    to: '0xa5cc3c03994db5b0d9a5eedd10cabab0813678ac'
  },
  strict: true, // Only logs that fully match the ABI signature
})

With strict: true, the returned logs’ args object will always have defined properties [from, to, value]. Without strict, some or all of them might be undefined if the log doesn't exactly match the ABI[1].


Summary Table

Option Behavior
strict: false Returns all logs that match the filter, but args may have undefined properties.
strict: true Returns only logs that exactly match the ABI; args always fully defined.

Key Takeaway:
Use strict: true with getLogs when you want strong guarantees that every returned log matches your ABI and all argument fields are always present and typed[1].

Citations:


Ignore case‑sensitive owner filter warning – Viem normalizes addresses

Viem automatically lowercases any hex string arguments when building filter topics and its RPC returns topics in lowercase. Decoded log.args.owner values also come back in lowercase, so supplying accounts as a lowercase array works correctly and no changes to checksum‐case or strict are needed.

Likely an incorrect or invalid review comment.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@socket-security
Copy link
Copy Markdown

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Addedpg-numeric@​1.0.21001006675100
Added@​types/​pg@​8.11.131001007190100
Addedobuf@​1.1.21001007176100
Updatedpg-protocol@​1.7.0 ⏵ 1.8.0100 +110072 +181100
Addedpostgres-range@​1.1.41001009476100
Updatedpg-pool@​3.7.0 ⏵ 3.8.0100 +110010082100
Updatedpg@​8.13.1 ⏵ 8.14.199 +110098 +185100

View full report

@anxolin anxolin requested a review from a team April 18, 2025 15:18
@anxolin anxolin marked this pull request as ready for review April 18, 2025 15:18
@anxolin anxolin changed the base branch from main to push April 18, 2025 15:19
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 17

🔭 Outside diff range comments (2)
libs/cms-api/src/index.ts (1)

163-199: ⚠️ Potential issue

First page is fetched twice – increment page before the 2nd request

getAllPages() retrieves page 0 again inside the while loop because page is only incremented after the repeated call to getPage.
That duplicates the first batch and can lead to an infinite loop if the API always returns pageSize + 1 items.

-  while (subscriptions.length > pageSize) {
-    subscriptions = await getPage({
-      page,
-      pageSize: pageSize + 1,
-    });
+  while (subscriptions.length > pageSize) {
+    page++;                           // ✅ advance page *before* fetching
+    subscriptions = await getPage({
+      page,
+      pageSize: pageSize + 1,
+    });

Same pattern appears in getAllNotifications; consider fixing both for consistency.

apps/telegram/src/main.ts (1)

47-55: ⚠️ Potential issue

Subscription cache never refreshed – timestamp is not updated

After fetching fresh subscriptions you store them, but you forget to record the fetch time.
LAST_SUBSCRIPTION_CHECK is therefore always undefined, causing a fetch on every call and defeating the cache.

     SUBSCRIPTION_CACHE.set(account, subscriptionForAccount);
+    LAST_SUBSCRIPTION_CHECK.set(account, new Date());  // ✅ mark refresh time
🧹 Nitpick comments (10)
apps/notification-producer/types.ts (1)

1-14: Well-designed interface for standardizing component lifecycles

The Runnable interface cleanly defines the contract for components with start/stop lifecycle methods. The comments clearly document the expected behavior that methods should not throw or complete immediately.

Consider adding a method to query the current state of the runnable component, which could help with monitoring and management:

export interface Runnable {
  /**
   * Start the program, this method should not throw or finish.
   */
  start(): Promise<void>;

  /**
   * Stop the program, this method should not throw or finish.
   */
  stop(): Promise<void>;
+
+  /**
+   * Returns the current state of the program.
+   */
+  getStatus(): 'starting' | 'running' | 'stopping' | 'stopped';
}
apps/notification-producer/src/utils.ts (1)

3-28: Robust implementation for continuous background processing

The doForever utility provides a clean pattern for implementing resilient background processes that can gracefully handle errors and shutdown requests. The function correctly implements async/await patterns and has good logging.

Consider these enhancements to make the utility more flexible:

import { sleep } from '@cowprotocol/notifications';

export async function doForever(
  name: string,
  callback: (stop: () => void) => Promise<void>,
  waitTimeMilliseconds: number,
+  options?: {
+    onError?: (error: unknown) => void;
+    errorWaitTimeMilliseconds?: number;
+  }
) {
  // eslint-disable-next-line no-constant-condition
  let running = true;
  while (running) {
    const stop = () => {
      console.log(`[${name}] Stopping...`);
      running = false;
    };

    try {
      await callback(stop);
    } catch (error) {
      console.error(`[${name}] Error `, error);
+      if (options?.onError) {
+        options.onError(error);
+      }
      console.log(
        `[${name}] Reconnecting in ${
-          waitTimeMilliseconds / 1000
+          (options?.errorWaitTimeMilliseconds || waitTimeMilliseconds) / 1000
        }s...`
      );
+      await sleep(options?.errorWaitTimeMilliseconds || waitTimeMilliseconds);
+      continue;
    } finally {
-      await sleep(waitTimeMilliseconds);
+      if (running) {
+        await sleep(waitTimeMilliseconds);
+      }
    }
  }
  console.log(`[${name}] Stopped`);
}

This refactoring allows for:

  1. Custom error handling logic
  2. Different wait times after errors versus successful executions
  3. Avoiding unnecessary waits after the loop is stopped
apps/notification-producer/src/postToQueueTest.test.ts (1)

19-21: Avoid JSON string for the message property.

The message property contains a JSON string with redundant data already present in other properties of the notification.

-        message:
-          '{"title": "my title", "message": "my message", "account": "0x79063d9173C09887d536924E2F6eADbaBAc099f5", "url": "https://www.google.com"}',
+        message: "my message",

If the payload format needs to maintain backward compatibility, consider adding a comment explaining why the message is structured this way.

apps/notification-producer/src/repositories/SubscriptionsRepository.ts (2)

5-6: Consider updating the TODO comment with a more specific action item.

The TODO comment is somewhat vague. It would be helpful to add more specific details about which dependency injection framework will be used and what the "proper cached repository" structure looks like.


25-27: Consider moving the helper function inside the class.

The uniqueLowercase function is only used within the SubscriptionRepository class. Consider moving it inside the class as a private static method for better encapsulation.

-function uniqueLowercase(items: string[]): string[] {
-  return Array.from(new Set(items.map((item) => item.toLowerCase())));
-}

export class SubscriptionRepository {
  private lastCheck: number | null = null;
  private cachedAccounts: string[] | null = null;

+  private static uniqueLowercase(items: string[]): string[] {
+    return Array.from(new Set(items.map((item) => item.toLowerCase())));
+  }

  async getAllSubscribedAccounts(): Promise<string[]> {
    const now = Date.now();
    if (
      !this.cachedAccounts ||
      !this.lastCheck ||
      now - this.lastCheck > CACHE_TIME
    ) {
-      this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
+      this.cachedAccounts = SubscriptionRepository.uniqueLowercase(await getAllSubscribedAccounts());
      this.lastCheck = now;
      return this.cachedAccounts;
    }
    return this.cachedAccounts || [];
  }
}
apps/notification-producer/src/repositories/NotificationsRepository.ts (1)

55-64: Improve error handling in pingConnection method.

The pingConnection method catches all errors without logging them, which might mask specific connection issues and make debugging harder.

async pingConnection(): Promise<boolean> {
  if (!this.connection) return false;

  try {
    const ch = await this.connection.connection.createChannel();
    await ch.close();
    return true;
-  } catch (error) {
+  } catch (error: unknown) {
+    const errorMessage = error instanceof Error ? error.message : String(error);
+    console.debug(`Connection ping failed: ${errorMessage}`);
    return false;
  }
}
libs/notifications/src/index.ts (2)

29-31: Add validation before stringifying notifications.

The stringifyNotifications function doesn't validate the notification objects before stringification. This could lead to unexpected behavior if the objects don't match the expected format.

export function stringifyNotifications(notifications: Notification[]): string {
+  // Ensure notifications is an array
+  if (!Array.isArray(notifications)) {
+    console.warn('Expected notifications to be an array, got:', typeof notifications);
+    return JSON.stringify([]);
+  }
+  
+  // Filter out invalid notifications
+  const validNotifications = notifications.filter(n => 
+    n && typeof n === 'object' && 
+    typeof n.id === 'string' && 
+    typeof n.account === 'string' && 
+    typeof n.title === 'string' && 
+    typeof n.message === 'string'
+  );
+  
+  if (validNotifications.length !== notifications.length) {
+    console.warn(`Filtered out ${notifications.length - validNotifications.length} invalid notifications`);
+  }
+  
+  return JSON.stringify(validNotifications);
-  return JSON.stringify(notifications);
}

78-81: Add validation for empty notification arrays.

The sendNotificationsToQueue function will send empty arrays to the queue. Consider adding a check to avoid unnecessary queue operations when there are no notifications to send.

export function sendNotificationsToQueue(params: SendToQueueParams) {
  const { channel, queue, notifications } = params;
+  // Don't send empty notification arrays
+  if (!notifications || notifications.length === 0) {
+    return;
+  }
  const message = stringifyNotifications(notifications);
  channel.sendToQueue(queue, Buffer.from(message));
}
apps/notification-producer/src/producers/TradeNotificationProducer.ts (1)

161-164: title: 'Trade' reused for OrderInvalidated

Notifications for cancelled orders are labelled “Trade”, which is misleading.

-  title: 'Trade',
+  title: 'Order invalidated',
apps/telegram/src/main.ts (1)

58-67: Guard clause could drop valid empty batches

parseNewMessage treats an empty array ([]) as an error because isNotificationArray([]) is true but the calling code later loops over the array.
Consider returning an empty array and letting onNewMessage acknowledge it immediately, rather than triggering retries for a benign case.

🛑 Comments failed to post (17)
apps/notification-producer/src/postToQueueTest.test.ts (1)

7-26: 🛠️ Refactor suggestion

Test lacks assertions and connection cleanup.

While the test exercises the notification queue functionality, it doesn't verify the operation succeeded or properly clean up resources.

-it('Post to queue', async () => {
+it('Post to queue', async () => {
+  let connection;
+  let channel;
+  
+  try {
-  const { channel } = await connectToChannel({
+    ({ connection, channel } = await connectToChannel({
     channel: NOTIFICATIONS_QUEUE,
-  });
+    }));

   sendNotificationsToQueue({
     channel,
     queue: NOTIFICATIONS_QUEUE,
     notifications: [
       {
         id: '1',
         title: 'title',
         message:
           '{"title": "my title", "message": "my message", "account": "0x79063d9173C09887d536924E2F6eADbaBAc099f5", "url": "https://www.google.com"}',
         account: '0x79063d9173C09887d536924E2F6eADbaBAc099f5',
         url: 'url',
       },
     ],
   });
+    
+    // Add assertion to verify queue operation
+    expect(channel.sendToQueue).toHaveBeenCalledWith(
+      NOTIFICATIONS_QUEUE,
+      expect.any(Buffer)
+    );
+  } finally {
+    // Clean up resources
+    if (channel) await channel.close();
+    if (connection) await connection.close();
+  }
 });

Also, consider mocking the RabbitMQ channel to avoid external dependencies during unit testing:

// At the top of the file
jest.mock('@cowprotocol/notifications', () => ({
  NOTIFICATIONS_QUEUE: 'notifications',
  connectToChannel: jest.fn().mockResolvedValue({
    connection: { close: jest.fn() },
    channel: { 
      sendToQueue: jest.fn(),
      close: jest.fn()
    }
  }),
  sendNotificationsToQueue: jest.fn(),
}));
apps/notification-producer/src/repositories/SubscriptionsRepository.ts (1)

10-22: 🛠️ Refactor suggestion

Add error handling for the external API call.

The getAllSubscribedAccounts method doesn't handle potential errors from the external API call. If the API call fails, the error will propagate and potentially disrupt the application flow.

async getAllSubscribedAccounts(): Promise<string[]> {
  const now = Date.now();
  if (
    !this.cachedAccounts ||
    !this.lastCheck ||
    now - this.lastCheck > CACHE_TIME
  ) {
-    this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
-    this.lastCheck = now;
-    return this.cachedAccounts;
+    try {
+      this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
+      this.lastCheck = now;
+      return this.cachedAccounts;
+    } catch (error) {
+      console.error('Failed to fetch subscribed accounts:', error);
+      // Return cached accounts if available, otherwise empty array
+      return this.cachedAccounts || [];
+    }
  }
  return this.cachedAccounts || [];
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

  async getAllSubscribedAccounts(): Promise<string[]> {
    const now = Date.now();
    if (
      !this.cachedAccounts ||
      !this.lastCheck ||
      now - this.lastCheck > CACHE_TIME
    ) {
      try {
        this.cachedAccounts = uniqueLowercase(await getAllSubscribedAccounts());
        this.lastCheck = now;
        return this.cachedAccounts;
      } catch (error) {
        console.error('Failed to fetch subscribed accounts:', error);
        // Return cached accounts if available, otherwise empty array
        return this.cachedAccounts || [];
      }
    }
    return this.cachedAccounts || [];
  }
apps/notification-producer/src/repositories/NotificationsRepository.ts (2)

14-29: 🛠️ Refactor suggestion

Fix potential event listener memory leak.

The connect method adds a new 'close' event listener each time it's called without removing previous listeners. This could lead to multiple duplicate handlers if the method is called repeatedly after disconnections.

async connect(): Promise<ConnectToChannelResponse> {
  if (!this.connection || !this.pingConnection()) {
+    // Remove any existing listeners if the connection exists
+    if (this.connection) {
+      this.connection.connection.removeAllListeners('close');
+    }
    
    // Connect to the queue
    this.connection = await connectToChannel({
      channel: this.queueName,
    });

    // Watch for connection close
    this.connection.connection.on('close', () => this.close());

    return this.connection;
  }

  // Return connection
  return this.connection;
}

41-49: 🛠️ Refactor suggestion

Add error handling for queue operations.

The sendNotifications method doesn't handle potential errors when sending notifications to the queue. If the operation fails, there's no retry mechanism or error logging.

async sendNotifications(notifications: Notification[]) {
-  const connection = await this.connect();
-
-  await sendNotificationsToQueue({
-    channel: connection.channel,
-    queue: this.queueName,
-    notifications,
-  });
+  try {
+    const connection = await this.connect();
+
+    await sendNotificationsToQueue({
+      channel: connection.channel,
+      queue: this.queueName,
+      notifications,
+    });
+  } catch (error) {
+    console.error('Failed to send notifications to queue:', error);
+    throw error; // Re-throw or handle based on your error handling strategy
+  }
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

  async sendNotifications(notifications: Notification[]) {
    try {
      const connection = await this.connect();

      await sendNotificationsToQueue({
        channel: connection.channel,
        queue: this.queueName,
        notifications,
      });
    } catch (error) {
      console.error('Failed to send notifications to queue:', error);
      throw error; // Re-throw or handle based on your error handling strategy
    }
  }
apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts (3)

38-47: 🛠️ Refactor suggestion

Handle null chainId values in the upsert method.

The chainId parameter can be undefined, but there's no explicit handling for this case in the SQL query. This could lead to inserting undefined or invalid values.

async upsert<T>(key: string, state: T, chainId?: number): Promise<void> {
  const query = `
    INSERT INTO notifications_indexer_state (key, chainId, state)
    VALUES ($1, $2, $3)
    ON CONFLICT (key, chainId) 
    DO UPDATE SET state = $3
  `;

-  await this.db.query(query, [key, chainId, state]);
+  // Ensure chainId is null when undefined
+  await this.db.query(query, [key, chainId ?? null, state]);
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

  async upsert<T>(key: string, state: T, chainId?: number): Promise<void> {
    const query = `
      INSERT INTO notifications_indexer_state (key, chainId, state)
      VALUES ($1, $2, $3)
      ON CONFLICT (key, chainId) 
      DO UPDATE SET state = $3
    `;

    // Ensure chainId is null when undefined
    await this.db.query(query, [key, chainId ?? null, state]);
  }

18-33: ⚠️ Potential issue

Fix SQL query to match return type.

The SQL query only selects the state column, but the return type is NotificationsIndexerState<T>. The returned object won't have all the expected properties (key, chainId, created_at, updated_at).

async get<T>(
  key: string,
  chainId?: SupportedChainId
): Promise<NotificationsIndexerState<T> | null> {
  const query = `
-    SELECT state 
+    SELECT key, chainId, state, created_at, updated_at
    FROM notifications_indexer_state 
    WHERE key = $1 
    ${chainId !== undefined ? 'AND chainId = $2' : ''}
    LIMIT 1
  `;
  const params = chainId !== undefined ? [key, chainId] : [key];

  const result = await this.db.query(query, params);
  return result.rows[0] || null;
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

  async get<T>(
    key: string,
    chainId?: SupportedChainId
  ): Promise<NotificationsIndexerState<T> | null> {
    const query = `
      SELECT key, chainId, state, created_at, updated_at
      FROM notifications_indexer_state 
      WHERE key = $1 
      ${chainId !== undefined ? 'AND chainId = $2' : ''}
      LIMIT 1
    `;
    const params = chainId !== undefined ? [key, chainId] : [key];

    const result = await this.db.query(query, params);
    return result.rows[0] || null;
  }

35-47: 🛠️ Refactor suggestion

Add serialization of the state object.

The state parameter might be a complex object that needs to be serialized to JSON before storing in the database. Without explicit serialization, PostgreSQL might not store it correctly.

async upsert<T>(key: string, state: T, chainId?: number): Promise<void> {
  const query = `
    INSERT INTO notifications_indexer_state (key, chainId, state)
    VALUES ($1, $2, $3)
    ON CONFLICT (key, chainId) 
    DO UPDATE SET state = $3
  `;

-  await this.db.query(query, [key, chainId, state]);
+  // Serialize state to JSON
+  await this.db.query(query, [key, chainId ?? null, JSON.stringify(state)]);
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

  /**
   * Update or insert indexer state
   */
  async upsert<T>(key: string, state: T, chainId?: number): Promise<void> {
    const query = `
      INSERT INTO notifications_indexer_state (key, chainId, state)
      VALUES ($1, $2, $3)
      ON CONFLICT (key, chainId) 
      DO UPDATE SET state = $3
    `;

    // Serialize state to JSON
    await this.db.query(query, [key, chainId ?? null, JSON.stringify(state)]);
  }
libs/notifications/src/index.ts (1)

23-27: 🛠️ Refactor suggestion

Add error handling for JSON parsing.

The parseNotifications function doesn't handle potential parsing errors. If the JSON is invalid, it will throw an error that might disrupt the application flow.

export function parseNotifications(
  notificationsString: string
): Notification[] {
-  return JSON.parse(notificationsString);
+  try {
+    return JSON.parse(notificationsString);
+  } catch (error) {
+    console.error('Failed to parse notifications:', error);
+    return [];
+  }
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

export function parseNotifications(
  notificationsString: string
): Notification[] {
  try {
    return JSON.parse(notificationsString);
  } catch (error) {
    console.error('Failed to parse notifications:', error);
    return [];
  }
}
apps/notification-producer/src/producers/CmsNotificationProducer.ts (2)

60-67: ⚠️ Potential issue

Address‑matching is case‑sensitive – risk of silently dropping legitimate notifications

accounts is already normalized to lower‑case by SubscriptionRepository, however the account field coming from the CMS is compared verbatim.
If the CMS happens to return mixed‑case addresses (0xAbC…) the strict includes check will fail, causing valid notifications to be ignored.

Suggestion:

-({ account }) => accounts.includes(account)
+({ account }) => accounts.includes(account.toLowerCase())

82-93: 🛠️ Refactor suggestion

⚠️ Potential issue

sendNotifications is async but its result is not awaited – notifications may be lost & errors unhandled

connect() is awaited, but the actual publish operation is fired‑and‑forgotten.
If the Promise rejects (network blip, queue unavailable, …) the exception will be unhandled and pendingNotifications will still be cleared, losing those messages.

-// Post notifications to queue
-this.props.notificationsRepository.sendNotifications(pushNotifications);
-this.pendingNotifications.clear();
+try {
+  // Post notifications to queue
+  await this.props.notificationsRepository.sendNotifications(
+    pushNotifications
+  );
+  // Clear only after a successful publish
+  this.pendingNotifications.clear();
+} catch (err) {
+  console.error('[CmsNotificationProducer] Failed to send', err);
+  // keep notifications in memory for the next retry
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    // Save notifications in-memory, so they are not lost if there's an issue with the queue
    pushNotifications.forEach((notification) =>
      this.pendingNotifications.set(notification.id, notification)
    );

    // Connect
    await this.props.notificationsRepository.connect();

    try {
      // Post notifications to queue
      await this.props.notificationsRepository.sendNotifications(
        pushNotifications
      );
      // Clear only after a successful publish
      this.pendingNotifications.clear();
    } catch (err) {
      console.error('[CmsNotificationProducer] Failed to send', err);
      // keep notifications in memory for the next retry
    }
  }
apps/notification-producer/src/producers/TradeNotificationProducer.ts (3)

145-153: 🛠️ Refactor suggestion

User‑facing message shows raw wei amounts and token addresses

log.args.sellAmount / buyAmount are uint256 in wei and the token fields are raw addresses.
End‑users will receive unreadable messages like
“Trade 1000000000000000000 0x6B… for 2000000000000000000 0xA0…”.

Consider resolving token symbols/decimals and formatting with formatUnits.


196-204: ⚠️ Potential issue

Database state update is asynchronous but not awaited

upsert returns a Promise; ignoring it risks losing the last processed block on crash.

-await notificationsIndexerStateRepository.upsert<TradeNotificationProducerState>(
+await notificationsIndexerStateRepository.upsert<TradeNotificationProducerState>(
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    await notificationsIndexerStateRepository.upsert<TradeNotificationProducerState>(
      PRODUCER_NAME,
      {
        lastBlock: lastBlock.number.toString(),
        lastBlockTimestamp: lastBlock.timestamp.toString(),
        lastBlockHash: lastBlock.hash,
      },
      chainId
    );

188-193: ⚠️ Potential issue

sendNotifications is not awaited – same reliability issue

Repeat of the CMS producer issue.

-await this.props.notificationsRepository.connect();
-// Post notifications to queue
-this.props.notificationsRepository.sendNotifications(notifications);
+await this.props.notificationsRepository.connect();
+await this.props.notificationsRepository.sendNotifications(notifications);

Committable suggestion skipped: line range outside the PR's diff.

apps/notification-producer/src/main.ts (2)

60-70: 🛠️ Refactor suggestion

Producer auto‑restart logic drops the replacement Promise – shutdown & monitoring issues

Inside the .then / .catch handlers you call producer.start() again but do not return the new promise, so promises only tracks the first run.
producersPromise may resolve even though the restarted producer is still running, breaking graceful shutdown logic.

Refactor to a small helper that wraps each producer with a self‑restarting loop and always returns the current promise.


76-82: ⚠️ Potential issue

Cannot catch SIGKILL – use SIGINT / SIGTERM instead

Node.js cannot intercept SIGKILL; the handler will never execute, so resources won’t be cleaned up.

-process.on('SIGKILL', () => {
+process.on('SIGTERM', () => {
+  gracefulShutdown(pool, producers, producersPromise).catch(…);
+});
+process.on('SIGINT', () => {
   gracefulShutdown(pool, producers, producersPromise).catch((error) => {
     …
   });
-});
+});
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

  process.on('SIGTERM', () => {
    gracefulShutdown(pool, producers, producersPromise).catch((error) => {
      console.error('Error during shutdown', error);
      process.exit(1);
    });
  });

  process.on('SIGINT', () => {
    gracefulShutdown(pool, producers, producersPromise).catch((error) => {
      console.error('Error during shutdown', error);
      process.exit(1);
    });
  });
libs/cms-api/src/index.ts (1)

210-217: 🛠️ Refactor suggestion

Potentially over‑long URL – switch to query/body param

/accounts/${accounts.join(',')}/subscriptions/telegram can exceed typical URL‐length limits when accounts is large (easily >2 kB).
Consider POSTing the list in the body or passing it as a query string (accounts=0x…&accounts=0x…) to avoid truncated requests.

apps/telegram/src/main.ts (1)

127-134: ⚠️ Potential issue

Retry logic ineffective – message is nack‑ed without requeue

channel.nack(msg, false, false) discards the message, so it will never be delivered again despite incrementing MESSAGE_RETRIES.
Pass true as the third argument to requeue the message and allow the retry loop to work.

-      channel.nack(msg, false, false);
+      channel.nack(msg, false, true);   // requeue = true
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

      // Increment retry count and NACK the message
      const newRetryCount = retryCount + 1;
      MESSAGE_RETRIES.set(messageId, newRetryCount);
      console.warn(
        `[telegram:main] Retry attempt ${newRetryCount}/${MAX_RETRIES} for message ${messageId}`
      );
      channel.nack(msg, false, true);   // requeue = true
    }

import { getAddress, parseAbi } from 'viem';

const EVENTS = parseAbi([
'event OrderInvalidated(address indexed owner, bytes orderUid)',
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, this is tracked in the backend DB as well.
There's a table called onchain_order_invalidations.

image

So in theory we don't need to watch for those events.

@anxolin anxolin merged commit c3a6fe1 into push Apr 18, 2025
7 checks passed
@anxolin anxolin deleted the filter-events-get-logs-post-notification branch April 18, 2025 16:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants