diff --git a/apps/notification-producer/src/producers/TradeNotificationProducer.ts b/apps/notification-producer/src/producers/TradeNotificationProducer.ts index 8fa5aa6c..db5ef2db 100644 --- a/apps/notification-producer/src/producers/TradeNotificationProducer.ts +++ b/apps/notification-producer/src/producers/TradeNotificationProducer.ts @@ -1,4 +1,7 @@ -import { SupportedChainId } from '@cowprotocol/cow-sdk'; +import { + COW_PROTOCOL_SETTLEMENT_CONTRACT_ADDRESS, + SupportedChainId, +} from '@cowprotocol/cow-sdk'; import { Notification } from '@cowprotocol/notifications'; import { viemClients } from '@cowprotocol/repositories'; import { NotificationsRepository } from '../repositories/NotificationsRepository'; @@ -7,10 +10,25 @@ import { doForever } from '../utils'; import { Runnable } from '../../types'; import { SubscriptionRepository } from '../repositories/SubscriptionsRepository'; import { NotificationsIndexerStateRepository } from '../repositories/NotificationsIndexerStateRepository'; +import { getAddress, parseAbi } from 'viem'; + +const EVENTS = parseAbi([ + 'event OrderInvalidated(address indexed owner, bytes orderUid)', + 'event Trade(address indexed owner, address sellToken, address buyToken, uint256 sellAmount, uint256 buyAmount, uint256 feeAmount, bytes orderUid)', +]); const WAIT_TIME = 30000; const PRODUCER_NAME = 'trade_notification_producer'; +// TODO: Get from SDK +const CHAIN_NAME_MAP: Record = { + [SupportedChainId.MAINNET]: 'mainnet', + [SupportedChainId.ARBITRUM_ONE]: 'arb1', + [SupportedChainId.GNOSIS_CHAIN]: 'gc', + [SupportedChainId.BASE]: 'base', + [SupportedChainId.SEPOLIA]: 'sepolia', +}; + export type TradeNotificationProducerProps = { chainId: SupportedChainId; notificationsRepository: NotificationsRepository; @@ -20,6 +38,7 @@ export type TradeNotificationProducerProps = { export class TradeNotificationProducer implements Runnable { isStopping = false; + prefix: string; /** * This in-memory state just adds some resilience in case there's an error posting the message. @@ -29,7 +48,9 @@ export class TradeNotificationProducer implements Runnable { */ pendingNotifications = new Map(); - constructor(private props: TradeNotificationProducerProps) {} + constructor(private props: TradeNotificationProducerProps) { + this.prefix = '[TradeNotificationProducer:' + this.props.chainId + ']'; + } /** * Main loop: Run the CMS notification producer. This method runs indefinitely, @@ -55,12 +76,12 @@ export class TradeNotificationProducer implements Runnable { WAIT_TIME ); - console.log('TradeNotificationProducer:' + this.props.chainId, 'stopped'); + console.log(`${this.prefix} stopped`); } async stop(): Promise { console.log( - 'Stopping TradeNotificationProducer for chainId=' + this.props.chainId + `${this.prefix} Stopping TradeNotificationProducer for chainId=${this.props.chainId}` ); this.isStopping = true; } @@ -73,15 +94,84 @@ export class TradeNotificationProducer implements Runnable { } = this.props; // Get last indexed block - const stateRegistry = await notificationsIndexerStateRepository.get( - PRODUCER_NAME, - chainId - ); + const stateRegistry = + await notificationsIndexerStateRepository.get( + PRODUCER_NAME, + chainId + ); - console.log('stateRegistry', stateRegistry); + const client = viemClients[chainId]; // Get last block - const lastBlock = await viemClients[chainId].getBlock(); + const lastBlock = await client.getBlock(); + + // Get trade events from block to last block + const fromBlock = stateRegistry?.state + ? BigInt(stateRegistry.state.lastBlock) + 1n + : lastBlock.number; + + if (fromBlock > lastBlock.number) { + console.log(`${this.prefix} No new blocks to index`); + return; + } + + const accounts = await subscriptionRepository.getAllSubscribedAccounts(); + console.log(`${this.prefix} Accounts: ${accounts.join(', ')}`); + + const logs = await client.getLogs({ + events: EVENTS, + fromBlock, + toBlock: lastBlock.number, + address: getAddress(COW_PROTOCOL_SETTLEMENT_CONTRACT_ADDRESS[chainId]), + args: { + owner: accounts, + } as any, + }); + console.log(`${this.prefix} Found ${logs.length} events`); + + const notifications = logs.reduce((acc, log) => { + if (!log.args.owner) { + // TODO: Filter for relevant ones (or better, filter in the getLogs already) + // No owner + return acc; + } + + const url = log.args.orderUid + ? getExplorerUrl(chainId, log.args.orderUid) + : undefined; + + switch (log.eventName) { + case 'Trade': { + const message = `Trade ${log.args.sellAmount} ${log.args.sellToken} for ${log.args.buyAmount} ${log.args.buyToken}`; + console.log(`${this.prefix} New ${message}`); + acc.push({ + id: 'Trade-' + log.transactionHash + '-' + log.logIndex, + account: log.args.owner, + title: 'Trade', + message, + url, + }); + break; + } + + case 'OrderInvalidated': { + const message = 'Order invalidated ' + log.args.orderUid; + console.log(`${this.prefix} ${message}`); + acc.push({ + id: 'Trade-' + log.transactionHash + '-' + log.logIndex, + account: log.args.owner, + title: 'Trade', + message, + url, + }); + break; + } + default: + console.log(`${this.prefix} Unknown event ${log}`); + break; + } + return acc; + }, []); // TODO: // Get trade events from block to last block @@ -93,15 +183,17 @@ export class TradeNotificationProducer implements Runnable { // Send notifications // TODO: - // const accounts = await subscriptionRepository.getAllSubscribedAccounts(); + console.log(`${this.prefix} Notifications:`, notifications); + + // Connect + await this.props.notificationsRepository.connect(); - // console.log( - // 'For now, nothing to do. I plan to fetch trades for: ' + - // accounts.join(', ') - // ); + // Post notifications to queue + this.props.notificationsRepository.sendNotifications(notifications); + this.pendingNotifications.clear(); // Update state - notificationsIndexerStateRepository.upsert( + notificationsIndexerStateRepository.upsert( PRODUCER_NAME, { lastBlock: lastBlock.number.toString(), @@ -112,3 +204,20 @@ export class TradeNotificationProducer implements Runnable { ); } } + +interface TradeNotificationProducerState { + lastBlock: string; + lastBlockTimestamp: string; + lastBlockHash: string; +} + +function getExplorerUrl(chainId: SupportedChainId, orderUid: string) { + const baseUrl = getExplorerBaseUrl(chainId); + return `${baseUrl}/orders/${orderUid}`; +} + +function getExplorerBaseUrl(chainId: SupportedChainId) { + const suffix = + chainId === SupportedChainId.MAINNET ? '' : `/${CHAIN_NAME_MAP[chainId]}`; + return `https://explorer.cow.fi${suffix}`; +} diff --git a/apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts b/apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts index 9e3475ee..697c80c9 100644 --- a/apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts +++ b/apps/notification-producer/src/repositories/NotificationsIndexerStateRepository.ts @@ -35,7 +35,7 @@ export class NotificationsIndexerStateRepository { /** * Update or insert indexer state */ - async upsert(key: string, state: unknown, chainId?: number): Promise { + async upsert(key: string, state: T, chainId?: number): Promise { const query = ` INSERT INTO notifications_indexer_state (key, chainId, state) VALUES ($1, $2, $3) diff --git a/libs/cms-api/src/index.ts b/libs/cms-api/src/index.ts index b7733759..7e707d5d 100644 --- a/libs/cms-api/src/index.ts +++ b/libs/cms-api/src/index.ts @@ -9,6 +9,10 @@ export type CmsTelegramSubscription = { }; export type CmsTelegramSubscriptionsResponse = Schemas['TelegramSubscriptionResponse']; + +export type CmsTelegramSubscriptions = + Schemas['TelegramSubscriptionResponse']['data']; + export type CmsPushNotification = { id: number; account: string; @@ -203,7 +207,7 @@ async function getTelegramSubscriptionsForAccounts({ CmsTelegramSubscription[] > { const { data, error, response } = await cmsClient.GET( - `/tg-subscriptions?accounts=${accounts.join(',')}`, + `/accounts/${accounts.join(',')}/subscriptions/telegram`, { // Pagination 'pagination[page]': page, @@ -225,11 +229,14 @@ async function getSubscribedAccounts({ page = 0, pageSize = PAGE_SIZE, }: PaginationParam): Promise { - const { data, error, response } = await cmsClient.GET(`/tg-subscriptions`, { - // Pagination - 'pagination[page]': page, - 'pagination[pageSize]': pageSize, - }); + const { data, error, response } = await cmsClient.GET( + `/telegram-subscriptions`, + { + // Pagination + 'pagination[page]': page, + 'pagination[pageSize]': pageSize, + } + ); if (error) { console.error( @@ -239,7 +246,15 @@ async function getSubscribedAccounts({ throw error; } - return (data as CmsTelegramSubscription[]).map(({ account }) => account); + const subscriptions = data.data as CmsTelegramSubscriptions[]; + + return subscriptions.reduce((acc, subscription) => { + const account = subscription?.attributes?.account; + if (account) { + acc.push(account); + } + return acc; + }, []); } export async function getPushNotifications(): Promise { diff --git a/tsconfig.base.json b/tsconfig.base.json index 0020d49b..516fb031 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -8,7 +8,7 @@ "emitDecoratorMetadata": true, "experimentalDecorators": true, "importHelpers": true, - "target": "es2015", + "target": "ES2020", "module": "esnext", "lib": ["es2020", "dom"], "skipLibCheck": true,