Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 125 additions & 16 deletions apps/notification-producer/src/producers/TradeNotificationProducer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { SupportedChainId } from '@cowprotocol/cow-sdk';
import {
COW_PROTOCOL_SETTLEMENT_CONTRACT_ADDRESS,
SupportedChainId,
} from '@cowprotocol/cow-sdk';
import { Notification } from '@cowprotocol/notifications';
import { viemClients } from '@cowprotocol/repositories';
import { NotificationsRepository } from '../repositories/NotificationsRepository';
Expand All @@ -7,10 +10,25 @@ import { doForever } from '../utils';
import { Runnable } from '../../types';
import { SubscriptionRepository } from '../repositories/SubscriptionsRepository';
import { NotificationsIndexerStateRepository } from '../repositories/NotificationsIndexerStateRepository';
import { getAddress, parseAbi } from 'viem';

const EVENTS = parseAbi([
'event OrderInvalidated(address indexed owner, bytes orderUid)',
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, this is tracked in the backend DB as well.
There's a table called onchain_order_invalidations.

image

So in theory we don't need to watch for those events.

'event Trade(address indexed owner, address sellToken, address buyToken, uint256 sellAmount, uint256 buyAmount, uint256 feeAmount, bytes orderUid)',
]);

const WAIT_TIME = 30000;
const PRODUCER_NAME = 'trade_notification_producer';

// TODO: Get from SDK
const CHAIN_NAME_MAP: Record<SupportedChainId, string> = {
[SupportedChainId.MAINNET]: 'mainnet',
[SupportedChainId.ARBITRUM_ONE]: 'arb1',
[SupportedChainId.GNOSIS_CHAIN]: 'gc',
[SupportedChainId.BASE]: 'base',
[SupportedChainId.SEPOLIA]: 'sepolia',
};

export type TradeNotificationProducerProps = {
chainId: SupportedChainId;
notificationsRepository: NotificationsRepository;
Expand All @@ -20,6 +38,7 @@ export type TradeNotificationProducerProps = {

export class TradeNotificationProducer implements Runnable {
isStopping = false;
prefix: string;

/**
* This in-memory state just adds some resilience in case there's an error posting the message.
Expand All @@ -29,7 +48,9 @@ export class TradeNotificationProducer implements Runnable {
*/
pendingNotifications = new Map<string, Notification>();

constructor(private props: TradeNotificationProducerProps) {}
constructor(private props: TradeNotificationProducerProps) {
this.prefix = '[TradeNotificationProducer:' + this.props.chainId + ']';
}

/**
* Main loop: Run the CMS notification producer. This method runs indefinitely,
Expand All @@ -55,12 +76,12 @@ export class TradeNotificationProducer implements Runnable {
WAIT_TIME
);

console.log('TradeNotificationProducer:' + this.props.chainId, 'stopped');
console.log(`${this.prefix} stopped`);
}

async stop(): Promise<void> {
console.log(
'Stopping TradeNotificationProducer for chainId=' + this.props.chainId
`${this.prefix} Stopping TradeNotificationProducer for chainId=${this.props.chainId}`
);
this.isStopping = true;
}
Expand All @@ -73,15 +94,84 @@ export class TradeNotificationProducer implements Runnable {
} = this.props;

// Get last indexed block
const stateRegistry = await notificationsIndexerStateRepository.get(
PRODUCER_NAME,
chainId
);
const stateRegistry =
await notificationsIndexerStateRepository.get<TradeNotificationProducerState>(
PRODUCER_NAME,
chainId
);

console.log('stateRegistry', stateRegistry);
const client = viemClients[chainId];

// Get last block
const lastBlock = await viemClients[chainId].getBlock();
const lastBlock = await client.getBlock();

// Get trade events from block to last block
const fromBlock = stateRegistry?.state
? BigInt(stateRegistry.state.lastBlock) + 1n
: lastBlock.number;

if (fromBlock > lastBlock.number) {
console.log(`${this.prefix} No new blocks to index`);
return;
}

const accounts = await subscriptionRepository.getAllSubscribedAccounts();
console.log(`${this.prefix} Accounts: ${accounts.join(', ')}`);

const logs = await client.getLogs({
events: EVENTS,
fromBlock,
toBlock: lastBlock.number,
address: getAddress(COW_PROTOCOL_SETTLEMENT_CONTRACT_ADDRESS[chainId]),
args: {
owner: accounts,
} as any,
});
console.log(`${this.prefix} Found ${logs.length} events`);

const notifications = logs.reduce<Notification[]>((acc, log) => {
if (!log.args.owner) {
// TODO: Filter for relevant ones (or better, filter in the getLogs already)
// No owner
return acc;
}

const url = log.args.orderUid
? getExplorerUrl(chainId, log.args.orderUid)
: undefined;

switch (log.eventName) {
case 'Trade': {
const message = `Trade ${log.args.sellAmount} ${log.args.sellToken} for ${log.args.buyAmount} ${log.args.buyToken}`;
console.log(`${this.prefix} New ${message}`);
acc.push({
id: 'Trade-' + log.transactionHash + '-' + log.logIndex,
account: log.args.owner,
title: 'Trade',
message,
url,
});
break;
}

case 'OrderInvalidated': {
const message = 'Order invalidated ' + log.args.orderUid;
console.log(`${this.prefix} ${message}`);
acc.push({
id: 'Trade-' + log.transactionHash + '-' + log.logIndex,
account: log.args.owner,
title: 'Trade',
message,
url,
});
break;
}
default:
console.log(`${this.prefix} Unknown event ${log}`);
break;
}
return acc;
}, []);

// TODO:
// Get trade events from block to last block
Expand All @@ -93,15 +183,17 @@ export class TradeNotificationProducer implements Runnable {
// Send notifications
// TODO:

// const accounts = await subscriptionRepository.getAllSubscribedAccounts();
console.log(`${this.prefix} Notifications:`, notifications);

// Connect
await this.props.notificationsRepository.connect();

// console.log(
// 'For now, nothing to do. I plan to fetch trades for: ' +
// accounts.join(', ')
// );
// Post notifications to queue
this.props.notificationsRepository.sendNotifications(notifications);
this.pendingNotifications.clear();

// Update state
notificationsIndexerStateRepository.upsert(
notificationsIndexerStateRepository.upsert<TradeNotificationProducerState>(
PRODUCER_NAME,
{
lastBlock: lastBlock.number.toString(),
Expand All @@ -112,3 +204,20 @@ export class TradeNotificationProducer implements Runnable {
);
}
}

interface TradeNotificationProducerState {
lastBlock: string;
lastBlockTimestamp: string;
lastBlockHash: string;
}

function getExplorerUrl(chainId: SupportedChainId, orderUid: string) {
const baseUrl = getExplorerBaseUrl(chainId);
return `${baseUrl}/orders/${orderUid}`;
}

function getExplorerBaseUrl(chainId: SupportedChainId) {
const suffix =
chainId === SupportedChainId.MAINNET ? '' : `/${CHAIN_NAME_MAP[chainId]}`;
return `https://explorer.cow.fi${suffix}`;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class NotificationsIndexerStateRepository {
/**
* Update or insert indexer state
*/
async upsert(key: string, state: unknown, chainId?: number): Promise<void> {
async upsert<T>(key: string, state: T, chainId?: number): Promise<void> {
const query = `
INSERT INTO notifications_indexer_state (key, chainId, state)
VALUES ($1, $2, $3)
Expand Down
29 changes: 22 additions & 7 deletions libs/cms-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export type CmsTelegramSubscription = {
};
export type CmsTelegramSubscriptionsResponse =
Schemas['TelegramSubscriptionResponse'];

export type CmsTelegramSubscriptions =
Schemas['TelegramSubscriptionResponse']['data'];

export type CmsPushNotification = {
id: number;
account: string;
Expand Down Expand Up @@ -203,7 +207,7 @@ async function getTelegramSubscriptionsForAccounts({
CmsTelegramSubscription[]
> {
const { data, error, response } = await cmsClient.GET(
`/tg-subscriptions?accounts=${accounts.join(',')}`,
`/accounts/${accounts.join(',')}/subscriptions/telegram`,
{
// Pagination
'pagination[page]': page,
Expand All @@ -225,11 +229,14 @@ async function getSubscribedAccounts({
page = 0,
pageSize = PAGE_SIZE,
}: PaginationParam): Promise<string[]> {
const { data, error, response } = await cmsClient.GET(`/tg-subscriptions`, {
// Pagination
'pagination[page]': page,
'pagination[pageSize]': pageSize,
});
const { data, error, response } = await cmsClient.GET(
`/telegram-subscriptions`,
{
// Pagination
'pagination[page]': page,
'pagination[pageSize]': pageSize,
}
);

if (error) {
console.error(
Expand All @@ -239,7 +246,15 @@ async function getSubscribedAccounts({
throw error;
}

return (data as CmsTelegramSubscription[]).map(({ account }) => account);
const subscriptions = data.data as CmsTelegramSubscriptions[];

return subscriptions.reduce<string[]>((acc, subscription) => {
const account = subscription?.attributes?.account;
if (account) {
acc.push(account);
}
return acc;
}, []);
}

export async function getPushNotifications(): Promise<CmsPushNotification[]> {
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"importHelpers": true,
"target": "es2015",
"target": "ES2020",
"module": "esnext",
"lib": ["es2020", "dom"],
"skipLibCheck": true,
Expand Down