|
| 1 | +import { CosmWasmClient, type HttpEndpoint } from '@cosmjs/cosmwasm-stargate' |
| 2 | +import { fromBase64, fromUtf8, toBase64, toUtf8 } from '@cosmjs/encoding' |
| 3 | +import { HttpBatchClient, Tendermint34Client } from '@cosmjs/tendermint-rpc' |
| 4 | +import { type BatchCosmWasmClientOptions } from './BatchCosmWasmClient' |
| 5 | +import { |
| 6 | + type AggregateResult, |
| 7 | + type Call, |
| 8 | + type QueryMsg, |
| 9 | +} from './types/Multiquery.types' |
| 10 | + |
| 11 | +const jsonToUtf8 = (json: Record<string, unknown>): Uint8Array => |
| 12 | + toUtf8(JSON.stringify(json)) |
| 13 | +const jsonToBinary = (json: Record<string, unknown>): string => |
| 14 | + toBase64(jsonToUtf8(json)) |
| 15 | + |
| 16 | +const binaryToJson = (binary: string): string => fromUtf8(fromBase64(binary)) |
| 17 | + |
| 18 | +export interface MultiqueryOptions extends BatchCosmWasmClientOptions { |
| 19 | + multiqueryContract: string |
| 20 | +} |
| 21 | + |
| 22 | +interface QueryQueueItem { |
| 23 | + address: string |
| 24 | + queryMsg: Record<string, unknown> |
| 25 | + resolve: (result: Record<string, unknown>) => void |
| 26 | + reject: (error: Error) => void |
| 27 | +} |
| 28 | + |
| 29 | +/** |
| 30 | + * Result type for tryAggregate queries, where data can be either the successful response |
| 31 | + * or an error message (when include_cause is true and the query failed) |
| 32 | + */ |
| 33 | +export type TryAggregateResult<T = Record<string, unknown>> = |
| 34 | + | { |
| 35 | + success: true |
| 36 | + data: T |
| 37 | + error: undefined |
| 38 | + } |
| 39 | + | { |
| 40 | + success: false |
| 41 | + data: undefined |
| 42 | + error: string |
| 43 | + } |
| 44 | + |
| 45 | +const DEFAULT_BATCH_SIZE_LIMIT = 25 |
| 46 | +const DEFAULT_DISPATCH_INTERVAL = 200 |
| 47 | + |
| 48 | +export const DEFAULT_MULTIQUERY_COSMWASM_CLIENT_OPTIONS: BatchCosmWasmClientOptions = |
| 49 | + { |
| 50 | + batchSizeLimit: DEFAULT_BATCH_SIZE_LIMIT, |
| 51 | + dispatchInterval: DEFAULT_DISPATCH_INTERVAL, |
| 52 | + } |
| 53 | + |
| 54 | +/** |
| 55 | + * BatchCosmWasmClient with multiquery support by default. Note that the contract MUST be deployed on the target network and this client does not handle check for the deployment. |
| 56 | + * @see https://github.com/AbstractSDK/multiquery |
| 57 | + */ |
| 58 | +export class MultiqueryCosmWasmClient extends CosmWasmClient { |
| 59 | + private readonly multiqueryContractAddress: string |
| 60 | + private readonly _batchSizeLimit: number |
| 61 | + private readonly _dispatchInterval: number |
| 62 | + private queryQueue: QueryQueueItem[] = [] |
| 63 | + private queryTimer?: NodeJS.Timer |
| 64 | + |
| 65 | + constructor( |
| 66 | + tmClient: Tendermint34Client | undefined, |
| 67 | + options: MultiqueryOptions, |
| 68 | + ) { |
| 69 | + super(tmClient) |
| 70 | + this._batchSizeLimit = options.batchSizeLimit |
| 71 | + this._dispatchInterval = options.dispatchInterval |
| 72 | + this.multiqueryContractAddress = options.multiqueryContract |
| 73 | + this.queryTimer = setInterval( |
| 74 | + () => this.processQueryQueue(), |
| 75 | + options.dispatchInterval, |
| 76 | + ) |
| 77 | + } |
| 78 | + |
| 79 | + static async connect( |
| 80 | + endpoint: string | HttpEndpoint, |
| 81 | + // Ensure that the overridden connect is happy |
| 82 | + options: MultiqueryOptions = { |
| 83 | + ...DEFAULT_MULTIQUERY_COSMWASM_CLIENT_OPTIONS, |
| 84 | + multiqueryContract: '', |
| 85 | + }, |
| 86 | + ): Promise<MultiqueryCosmWasmClient> { |
| 87 | + if (!options.multiqueryContract) { |
| 88 | + throw new Error('Missing multiquery contract address') |
| 89 | + } |
| 90 | + const tendermint = await Tendermint34Client.create( |
| 91 | + new HttpBatchClient(endpoint, { |
| 92 | + batchSizeLimit: options.batchSizeLimit, |
| 93 | + dispatchInterval: options.dispatchInterval, |
| 94 | + }), |
| 95 | + ) |
| 96 | + return new this(tendermint, options) |
| 97 | + } |
| 98 | + |
| 99 | + /** |
| 100 | + * Get the batch size limit. |
| 101 | + * @return {number} The batch size limit. |
| 102 | + */ |
| 103 | + get batchSizeLimit(): number { |
| 104 | + return this._batchSizeLimit |
| 105 | + } |
| 106 | + |
| 107 | + /** |
| 108 | + * Get the dispatch interval. |
| 109 | + * @return {number} The dispatch interval. |
| 110 | + */ |
| 111 | + get dispatchInterval(): number { |
| 112 | + return this._dispatchInterval |
| 113 | + } |
| 114 | + |
| 115 | + override async queryContractSmart( |
| 116 | + address: string, |
| 117 | + queryMsg: Record<string, unknown>, |
| 118 | + ): Promise<Record<string, unknown>> { |
| 119 | + return new Promise((resolve, reject) => { |
| 120 | + this.queryQueue.push({ address, queryMsg, resolve, reject }) |
| 121 | + |
| 122 | + if (this.queryQueue.length >= this.batchSizeLimit) { |
| 123 | + this.processQueryQueue() |
| 124 | + } |
| 125 | + }) |
| 126 | + } |
| 127 | + |
| 128 | + async queryContractsBatch( |
| 129 | + queries: Array<{ address: string; queryMsg: Record<string, unknown> }>, |
| 130 | + ): Promise<Record<string, unknown>[]> { |
| 131 | + return Promise.all( |
| 132 | + queries.map(({ address, queryMsg }) => |
| 133 | + this.queryContractSmart(address, queryMsg), |
| 134 | + ), |
| 135 | + ) |
| 136 | + } |
| 137 | + |
| 138 | + /** |
| 139 | + * Aggregate queries with error suppression |
| 140 | + * @param queries Array of contract queries to execute |
| 141 | + * @param requireSuccess If true, throws error when any query fails |
| 142 | + * @returns Array of results where data is either the successful response or error message |
| 143 | + */ |
| 144 | + async tryAggregate( |
| 145 | + queries: Array<{ address: string; queryMsg: Record<string, unknown> }>, |
| 146 | + requireSuccess = false, |
| 147 | + ): Promise<TryAggregateResult[]> { |
| 148 | + const calls: Call[] = queries.map(({ address, queryMsg }) => ({ |
| 149 | + address, |
| 150 | + data: jsonToBinary(queryMsg), |
| 151 | + })) |
| 152 | + |
| 153 | + const result = (await super.queryContractSmart( |
| 154 | + this.multiqueryContractAddress, |
| 155 | + <QueryMsg>{ |
| 156 | + try_aggregate: { |
| 157 | + queries: calls, |
| 158 | + require_success: requireSuccess, |
| 159 | + include_cause: true, |
| 160 | + }, |
| 161 | + }, |
| 162 | + )) as AggregateResult |
| 163 | + |
| 164 | + return result.return_data.map(({ success, data }) => { |
| 165 | + if (success) { |
| 166 | + return { |
| 167 | + success: true, |
| 168 | + data: data ? JSON.parse(binaryToJson(data)) : {}, |
| 169 | + error: undefined, |
| 170 | + } as const |
| 171 | + } else { |
| 172 | + return { |
| 173 | + success: false, |
| 174 | + data: undefined, |
| 175 | + error: binaryToJson(data) || 'Query failed', |
| 176 | + } as const |
| 177 | + } |
| 178 | + }) |
| 179 | + } |
| 180 | + |
| 181 | + /** |
| 182 | + * Process the accumulated query queue using tryAggregate |
| 183 | + */ |
| 184 | + private async processQueryQueue(): Promise<void> { |
| 185 | + const batch = this.queryQueue.splice(0, this.batchSizeLimit) |
| 186 | + if (!batch.length) return |
| 187 | + |
| 188 | + try { |
| 189 | + const queries = batch.map(({ address, queryMsg }) => ({ |
| 190 | + address, |
| 191 | + queryMsg, |
| 192 | + })) |
| 193 | + |
| 194 | + const results = await this.tryAggregate(queries, false) |
| 195 | + |
| 196 | + results.forEach((result, index) => { |
| 197 | + if (!batch[index]) return |
| 198 | + const { resolve, reject } = batch[index]! |
| 199 | + if (result.success) { |
| 200 | + resolve(result.data as Record<string, unknown>) |
| 201 | + } else { |
| 202 | + reject(new Error(result.error)) |
| 203 | + } |
| 204 | + }) |
| 205 | + } catch (error) { |
| 206 | + batch.forEach(({ reject }) => { |
| 207 | + reject(error instanceof Error ? error : new Error(String(error))) |
| 208 | + }) |
| 209 | + } |
| 210 | + } |
| 211 | + |
| 212 | + override disconnect(): void { |
| 213 | + if (this.queryTimer) { |
| 214 | + clearInterval(this.queryTimer) |
| 215 | + this.queryTimer = undefined |
| 216 | + } |
| 217 | + super.disconnect() |
| 218 | + } |
| 219 | +} |
0 commit comments