From 805494ffbac7b334d7f2615e94fbbf4157d5e262 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Fri, 28 Mar 2025 16:34:17 +0200 Subject: [PATCH 01/22] MEX-748 Implement Claim Rewards notifications in xPortal --- .env.example | 8 + src/helpers/api.config.service.ts | 54 ++++ src/main.ts | 6 + .../models/push.notifications.model.ts | 47 ++++ .../push.notifications.module.ts | 31 +++ .../push.notifications.service.ts | 242 ++++++++++++++++++ .../push.notifications.setter.service.ts | 51 ++++ 7 files changed, 439 insertions(+) create mode 100644 src/modules/push-notifications/models/push.notifications.model.ts create mode 100644 src/modules/push-notifications/push.notifications.module.ts create mode 100644 src/modules/push-notifications/push.notifications.service.ts create mode 100644 src/modules/push-notifications/services/push.notifications.setter.service.ts diff --git a/.env.example b/.env.example index 74850e275..64e4ebf1a 100644 --- a/.env.example +++ b/.env.example @@ -46,6 +46,7 @@ ENABLE_CACHE_WARMER=true ENABLE_EVENTS_NOTIFIER=false ENABLE_TRACER=false ENABLE_COMPLEXITY=true +ENABLE_PUSH_NOTIFICATIONS=true #Log filename to use for file logging. Optional LOG_FILE= @@ -74,3 +75,10 @@ OPEN_EXCHANGE_RATES_URL=https://openexchangerates.org/api # AWS Timestream AWS_TIMESTREAM_READ=false AWS_TIMESTREAM_WRITE=false + +# Notification Service Configuration +PUSH_NOTIFICATIONS_API_URL= +PUSH_NOTIFICATIONS_API_KEY= +PUSH_NOTIFICATIONS_BATCH_SIZE=100 +PUSH_NOTIFICATIONS_MAX_RETRIES=3 +PUSH_NOTIFICATIONS_CHAIN_ID=508 # 508 for devnet diff --git a/src/helpers/api.config.service.ts b/src/helpers/api.config.service.ts index 27b1184da..6b10fd839 100644 --- a/src/helpers/api.config.service.ts +++ b/src/helpers/api.config.service.ts @@ -196,6 +196,50 @@ export class ApiConfigService { return mongoDBPassword; } + getNotificationsApiUrl(): string { + const apiUrl = this.configService.get('PUSH_NOTIFICATIONS_API_URL'); + if (!apiUrl) { + throw new Error('No push notifications API url present'); + } + return apiUrl; + } + + getNotificationsApiKey(): string { + const apiKey = this.configService.get('PUSH_NOTIFICATIONS_API_KEY'); + if (!apiKey) { + throw new Error('No push notifications API key present'); + } + return apiKey; + } + + getNotificationsBatchSize(): number { + const batchSize = this.configService.get( + 'PUSH_NOTIFICATIONS_BATCH_SIZE', + ); + if (!batchSize) { + throw new Error('No push notifications batch size present'); + } + return batchSize; + } + + getNotificationsMaxRetries(): number { + const maxRetries = this.configService.get( + 'PUSH_NOTIFICATIONS_MAX_RETRIES', + ); + if (!maxRetries) { + throw new Error('No push notifications max retries present'); + } + return maxRetries; + } + + getChainId(): number { + const chainId = this.configService.get('PUSH_NOTIFICATIONS_CHAIN_ID'); + if (!chainId) { + throw new Error('No push notifications chainId present'); + } + return chainId; + } + getJwtSecret(): string { const secret = this.configService.get('JWT_SECRET'); if (!secret) { @@ -352,4 +396,14 @@ export class ApiConfigService { getRateLimiterSecret(): string | undefined { return this.configService.get('RATE_LIMITER_SECRET'); } + + isNotificationsModuleActive(): boolean { + const notificationsModuleActive = this.configService.get( + 'ENABLE_PUSH_NOTIFICATIONS', + ); + if (!notificationsModuleActive) { + return false; + } + return notificationsModuleActive === 'true'; + } } diff --git a/src/main.ts b/src/main.ts index c21d322c9..174e31798 100644 --- a/src/main.ts +++ b/src/main.ts @@ -13,6 +13,7 @@ import cookieParser from 'cookie-parser'; import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston'; import { LoggerService } from '@nestjs/common'; import { NestExpressApplication } from '@nestjs/platform-express'; +import { PushNotificationsModule } from './modules/push-notifications/push.notifications.module'; async function bootstrap() { BigNumber.config({ EXPONENTIAL_AT: [-30, 30] }); @@ -90,5 +91,10 @@ async function bootstrap() { await rabbitMqService.getFilterAddresses(); await eventsNotifierApp.listen(5673, '0.0.0.0'); } + + if (apiConfigService.isNotificationsModuleActive()) { + const pushNotificationsApp = await NestFactory.create(PushNotificationsModule); + await pushNotificationsApp.listen(5674, '0.0.0.0'); + } } bootstrap(); diff --git a/src/modules/push-notifications/models/push.notifications.model.ts b/src/modules/push-notifications/models/push.notifications.model.ts new file mode 100644 index 000000000..f9798b212 --- /dev/null +++ b/src/modules/push-notifications/models/push.notifications.model.ts @@ -0,0 +1,47 @@ +export interface EnergyDetailsType { + lastUpdateEpoch: number; + amount: string; + totalLockedTokens: string; +} + +export interface AccountType { + address: string; + nonce: number; + balance: string; + balanceNum: number; + totalBalanceWithStake: string; + totalBalanceWithStakeNum: number; + timestamp: number; + shardID: number; + totalStake: string; + energy: string; + energyNum: number; + energyDetails: EnergyDetailsType; + totalUnDelegate: string; +} + +export interface ContractKeysRaw { + data: { + blockInfo: { + hash: string; + nonce: number; + rootHash: string; + }; + pairs: Record; + }; + code: string; +} + +export interface UserEnergyAddress { + address: string; + notificationSent: boolean; +} + +export interface NotificationPayload { + addresses: string[]; + chainId: number; + title: string; + body: string; + route: string; + iconUrl: string; +} \ No newline at end of file diff --git a/src/modules/push-notifications/push.notifications.module.ts b/src/modules/push-notifications/push.notifications.module.ts new file mode 100644 index 000000000..0300f4521 --- /dev/null +++ b/src/modules/push-notifications/push.notifications.module.ts @@ -0,0 +1,31 @@ +import { Module } from '@nestjs/common'; +import { ScheduleModule } from '@nestjs/schedule'; +import { PushNotificationsService } from './push.notifications.service'; +import { PushNotificationsSetterService } from './services/push.notifications.setter.service'; +import { CommonAppModule } from '../../common.app.module'; +import { MXCommunicationModule } from '../../services/multiversx-communication/mx.communication.module'; +import { ElasticSearchModule } from '../../services/elastic-search/elastic.search.module'; +import { ContextModule } from '../../services/context/context.module'; +import { CacheModule } from '../../services/caching/cache.module'; +import { EnergyModule } from '../energy/energy.module'; + +@Module({ + imports: [ + CommonAppModule, + ScheduleModule.forRoot(), + MXCommunicationModule, + ElasticSearchModule, + ContextModule, + CacheModule, + EnergyModule, + ], + providers: [ + PushNotificationsService, + PushNotificationsSetterService, + ], + exports: [ + PushNotificationsService, + PushNotificationsSetterService, + ], +}) +export class PushNotificationsModule {} \ No newline at end of file diff --git a/src/modules/push-notifications/push.notifications.service.ts b/src/modules/push-notifications/push.notifications.service.ts new file mode 100644 index 000000000..ce0d71807 --- /dev/null +++ b/src/modules/push-notifications/push.notifications.service.ts @@ -0,0 +1,242 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { scAddress } from 'src/config'; +import { MXApiService } from 'src/services/multiversx-communication/mx.api.service'; +import { ApiConfigService } from 'src/helpers/api.config.service'; +import { + AddressUtils, + BinaryUtils, + ErrorLoggerAsync, + Lock, + Constants, +} from '@multiversx/sdk-nestjs-common'; +import { GetOrSetCache } from 'src/helpers/decorators/caching.decorator'; +import { + ElasticQuery, + ElasticService, + QueryType, +} from '@multiversx/sdk-nestjs-elastic'; +import { ContextGetterService } from 'src/services/context/context.getter.service'; +import axios from 'axios'; +import { + AccountType, + ContractKeysRaw, + NotificationPayload, + UserEnergyAddress, +} from './models/push.notifications.model'; +import { PushNotificationsSetterService } from './services/push.notifications.setter.service'; + +@Injectable() +export class PushNotificationsService { + private readonly logger = new Logger(PushNotificationsService.name); + private readonly maxRetries: number; + + constructor( + private readonly apiService: MXApiService, + private readonly apiConfigService: ApiConfigService, + private readonly elasticService: ElasticService, + private readonly contextGetter: ContextGetterService, + private readonly notificationsSetter: PushNotificationsSetterService, + ) { + this.maxRetries = this.apiConfigService.getNotificationsMaxRetries(); + } + + @ErrorLoggerAsync() + private async usersWithEnergyFromContractStorage(): Promise< + UserEnergyAddress[] + > { + const contractAddress = scAddress.simpleLockEnergy; + const contractKeysRaw: ContractKeysRaw = + await this.apiService.doGetGeneric( + 'getContractKeys', + `address/${contractAddress}/keys`, + ); + + const contractPairs = Object.entries( + contractKeysRaw?.data?.pairs || {}, + ); + + const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); + const userEnergyKeys = contractPairs + .filter(([key, _]) => key.startsWith(userEnergyKey)) + .map(([key, _]) => key.replace(userEnergyKey, '')); + + const userEnergyAddresses = userEnergyKeys.map((key) => + AddressUtils.bech32Encode(key), + ); + + return userEnergyAddresses.map((address) => ({ + address, + notificationSent: false, + })); + } + + @ErrorLoggerAsync() + private async getUsersFromIndexer( + epoch: number, + ): Promise { + const query = ElasticQuery.create() + .withPagination({ from: 0, size: 10000 }) + .withMustExistCondition('energyDetails'); + + query.condition.must = [ + QueryType.Range('energyDetails.amount', { key: 'gt', value: 0 }), + ]; + + const allAddresses: string[] = []; + + await this.elasticService.getScrollableList( + `accounts-000001_${epoch}`, + 'address', + query, + async (items: AccountType[]) => { + const addresses = items.map( + (item: AccountType) => item.address, + ); + allAddresses.push(...addresses); + }, + ); + + return allAddresses.map((address) => ({ + address, + notificationSent: false, + })); + } + + private async sendBatchNotifications( + addresses: string[], + ): Promise { + const chainId = Number(this.apiConfigService.getChainId()); + const notificationsApiUrl = + this.apiConfigService.getNotificationsApiUrl(); + const notificationsApiKey = + this.apiConfigService.getNotificationsApiKey(); + + const payload: NotificationPayload = { + addresses, + chainId, + title: 'Energy Update', + body: 'You can now claim your rewards', + route: '/portfolio', + iconUrl: + 'https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp', + }; + + const response = await axios.post(notificationsApiUrl, payload, { + headers: { + 'Content-Type': 'application/json', + 'x-notifications-api-key': notificationsApiKey, + }, + }); + + if (response.status !== 201) { + throw new Error( + `Notification API responded with status ${response.status}: ${response.data}`, + ); + } + + return true; + } + + private async processNotificationBatch( + addresses: string[], + maxRetries: number, + ): Promise<{ successful: string[]; failed: string[] }> { + const batchSize = + await this.apiConfigService.getNotificationsBatchSize(); + + const successful: string[] = []; + let remainingAddresses = [...addresses]; + + for ( + let retryCount = 0; + retryCount < maxRetries && remainingAddresses.length > 0; + retryCount++ + ) { + if (retryCount > 0) { + this.logger.log( + `Retry attempt ${retryCount} for ${remainingAddresses.length} addresses`, + ); + await new Promise((resolve) => setTimeout(resolve, 10000)); + } + + const addressesToRetry = [...remainingAddresses]; + remainingAddresses = []; + + for (let i = 0; i < addressesToRetry.length; i += batchSize) { + const batch = addressesToRetry.slice(i, i + batchSize); + const success = await this.sendBatchNotifications(batch); + + if (success) { + successful.push(...batch); + } else { + remainingAddresses.push(...batch); + } + } + } + + return { successful, failed: remainingAddresses }; + } + + @GetOrSetCache({ + baseKey: 'notifications', + remoteTtl: Constants.oneWeek(), + localTtl: Constants.oneDay() * 6, + }) + private async userEnergyAddresses(): Promise { + const isDevnet = process.env.NODE_ENV === 'devnet'; + + if (isDevnet) return await this.usersWithEnergyFromContractStorage(); + + const currentEpoch = await this.contextGetter.getCurrentEpoch(); + return await this.getUsersFromIndexer(currentEpoch - 1); + } + + @Cron('0 0 19 * * 5') // Every Friday at 19:00 + @Lock({ name: 'handleNotificationsCron', verbose: true }) + async handleNotificationsCron() { + if (!this.apiConfigService.isNotificationsModuleActive()) { + return; + } + + try { + this.logger.log('Starting weekly energy notifications job...'); + + // Get eligible users (will use cache if available) + const eligibleUsers = await this.userEnergyAddresses(); + + // Process notifications + const usersToNotify = eligibleUsers + .filter((user) => !user.notificationSent) + .map((user) => user.address); + + if (usersToNotify.length === 0) { + this.logger.log('No new notifications to send'); + return; + } + + const { successful, failed } = await this.processNotificationBatch( + usersToNotify, + this.maxRetries, + ); + + // Update notification status for successful ones + if (successful.length > 0) { + await this.notificationsSetter.updateNotificationStatus( + successful, + true, + ); + this.logger.log( + `Successfully sent notifications to ${successful.length} addresses`, + ); + } + + this.logger.log( + `Weekly notification job completed. Success: ${successful.length}, Failed: ${failed.length}`, + ); + } catch (error) { + this.logger.error('Error in notification cron job:', error.message); + throw error; + } + } +} diff --git a/src/modules/push-notifications/services/push.notifications.setter.service.ts b/src/modules/push-notifications/services/push.notifications.setter.service.ts new file mode 100644 index 000000000..2268701ec --- /dev/null +++ b/src/modules/push-notifications/services/push.notifications.setter.service.ts @@ -0,0 +1,51 @@ +import { Injectable } from '@nestjs/common'; +import { CacheService } from 'src/services/caching/cache.service'; +import { Constants } from '@multiversx/sdk-nestjs-common'; +import { UserEnergyAddress } from '../models/push.notifications.model'; + +@Injectable() +export class PushNotificationsSetterService { + private readonly cacheKey = 'userEnergyAddresses'; + + constructor(private readonly cacheService: CacheService) {} + + async setUsersInCache(users: UserEnergyAddress[]): Promise { + await this.cacheService.set( + this.cacheKey, + users, + Constants.oneWeek(), + Constants.oneDay() * 6, + ); + } + + async getUsersFromCache(): Promise { + const users = await this.cacheService.get( + this.cacheKey, + ); + + if (!users) return null; + + return users; + } + + async updateNotificationStatus( + addresses: string[], + status: boolean, + ): Promise { + const users = await this.getUsersFromCache(); + + if (!users) return; + + const updatedUsers = users.map((user) => + addresses.includes(user.address) + ? { ...user, notificationSent: status } + : user, + ); + + await this.setUsersInCache(updatedUsers); + } + + async clearCache(): Promise { + await this.cacheService.delete(this.cacheKey); + } +} From bc73efe83df0545a6e7b42e8c455c2d1e2047850 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Fri, 28 Mar 2025 16:44:28 +0200 Subject: [PATCH 02/22] MEX-748 Fix issue on failed req --- src/modules/push-notifications/push.notifications.service.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/modules/push-notifications/push.notifications.service.ts b/src/modules/push-notifications/push.notifications.service.ts index ce0d71807..0412e6535 100644 --- a/src/modules/push-notifications/push.notifications.service.ts +++ b/src/modules/push-notifications/push.notifications.service.ts @@ -130,9 +130,7 @@ export class PushNotificationsService { }); if (response.status !== 201) { - throw new Error( - `Notification API responded with status ${response.status}: ${response.data}`, - ); + return false; } return true; From e33d0707bc4484c55faccacae13e3708f2b62b5f Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Fri, 11 Apr 2025 09:41:15 +0300 Subject: [PATCH 03/22] MEX-748 Make the module more generic --- src/config/default.json | 14 + src/config/index.ts | 2 + .../crons/push.notifications.energy.ts | 87 +++++++ ...s.model.ts => push.notifications.types.ts} | 17 +- .../push.notifications.module.ts | 12 +- .../push.notifications.service.ts | 240 ------------------ .../services/push.notifications.service.ts | 130 ++++++++++ .../push.notifications.setter.service.ts | 52 ++-- src/services/caching/cache.service.ts | 14 + .../elastic-search/elastic.search.module.ts | 13 +- .../services/es.accounts.energy.service.ts | 34 +++ .../mx.communication.module.ts | 3 + .../mx.xportal.api.service.ts | 66 +++++ 13 files changed, 395 insertions(+), 289 deletions(-) create mode 100644 src/modules/push-notifications/crons/push.notifications.energy.ts rename src/modules/push-notifications/models/{push.notifications.model.ts => push.notifications.types.ts} (75%) delete mode 100644 src/modules/push-notifications/push.notifications.service.ts create mode 100644 src/modules/push-notifications/services/push.notifications.service.ts create mode 100644 src/services/elastic-search/services/es.accounts.energy.service.ts create mode 100644 src/services/multiversx-communication/mx.xportal.api.service.ts diff --git a/src/config/default.json b/src/config/default.json index 13ffe4d62..b59119693 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -700,5 +700,19 @@ "nested": 1.5, "exponentialBase": 10 } + }, + "pushNotifications": { + "feesCollectorRewards": { + "title": "Energy Update", + "body": "You can now claim your fees rewards", + "route": "/portfolio", + "iconUrl": "https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp" + }, + "negativeEnergy": { + "title": "Energy Update", + "body": "You have negative energy", + "route": "/energy", + "iconUrl": "https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp" + } } } diff --git a/src/config/index.ts b/src/config/index.ts index b34cadc95..d032baed6 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -38,3 +38,5 @@ export const dataApiConfig = config.get('dataApi'); export const leaguesConfig = config.get('leagues'); export const complexityConfig = config.get('complexity'); + +export const pushNotificationsConfig = config.get('pushNotifications'); diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts new file mode 100644 index 000000000..640f0659d --- /dev/null +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -0,0 +1,87 @@ +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { Lock } from '@multiversx/sdk-nestjs-common'; +import { PushNotificationsService } from '../services/push.notifications.service'; +import { + AccountType, + NotificationType, +} from '../models/push.notifications.types'; +import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; +import { ContextGetterService } from 'src/services/context/context.getter.service'; + +@Injectable() +export class PushNotificationsEnergyCron { + constructor( + private readonly pushNotificationsService: PushNotificationsService, + private readonly contextGetter: ContextGetterService, + private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, + ) {} + + @Cron('0 0 19 * * 5') // Every Friday at 19:00 + @Lock({ name: 'feesCollectorRewardsCron', verbose: true }) + async feesCollectorRewardsCron() { + const isDevnet = process.env.NODE_ENV === 'devnet'; + + if (isDevnet) { + const addresses = + await this.pushNotificationsService.usersWithEnergyFromContractStorage(); + + await this.pushNotificationsService.sendNotifications( + addresses, + NotificationType.FEES_COLLECTOR_REWARDS, + ); + return; + } + + const currentEpoch = await this.contextGetter.getCurrentEpoch(); + + await this.accountsEnergyElasticService.getAccountsByEnergyAmount( + currentEpoch - 1, + 'gt', + async (items: AccountType[]) => { + const addresses = items.map( + (item: AccountType) => item.address, + ); + + await this.pushNotificationsService.sendNotifications( + addresses, + NotificationType.FEES_COLLECTOR_REWARDS, + ); + }, + ); + } + + @Cron('0 12 */2 * *') // Every 2 days at noon + @Lock({ name: 'negativeEnergyNotificationsCron', verbose: true }) + async negativeEnergyNotificationsCron() { + const currentEpoch = await this.contextGetter.getCurrentEpoch(); + + await this.accountsEnergyElasticService.getAccountsByEnergyAmount( + currentEpoch - 1, + 'lt', + async (items: AccountType[]) => { + const addresses = items.map( + (item: AccountType) => item.address, + ); + + await this.pushNotificationsService.sendNotifications( + addresses, + NotificationType.NEGATIVE_ENERGY, + ); + }, + 0, + ); + } + + @Cron('*/1 * * * *') // Every 10 minutes + @Lock({ name: 'retryFailedNotificationsCron', verbose: true }) + async retryFailedNotificationsCron() { + const notificationTypes = Object.values(NotificationType); + + for (const notificationType of notificationTypes) { + await this.pushNotificationsService.retryFailedNotifications( + notificationType, + ); + } + } +} diff --git a/src/modules/push-notifications/models/push.notifications.model.ts b/src/modules/push-notifications/models/push.notifications.types.ts similarity index 75% rename from src/modules/push-notifications/models/push.notifications.model.ts rename to src/modules/push-notifications/models/push.notifications.types.ts index f9798b212..031b454e7 100644 --- a/src/modules/push-notifications/models/push.notifications.model.ts +++ b/src/modules/push-notifications/models/push.notifications.types.ts @@ -32,16 +32,19 @@ export interface ContractKeysRaw { code: string; } -export interface UserEnergyAddress { - address: string; - notificationSent: boolean; +export interface NotificationResult { + successful: string[]; + failed: string[]; } -export interface NotificationPayload { - addresses: string[]; - chainId: number; +export interface NotificationConfig { title: string; body: string; route: string; iconUrl: string; -} \ No newline at end of file +} + +export enum NotificationType { + FEES_COLLECTOR_REWARDS = 'feesCollectorRewards', + NEGATIVE_ENERGY = 'negativeEnergy', +} diff --git a/src/modules/push-notifications/push.notifications.module.ts b/src/modules/push-notifications/push.notifications.module.ts index 0300f4521..7001a8786 100644 --- a/src/modules/push-notifications/push.notifications.module.ts +++ b/src/modules/push-notifications/push.notifications.module.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; import { ScheduleModule } from '@nestjs/schedule'; -import { PushNotificationsService } from './push.notifications.service'; +import { PushNotificationsService } from './services/push.notifications.service'; import { PushNotificationsSetterService } from './services/push.notifications.setter.service'; import { CommonAppModule } from '../../common.app.module'; import { MXCommunicationModule } from '../../services/multiversx-communication/mx.communication.module'; @@ -8,7 +8,7 @@ import { ElasticSearchModule } from '../../services/elastic-search/elastic.searc import { ContextModule } from '../../services/context/context.module'; import { CacheModule } from '../../services/caching/cache.module'; import { EnergyModule } from '../energy/energy.module'; - +import { PushNotificationsEnergyCron } from './crons/push.notifications.energy'; @Module({ imports: [ CommonAppModule, @@ -22,10 +22,8 @@ import { EnergyModule } from '../energy/energy.module'; providers: [ PushNotificationsService, PushNotificationsSetterService, + PushNotificationsEnergyCron, ], - exports: [ - PushNotificationsService, - PushNotificationsSetterService, - ], + exports: [], }) -export class PushNotificationsModule {} \ No newline at end of file +export class PushNotificationsModule {} diff --git a/src/modules/push-notifications/push.notifications.service.ts b/src/modules/push-notifications/push.notifications.service.ts deleted file mode 100644 index 0412e6535..000000000 --- a/src/modules/push-notifications/push.notifications.service.ts +++ /dev/null @@ -1,240 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { Cron } from '@nestjs/schedule'; -import { scAddress } from 'src/config'; -import { MXApiService } from 'src/services/multiversx-communication/mx.api.service'; -import { ApiConfigService } from 'src/helpers/api.config.service'; -import { - AddressUtils, - BinaryUtils, - ErrorLoggerAsync, - Lock, - Constants, -} from '@multiversx/sdk-nestjs-common'; -import { GetOrSetCache } from 'src/helpers/decorators/caching.decorator'; -import { - ElasticQuery, - ElasticService, - QueryType, -} from '@multiversx/sdk-nestjs-elastic'; -import { ContextGetterService } from 'src/services/context/context.getter.service'; -import axios from 'axios'; -import { - AccountType, - ContractKeysRaw, - NotificationPayload, - UserEnergyAddress, -} from './models/push.notifications.model'; -import { PushNotificationsSetterService } from './services/push.notifications.setter.service'; - -@Injectable() -export class PushNotificationsService { - private readonly logger = new Logger(PushNotificationsService.name); - private readonly maxRetries: number; - - constructor( - private readonly apiService: MXApiService, - private readonly apiConfigService: ApiConfigService, - private readonly elasticService: ElasticService, - private readonly contextGetter: ContextGetterService, - private readonly notificationsSetter: PushNotificationsSetterService, - ) { - this.maxRetries = this.apiConfigService.getNotificationsMaxRetries(); - } - - @ErrorLoggerAsync() - private async usersWithEnergyFromContractStorage(): Promise< - UserEnergyAddress[] - > { - const contractAddress = scAddress.simpleLockEnergy; - const contractKeysRaw: ContractKeysRaw = - await this.apiService.doGetGeneric( - 'getContractKeys', - `address/${contractAddress}/keys`, - ); - - const contractPairs = Object.entries( - contractKeysRaw?.data?.pairs || {}, - ); - - const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); - const userEnergyKeys = contractPairs - .filter(([key, _]) => key.startsWith(userEnergyKey)) - .map(([key, _]) => key.replace(userEnergyKey, '')); - - const userEnergyAddresses = userEnergyKeys.map((key) => - AddressUtils.bech32Encode(key), - ); - - return userEnergyAddresses.map((address) => ({ - address, - notificationSent: false, - })); - } - - @ErrorLoggerAsync() - private async getUsersFromIndexer( - epoch: number, - ): Promise { - const query = ElasticQuery.create() - .withPagination({ from: 0, size: 10000 }) - .withMustExistCondition('energyDetails'); - - query.condition.must = [ - QueryType.Range('energyDetails.amount', { key: 'gt', value: 0 }), - ]; - - const allAddresses: string[] = []; - - await this.elasticService.getScrollableList( - `accounts-000001_${epoch}`, - 'address', - query, - async (items: AccountType[]) => { - const addresses = items.map( - (item: AccountType) => item.address, - ); - allAddresses.push(...addresses); - }, - ); - - return allAddresses.map((address) => ({ - address, - notificationSent: false, - })); - } - - private async sendBatchNotifications( - addresses: string[], - ): Promise { - const chainId = Number(this.apiConfigService.getChainId()); - const notificationsApiUrl = - this.apiConfigService.getNotificationsApiUrl(); - const notificationsApiKey = - this.apiConfigService.getNotificationsApiKey(); - - const payload: NotificationPayload = { - addresses, - chainId, - title: 'Energy Update', - body: 'You can now claim your rewards', - route: '/portfolio', - iconUrl: - 'https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp', - }; - - const response = await axios.post(notificationsApiUrl, payload, { - headers: { - 'Content-Type': 'application/json', - 'x-notifications-api-key': notificationsApiKey, - }, - }); - - if (response.status !== 201) { - return false; - } - - return true; - } - - private async processNotificationBatch( - addresses: string[], - maxRetries: number, - ): Promise<{ successful: string[]; failed: string[] }> { - const batchSize = - await this.apiConfigService.getNotificationsBatchSize(); - - const successful: string[] = []; - let remainingAddresses = [...addresses]; - - for ( - let retryCount = 0; - retryCount < maxRetries && remainingAddresses.length > 0; - retryCount++ - ) { - if (retryCount > 0) { - this.logger.log( - `Retry attempt ${retryCount} for ${remainingAddresses.length} addresses`, - ); - await new Promise((resolve) => setTimeout(resolve, 10000)); - } - - const addressesToRetry = [...remainingAddresses]; - remainingAddresses = []; - - for (let i = 0; i < addressesToRetry.length; i += batchSize) { - const batch = addressesToRetry.slice(i, i + batchSize); - const success = await this.sendBatchNotifications(batch); - - if (success) { - successful.push(...batch); - } else { - remainingAddresses.push(...batch); - } - } - } - - return { successful, failed: remainingAddresses }; - } - - @GetOrSetCache({ - baseKey: 'notifications', - remoteTtl: Constants.oneWeek(), - localTtl: Constants.oneDay() * 6, - }) - private async userEnergyAddresses(): Promise { - const isDevnet = process.env.NODE_ENV === 'devnet'; - - if (isDevnet) return await this.usersWithEnergyFromContractStorage(); - - const currentEpoch = await this.contextGetter.getCurrentEpoch(); - return await this.getUsersFromIndexer(currentEpoch - 1); - } - - @Cron('0 0 19 * * 5') // Every Friday at 19:00 - @Lock({ name: 'handleNotificationsCron', verbose: true }) - async handleNotificationsCron() { - if (!this.apiConfigService.isNotificationsModuleActive()) { - return; - } - - try { - this.logger.log('Starting weekly energy notifications job...'); - - // Get eligible users (will use cache if available) - const eligibleUsers = await this.userEnergyAddresses(); - - // Process notifications - const usersToNotify = eligibleUsers - .filter((user) => !user.notificationSent) - .map((user) => user.address); - - if (usersToNotify.length === 0) { - this.logger.log('No new notifications to send'); - return; - } - - const { successful, failed } = await this.processNotificationBatch( - usersToNotify, - this.maxRetries, - ); - - // Update notification status for successful ones - if (successful.length > 0) { - await this.notificationsSetter.updateNotificationStatus( - successful, - true, - ); - this.logger.log( - `Successfully sent notifications to ${successful.length} addresses`, - ); - } - - this.logger.log( - `Weekly notification job completed. Success: ${successful.length}, Failed: ${failed.length}`, - ); - } catch (error) { - this.logger.error('Error in notification cron job:', error.message); - throw error; - } - } -} diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts new file mode 100644 index 000000000..f142d86c9 --- /dev/null +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -0,0 +1,130 @@ +import { Injectable } from '@nestjs/common'; +import { pushNotificationsConfig, scAddress } from 'src/config'; +import { MXApiService } from 'src/services/multiversx-communication/mx.api.service'; +import { ApiConfigService } from 'src/helpers/api.config.service'; +import { + AddressUtils, + BinaryUtils, + ErrorLoggerAsync, +} from '@multiversx/sdk-nestjs-common'; + +import { + ContractKeysRaw, + NotificationConfig, + NotificationResult, + NotificationType, +} from '../models/push.notifications.types'; +import { PushNotificationsSetterService } from './push.notifications.setter.service'; +import { XPortalApiService } from 'src/services/multiversx-communication/mx.xportal.api.service'; + +@Injectable() +export class PushNotificationsService { + constructor( + private readonly apiService: MXApiService, + private readonly apiConfigService: ApiConfigService, + private readonly notificationsSetter: PushNotificationsSetterService, + private readonly xPortalApiService: XPortalApiService, + ) {} + + @ErrorLoggerAsync() + async usersWithEnergyFromContractStorage(): Promise { + const contractAddress = scAddress.simpleLockEnergy; + const contractKeysRaw: ContractKeysRaw = + await this.apiService.doGetGeneric( + 'getContractKeys', + `address/${contractAddress}/keys`, + ); + + const contractPairs = Object.entries( + contractKeysRaw?.data?.pairs || {}, + ); + + const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); + const userEnergyKeys = contractPairs + .filter(([key, _]) => key.startsWith(userEnergyKey)) + .map(([key, _]) => key.replace(userEnergyKey, '')); + + const userEnergyAddresses = userEnergyKeys.map((key) => + AddressUtils.bech32Encode(key), + ); + + return userEnergyAddresses; + } + + async sendNotifications( + addresses: string[], + notificationType: NotificationType, + ): Promise { + return this.sendNotificationsInBatches( + addresses, + pushNotificationsConfig[notificationType], + notificationType, + ); + } + + private async sendNotificationsInBatches( + addresses: string[], + notificationParams: NotificationConfig, + notificationKey: string, + ): Promise { + const batchSize = this.apiConfigService.getNotificationsBatchSize(); + const failed: string[] = []; + const successful: string[] = []; + + for (let i = 0; i < addresses.length; i += batchSize) { + const batch = addresses.slice(i, i + batchSize); + try { + const success = + await this.xPortalApiService.sendPushNotifications( + batch, + notificationParams.title, + notificationParams.body, + notificationParams.route, + notificationParams.iconUrl, + ); + + if (success) { + successful.push(...batch); + } else { + failed.push(...batch); + } + } catch (error) { + failed.push(...batch); + } + } + + if (failed.length > 0) { + await this.notificationsSetter.addFailedNotifications( + failed, + notificationKey, + ); + } + + return { successful, failed }; + } + + async retryFailedNotifications( + notificationType: NotificationType, + ): Promise { + const failedAddresses = + await this.notificationsSetter.getFailedNotifications( + notificationType, + ); + + if (!failedAddresses || failedAddresses.length === 0) { + return; + } + + const { successful } = await this.sendNotifications( + failedAddresses, + notificationType, + ); + + if (successful.length > 0) { + await this.notificationsSetter.removeFailedNotifications( + successful, + notificationType, + ); + } + } +} diff --git a/src/modules/push-notifications/services/push.notifications.setter.service.ts b/src/modules/push-notifications/services/push.notifications.setter.service.ts index 2268701ec..f8e32abd1 100644 --- a/src/modules/push-notifications/services/push.notifications.setter.service.ts +++ b/src/modules/push-notifications/services/push.notifications.setter.service.ts @@ -1,51 +1,37 @@ import { Injectable } from '@nestjs/common'; import { CacheService } from 'src/services/caching/cache.service'; import { Constants } from '@multiversx/sdk-nestjs-common'; -import { UserEnergyAddress } from '../models/push.notifications.model'; @Injectable() export class PushNotificationsSetterService { - private readonly cacheKey = 'userEnergyAddresses'; + private readonly failedNotificationsPrefix = 'pushNotificationsFailed'; constructor(private readonly cacheService: CacheService) {} - async setUsersInCache(users: UserEnergyAddress[]): Promise { - await this.cacheService.set( - this.cacheKey, - users, - Constants.oneWeek(), - Constants.oneDay() * 6, - ); - } - - async getUsersFromCache(): Promise { - const users = await this.cacheService.get( - this.cacheKey, - ); + async addFailedNotifications( + addresses: string[], + notificationKey: string, + ttl: number = Constants.oneWeek(), + ): Promise { + if (!addresses || addresses.length === 0) return; + const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; - if (!users) return null; + await this.cacheService.addToSet(redisKey, addresses); + await this.cacheService.setTtlRemote(redisKey, ttl); + } - return users; + async getFailedNotifications(notificationKey: string): Promise { + const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; + return await this.cacheService.getSetMembers(redisKey); } - async updateNotificationStatus( + async removeFailedNotifications( addresses: string[], - status: boolean, + notificationKey: string, ): Promise { - const users = await this.getUsersFromCache(); - - if (!users) return; - - const updatedUsers = users.map((user) => - addresses.includes(user.address) - ? { ...user, notificationSent: status } - : user, - ); - - await this.setUsersInCache(updatedUsers); - } + if (!addresses || addresses.length === 0) return; - async clearCache(): Promise { - await this.cacheService.delete(this.cacheKey); + const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; + await this.cacheService.removeFromSet(redisKey, addresses); } } diff --git a/src/services/caching/cache.service.ts b/src/services/caching/cache.service.ts index 703e2289c..c028bda75 100644 --- a/src/services/caching/cache.service.ts +++ b/src/services/caching/cache.service.ts @@ -102,6 +102,20 @@ export class CacheService { return this.redisCacheService.keys(key); } + addToSet(key: string, members: string[]): Promise { + if (!members.length) return; + this.redisCacheService.sadd(key, ...members); + } + + removeFromSet(key: string, members: string[]): Promise { + if (!members.length) return; + return this.redisCacheService['redis'].srem(key, ...members); + } + + getSetMembers(key: string): Promise { + return this.redisCacheService.smembers(key); + } + getOrSetRemote( key: string, createValueFunc: () => Promise, diff --git a/src/services/elastic-search/elastic.search.module.ts b/src/services/elastic-search/elastic.search.module.ts index 2a3759f9d..64dbc982f 100644 --- a/src/services/elastic-search/elastic.search.module.ts +++ b/src/services/elastic-search/elastic.search.module.ts @@ -3,6 +3,7 @@ import { ESTransactionsService } from './services/es.transactions.service'; import { CommonAppModule } from 'src/common.app.module'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; import { ElasticSearchEventsService } from './services/es.events.service'; +import { ElasticAccountsEnergyService } from './services/es.accounts.energy.service'; @Module({ imports: [ @@ -10,7 +11,15 @@ import { ElasticSearchEventsService } from './services/es.events.service'; DynamicModuleUtils.getApiModule(), DynamicModuleUtils.getElasticModule(), ], - providers: [ESTransactionsService, ElasticSearchEventsService], - exports: [ESTransactionsService, ElasticSearchEventsService], + providers: [ + ESTransactionsService, + ElasticSearchEventsService, + ElasticAccountsEnergyService, + ], + exports: [ + ESTransactionsService, + ElasticSearchEventsService, + ElasticAccountsEnergyService, + ], }) export class ElasticSearchModule {} diff --git a/src/services/elastic-search/services/es.accounts.energy.service.ts b/src/services/elastic-search/services/es.accounts.energy.service.ts new file mode 100644 index 000000000..8078c1012 --- /dev/null +++ b/src/services/elastic-search/services/es.accounts.energy.service.ts @@ -0,0 +1,34 @@ +import { QueryType, ElasticService } from '@multiversx/sdk-nestjs-elastic'; + +import { ElasticQuery } from '@multiversx/sdk-nestjs-elastic'; +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class ElasticAccountsEnergyService { + constructor(private readonly elasticService: ElasticService) {} + + async getAccountsByEnergyAmount( + epoch: number, + operator: 'gt' | 'lt' | 'gte' | 'lte' = 'gt', + action: (items: any[]) => Promise, + amount = 0, + ): Promise { + const query = ElasticQuery.create() + .withPagination({ from: 0, size: 10000 }) + .withMustExistCondition('energyDetails'); + + query.condition.must = [ + QueryType.Range('energyDetails.amount', { + key: operator, + value: amount, + }), + ]; + + await this.elasticService.getScrollableList( + `accounts-000001_${epoch}`, + 'address', + query, + action, + ); + } +} diff --git a/src/services/multiversx-communication/mx.communication.module.ts b/src/services/multiversx-communication/mx.communication.module.ts index bc2f34fa6..42618e7af 100644 --- a/src/services/multiversx-communication/mx.communication.module.ts +++ b/src/services/multiversx-communication/mx.communication.module.ts @@ -4,6 +4,7 @@ import { MXApiService } from './mx.api.service'; import { MXDataApiService } from './mx.data.api.service'; import { MXGatewayService } from './mx.gateway.service'; import { MXProxyService } from './mx.proxy.service'; +import { XPortalApiService } from './mx.xportal.api.service'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; @Module({ @@ -13,6 +14,7 @@ import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; MXApiService, MXGatewayService, MXDataApiService, + XPortalApiService, ApiConfigService, ], exports: [ @@ -20,6 +22,7 @@ import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; MXApiService, MXGatewayService, MXDataApiService, + XPortalApiService, ApiConfigService, ], }) diff --git a/src/services/multiversx-communication/mx.xportal.api.service.ts b/src/services/multiversx-communication/mx.xportal.api.service.ts new file mode 100644 index 000000000..31cc15af0 --- /dev/null +++ b/src/services/multiversx-communication/mx.xportal.api.service.ts @@ -0,0 +1,66 @@ +import { Injectable } from '@nestjs/common'; +import { ApiConfigService } from '../../helpers/api.config.service'; +import axios from 'axios'; + +interface NotificationPayload { + addresses: string[]; + chainId: number; + title: string; + body: string; + route?: string; + iconUrl?: string; +} + +@Injectable() +export class XPortalApiService { + private readonly axiosInstance; + + constructor(private readonly apiConfigService: ApiConfigService) { + this.axiosInstance = axios.create({ + baseURL: this.apiConfigService.getNotificationsApiUrl(), + timeout: 30000, + headers: { + 'Content-Type': 'application/json', + }, + }); + } + + async sendPushNotifications( + addresses: string[], + title: string, + body: string, + route?: string, + iconUrl?: string, + ): Promise { + const chainId = Number(this.apiConfigService.getChainId()); + + const payload: NotificationPayload = { + addresses, + chainId, + title, + body, + route, + iconUrl, + }; + + try { + const response = await this.axiosInstance.post( + 'notifications-api/api/v1/dapps/push-notifications/send', + payload, + { + headers: { + 'x-notifications-api-key': + this.apiConfigService.getNotificationsApiKey(), + }, + }, + ); + + if (response.status === 201) { + return true; + } + return false; + } catch (error) { + return false; + } + } +} From c726d977eaddc570834f8baa615d5c406e3bc784 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Fri, 11 Apr 2025 10:07:07 +0300 Subject: [PATCH 04/22] MEX-748 Added push_notifications_aip_url in env.example --- .env.example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.env.example b/.env.example index 64e4ebf1a..40788a57e 100644 --- a/.env.example +++ b/.env.example @@ -77,7 +77,7 @@ AWS_TIMESTREAM_READ=false AWS_TIMESTREAM_WRITE=false # Notification Service Configuration -PUSH_NOTIFICATIONS_API_URL= +PUSH_NOTIFICATIONS_API_URL=https://devnet-tools.xportal.com PUSH_NOTIFICATIONS_API_KEY= PUSH_NOTIFICATIONS_BATCH_SIZE=100 PUSH_NOTIFICATIONS_MAX_RETRIES=3 From 5701b7446deea6b15fcbf8eb1454cadbbd7b3201 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Fri, 11 Apr 2025 16:55:31 +0300 Subject: [PATCH 05/22] MEX-748 Moved options to config from env --- .env.example | 3 -- src/config/default.json | 4 +++ src/helpers/api.config.service.ts | 28 ------------------- .../services/push.notifications.service.ts | 2 +- .../mx.xportal.api.service.ts | 3 +- 5 files changed, 7 insertions(+), 33 deletions(-) diff --git a/.env.example b/.env.example index 40788a57e..13ca92707 100644 --- a/.env.example +++ b/.env.example @@ -79,6 +79,3 @@ AWS_TIMESTREAM_WRITE=false # Notification Service Configuration PUSH_NOTIFICATIONS_API_URL=https://devnet-tools.xportal.com PUSH_NOTIFICATIONS_API_KEY= -PUSH_NOTIFICATIONS_BATCH_SIZE=100 -PUSH_NOTIFICATIONS_MAX_RETRIES=3 -PUSH_NOTIFICATIONS_CHAIN_ID=508 # 508 for devnet diff --git a/src/config/default.json b/src/config/default.json index b59119693..b081aff17 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -702,6 +702,10 @@ } }, "pushNotifications": { + "options": { + "batchSize": 100, + "chainId": 508 + }, "feesCollectorRewards": { "title": "Energy Update", "body": "You can now claim your fees rewards", diff --git a/src/helpers/api.config.service.ts b/src/helpers/api.config.service.ts index 6b10fd839..80d800013 100644 --- a/src/helpers/api.config.service.ts +++ b/src/helpers/api.config.service.ts @@ -212,34 +212,6 @@ export class ApiConfigService { return apiKey; } - getNotificationsBatchSize(): number { - const batchSize = this.configService.get( - 'PUSH_NOTIFICATIONS_BATCH_SIZE', - ); - if (!batchSize) { - throw new Error('No push notifications batch size present'); - } - return batchSize; - } - - getNotificationsMaxRetries(): number { - const maxRetries = this.configService.get( - 'PUSH_NOTIFICATIONS_MAX_RETRIES', - ); - if (!maxRetries) { - throw new Error('No push notifications max retries present'); - } - return maxRetries; - } - - getChainId(): number { - const chainId = this.configService.get('PUSH_NOTIFICATIONS_CHAIN_ID'); - if (!chainId) { - throw new Error('No push notifications chainId present'); - } - return chainId; - } - getJwtSecret(): string { const secret = this.configService.get('JWT_SECRET'); if (!secret) { diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts index f142d86c9..199954978 100644 --- a/src/modules/push-notifications/services/push.notifications.service.ts +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -67,7 +67,7 @@ export class PushNotificationsService { notificationParams: NotificationConfig, notificationKey: string, ): Promise { - const batchSize = this.apiConfigService.getNotificationsBatchSize(); + const batchSize = pushNotificationsConfig.options.batchSize; const failed: string[] = []; const successful: string[] = []; diff --git a/src/services/multiversx-communication/mx.xportal.api.service.ts b/src/services/multiversx-communication/mx.xportal.api.service.ts index 31cc15af0..b1120815e 100644 --- a/src/services/multiversx-communication/mx.xportal.api.service.ts +++ b/src/services/multiversx-communication/mx.xportal.api.service.ts @@ -1,6 +1,7 @@ import { Injectable } from '@nestjs/common'; import { ApiConfigService } from '../../helpers/api.config.service'; import axios from 'axios'; +import { pushNotificationsConfig } from 'src/config'; interface NotificationPayload { addresses: string[]; @@ -32,7 +33,7 @@ export class XPortalApiService { route?: string, iconUrl?: string, ): Promise { - const chainId = Number(this.apiConfigService.getChainId()); + const chainId = pushNotificationsConfig.options.chainId; const payload: NotificationPayload = { addresses, From 9daa642d7b9a2bf477d19218c6f12f15b8a5237c Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Fri, 11 Apr 2025 17:30:37 +0300 Subject: [PATCH 06/22] MEX-748 Use ApiService instead of axios directly --- .../mx.communication.module.ts | 5 ++- .../mx.xportal.api.service.ts | 40 +++++++------------ 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/src/services/multiversx-communication/mx.communication.module.ts b/src/services/multiversx-communication/mx.communication.module.ts index 42618e7af..421381028 100644 --- a/src/services/multiversx-communication/mx.communication.module.ts +++ b/src/services/multiversx-communication/mx.communication.module.ts @@ -8,7 +8,10 @@ import { XPortalApiService } from './mx.xportal.api.service'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; @Module({ - imports: [DynamicModuleUtils.getCacheModule()], + imports: [ + DynamicModuleUtils.getCacheModule(), + DynamicModuleUtils.getApiModule(), + ], providers: [ MXProxyService, MXApiService, diff --git a/src/services/multiversx-communication/mx.xportal.api.service.ts b/src/services/multiversx-communication/mx.xportal.api.service.ts index b1120815e..82d34fc02 100644 --- a/src/services/multiversx-communication/mx.xportal.api.service.ts +++ b/src/services/multiversx-communication/mx.xportal.api.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ApiConfigService } from '../../helpers/api.config.service'; -import axios from 'axios'; +import { ApiService } from '@multiversx/sdk-nestjs-http'; import { pushNotificationsConfig } from 'src/config'; interface NotificationPayload { @@ -14,17 +14,10 @@ interface NotificationPayload { @Injectable() export class XPortalApiService { - private readonly axiosInstance; - - constructor(private readonly apiConfigService: ApiConfigService) { - this.axiosInstance = axios.create({ - baseURL: this.apiConfigService.getNotificationsApiUrl(), - timeout: 30000, - headers: { - 'Content-Type': 'application/json', - }, - }); - } + constructor( + private readonly apiConfigService: ApiConfigService, + private readonly apiService: ApiService, + ) {} async sendPushNotifications( addresses: string[], @@ -34,6 +27,9 @@ export class XPortalApiService { iconUrl?: string, ): Promise { const chainId = pushNotificationsConfig.options.chainId; + const baseUrl = this.apiConfigService.getNotificationsApiUrl(); + const apiKey = this.apiConfigService.getNotificationsApiKey(); + const url = `${baseUrl}/notifications-api/api/v1/dapps/push-notifications/send`; const payload: NotificationPayload = { addresses, @@ -44,23 +40,17 @@ export class XPortalApiService { iconUrl, }; + console.log(payload); try { - const response = await this.axiosInstance.post( - 'notifications-api/api/v1/dapps/push-notifications/send', - payload, - { - headers: { - 'x-notifications-api-key': - this.apiConfigService.getNotificationsApiKey(), - }, + const response = await this.apiService.post(url, payload, { + headers: { + 'x-notifications-api-key': apiKey, }, - ); + }); - if (response.status === 201) { - return true; - } - return false; + return response.status === 201; } catch (error) { + console.error(error); return false; } } From c9f750a8264e445fdf78152b302d4b4382e5c728 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Wed, 16 Apr 2025 14:14:31 +0300 Subject: [PATCH 07/22] MEX-748 Added Fixes after review --- src/config/default.json | 4 +- .../energy/services/energy.abi.service.ts | 30 +++++++- .../crons/push.notifications.energy.ts | 46 ++++++++---- .../models/push.notifications.types.ts | 12 --- .../services/push.notifications.service.ts | 74 ++++--------------- .../mx.xportal.api.service.ts | 35 +++------ 6 files changed, 87 insertions(+), 114 deletions(-) diff --git a/src/config/default.json b/src/config/default.json index b081aff17..b478d53ea 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -707,13 +707,13 @@ "chainId": 508 }, "feesCollectorRewards": { - "title": "Energy Update", + "title": "xExchange: Energy rewards", "body": "You can now claim your fees rewards", "route": "/portfolio", "iconUrl": "https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp" }, "negativeEnergy": { - "title": "Energy Update", + "title": "xExchange: Update energy", "body": "You have negative energy", "route": "/energy", "iconUrl": "https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp" diff --git a/src/modules/energy/services/energy.abi.service.ts b/src/modules/energy/services/energy.abi.service.ts index 0e97712a8..399696262 100644 --- a/src/modules/energy/services/energy.abi.service.ts +++ b/src/modules/energy/services/energy.abi.service.ts @@ -12,13 +12,13 @@ import { MXProxyService } from 'src/services/multiversx-communication/mx.proxy.s import { GenericAbiService } from 'src/services/generics/generic.abi.service'; import { LockOption } from '../models/simple.lock.energy.model'; import { GetOrSetCache } from 'src/helpers/decorators/caching.decorator'; -import { Constants } from '@multiversx/sdk-nestjs-common'; +import { AddressUtils, BinaryUtils } from '@multiversx/sdk-nestjs-common'; import { CacheTtlInfo } from 'src/services/caching/cache.ttl.info'; import { MXApiService } from 'src/services/multiversx-communication/mx.api.service'; import { scAddress } from 'src/config'; import { IEnergyAbiService } from './interfaces'; import { ErrorLoggerAsync } from '@multiversx/sdk-nestjs-common'; - +import { MXGatewayService } from 'src/services/multiversx-communication/mx.gateway.service'; @Injectable() export class EnergyAbiService extends GenericAbiService @@ -27,6 +27,7 @@ export class EnergyAbiService constructor( protected readonly mxProxy: MXProxyService, private readonly mxAPI: MXApiService, + private readonly mxGateway: MXGatewayService, ) { super(mxProxy); } @@ -216,4 +217,29 @@ export class EnergyAbiService return response.firstValue.valueOf(); } + + @ErrorLoggerAsync() + async getUsersWithEnergy(): Promise { + const contractAddress = scAddress.simpleLockEnergy; + + const contractKeysRaw = await this.mxGateway.getSCStorageKeys( + contractAddress, + [], + ); + + const contractPairs = Object.entries( + contractKeysRaw?.data?.pairs || {}, + ); + + const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); + const userEnergyKeys = contractPairs + .filter(([key, _]) => key.startsWith(userEnergyKey)) + .map(([key, _]) => key.replace(userEnergyKey, '')); + + const userEnergyAddresses = userEnergyKeys.map((key) => + AddressUtils.bech32Encode(key), + ); + + return userEnergyAddresses; + } } diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 640f0659d..4965fb5b3 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -1,40 +1,52 @@ -import { Injectable } from '@nestjs/common'; -import { Cron } from '@nestjs/schedule'; -import { Lock } from '@multiversx/sdk-nestjs-common'; -import { PushNotificationsService } from '../services/push.notifications.service'; import { AccountType, NotificationType, } from '../models/push.notifications.types'; -import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; +import { Injectable } from '@nestjs/common'; +import { Lock } from '@multiversx/sdk-nestjs-common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { PushNotificationsService } from '../services/push.notifications.service'; +import { EnergyAbiService } from 'src/modules/energy/services/energy.abi.service'; import { ContextGetterService } from 'src/services/context/context.getter.service'; +import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; +import { pushNotificationsConfig } from 'src/config'; @Injectable() export class PushNotificationsEnergyCron { constructor( - private readonly pushNotificationsService: PushNotificationsService, + private readonly energyAbiService: EnergyAbiService, private readonly contextGetter: ContextGetterService, + private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, ) {} - @Cron('0 0 19 * * 5') // Every Friday at 19:00 + @Cron(CronExpression.EVERY_HOUR) @Lock({ name: 'feesCollectorRewardsCron', verbose: true }) async feesCollectorRewardsCron() { + const currentEpoch = await this.contextGetter.getCurrentEpoch(); + const feesCollector = pushNotificationsConfig.feesCollector; + + if ((currentEpoch - feesCollector.firstWeekStartEpoch) % 7 !== 0) { + return; + } + const isDevnet = process.env.NODE_ENV === 'devnet'; + console.log('isDevnet', isDevnet); if (isDevnet) { - const addresses = - await this.pushNotificationsService.usersWithEnergyFromContractStorage(); + console.log('Sending notifications for devnet'); + const addresses = await this.energyAbiService.getUsersWithEnergy(); - await this.pushNotificationsService.sendNotifications( + await this.pushNotificationsService.sendNotificationsInBatches( addresses, + pushNotificationsConfig[ + NotificationType.FEES_COLLECTOR_REWARDS + ], NotificationType.FEES_COLLECTOR_REWARDS, ); return; } - const currentEpoch = await this.contextGetter.getCurrentEpoch(); - await this.accountsEnergyElasticService.getAccountsByEnergyAmount( currentEpoch - 1, 'gt', @@ -43,8 +55,11 @@ export class PushNotificationsEnergyCron { (item: AccountType) => item.address, ); - await this.pushNotificationsService.sendNotifications( + await this.pushNotificationsService.sendNotificationsInBatches( addresses, + pushNotificationsConfig[ + NotificationType.FEES_COLLECTOR_REWARDS + ], NotificationType.FEES_COLLECTOR_REWARDS, ); }, @@ -64,8 +79,9 @@ export class PushNotificationsEnergyCron { (item: AccountType) => item.address, ); - await this.pushNotificationsService.sendNotifications( + await this.pushNotificationsService.sendNotificationsInBatches( addresses, + pushNotificationsConfig[NotificationType.NEGATIVE_ENERGY], NotificationType.NEGATIVE_ENERGY, ); }, @@ -73,7 +89,7 @@ export class PushNotificationsEnergyCron { ); } - @Cron('*/1 * * * *') // Every 10 minutes + @Cron(CronExpression.EVERY_10_MINUTES) @Lock({ name: 'retryFailedNotificationsCron', verbose: true }) async retryFailedNotificationsCron() { const notificationTypes = Object.values(NotificationType); diff --git a/src/modules/push-notifications/models/push.notifications.types.ts b/src/modules/push-notifications/models/push.notifications.types.ts index 031b454e7..d92c22b6c 100644 --- a/src/modules/push-notifications/models/push.notifications.types.ts +++ b/src/modules/push-notifications/models/push.notifications.types.ts @@ -20,18 +20,6 @@ export interface AccountType { totalUnDelegate: string; } -export interface ContractKeysRaw { - data: { - blockInfo: { - hash: string; - nonce: number; - rootHash: string; - }; - pairs: Record; - }; - code: string; -} - export interface NotificationResult { successful: string[]; failed: string[]; diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts index 199954978..5bb8fb004 100644 --- a/src/modules/push-notifications/services/push.notifications.service.ts +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -1,73 +1,27 @@ -import { Injectable } from '@nestjs/common'; -import { pushNotificationsConfig, scAddress } from 'src/config'; -import { MXApiService } from 'src/services/multiversx-communication/mx.api.service'; -import { ApiConfigService } from 'src/helpers/api.config.service'; -import { - AddressUtils, - BinaryUtils, - ErrorLoggerAsync, -} from '@multiversx/sdk-nestjs-common'; - import { - ContractKeysRaw, NotificationConfig, NotificationResult, NotificationType, } from '../models/push.notifications.types'; +import { Injectable } from '@nestjs/common'; +import { pushNotificationsConfig } from 'src/config'; import { PushNotificationsSetterService } from './push.notifications.setter.service'; import { XPortalApiService } from 'src/services/multiversx-communication/mx.xportal.api.service'; @Injectable() export class PushNotificationsService { constructor( - private readonly apiService: MXApiService, - private readonly apiConfigService: ApiConfigService, - private readonly notificationsSetter: PushNotificationsSetterService, private readonly xPortalApiService: XPortalApiService, + private readonly notificationsSetter: PushNotificationsSetterService, ) {} - @ErrorLoggerAsync() - async usersWithEnergyFromContractStorage(): Promise { - const contractAddress = scAddress.simpleLockEnergy; - const contractKeysRaw: ContractKeysRaw = - await this.apiService.doGetGeneric( - 'getContractKeys', - `address/${contractAddress}/keys`, - ); - - const contractPairs = Object.entries( - contractKeysRaw?.data?.pairs || {}, - ); - - const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); - const userEnergyKeys = contractPairs - .filter(([key, _]) => key.startsWith(userEnergyKey)) - .map(([key, _]) => key.replace(userEnergyKey, '')); - - const userEnergyAddresses = userEnergyKeys.map((key) => - AddressUtils.bech32Encode(key), - ); - - return userEnergyAddresses; - } - - async sendNotifications( - addresses: string[], - notificationType: NotificationType, - ): Promise { - return this.sendNotificationsInBatches( - addresses, - pushNotificationsConfig[notificationType], - notificationType, - ); - } - - private async sendNotificationsInBatches( + async sendNotificationsInBatches( addresses: string[], notificationParams: NotificationConfig, notificationKey: string, ): Promise { const batchSize = pushNotificationsConfig.options.batchSize; + const chainId = pushNotificationsConfig.options.chainId; const failed: string[] = []; const successful: string[] = []; @@ -75,13 +29,14 @@ export class PushNotificationsService { const batch = addresses.slice(i, i + batchSize); try { const success = - await this.xPortalApiService.sendPushNotifications( - batch, - notificationParams.title, - notificationParams.body, - notificationParams.route, - notificationParams.iconUrl, - ); + await this.xPortalApiService.sendPushNotifications({ + addresses: batch, + chainId, + title: notificationParams.title, + body: notificationParams.body, + route: notificationParams.route, + iconUrl: notificationParams.iconUrl, + }); if (success) { successful.push(...batch); @@ -115,8 +70,9 @@ export class PushNotificationsService { return; } - const { successful } = await this.sendNotifications( + const { successful } = await this.sendNotificationsInBatches( failedAddresses, + pushNotificationsConfig[notificationType], notificationType, ); diff --git a/src/services/multiversx-communication/mx.xportal.api.service.ts b/src/services/multiversx-communication/mx.xportal.api.service.ts index 82d34fc02..92d55655d 100644 --- a/src/services/multiversx-communication/mx.xportal.api.service.ts +++ b/src/services/multiversx-communication/mx.xportal.api.service.ts @@ -1,16 +1,15 @@ import { Injectable } from '@nestjs/common'; import { ApiConfigService } from '../../helpers/api.config.service'; import { ApiService } from '@multiversx/sdk-nestjs-http'; -import { pushNotificationsConfig } from 'src/config'; -interface NotificationPayload { +type NotificationPayload = { addresses: string[]; chainId: number; title: string; body: string; route?: string; iconUrl?: string; -} +}; @Injectable() export class XPortalApiService { @@ -20,37 +19,25 @@ export class XPortalApiService { ) {} async sendPushNotifications( - addresses: string[], - title: string, - body: string, - route?: string, - iconUrl?: string, + notificationPayload: NotificationPayload, ): Promise { - const chainId = pushNotificationsConfig.options.chainId; const baseUrl = this.apiConfigService.getNotificationsApiUrl(); const apiKey = this.apiConfigService.getNotificationsApiKey(); const url = `${baseUrl}/notifications-api/api/v1/dapps/push-notifications/send`; - const payload: NotificationPayload = { - addresses, - chainId, - title, - body, - route, - iconUrl, - }; - - console.log(payload); try { - const response = await this.apiService.post(url, payload, { - headers: { - 'x-notifications-api-key': apiKey, + const response = await this.apiService.post( + url, + notificationPayload, + { + headers: { + 'x-notifications-api-key': apiKey, + }, }, - }); + ); return response.status === 201; } catch (error) { - console.error(error); return false; } } From 7454f00a76d3be2b9a93017748af20c3de2ef327 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Wed, 16 Apr 2025 16:08:02 +0300 Subject: [PATCH 08/22] MEX-748 Fix feesCollector startEpoch logic --- src/modules/energy/services/energy.abi.service.ts | 4 +--- .../crons/push.notifications.energy.ts | 13 ++++++++----- .../push-notifications/push.notifications.module.ts | 2 ++ 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/modules/energy/services/energy.abi.service.ts b/src/modules/energy/services/energy.abi.service.ts index 399696262..d55aa8ea4 100644 --- a/src/modules/energy/services/energy.abi.service.ts +++ b/src/modules/energy/services/energy.abi.service.ts @@ -227,9 +227,7 @@ export class EnergyAbiService [], ); - const contractPairs = Object.entries( - contractKeysRaw?.data?.pairs || {}, - ); + const contractPairs = Object.entries(contractKeysRaw); const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); const userEnergyKeys = contractPairs diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 4965fb5b3..90c8555fd 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -9,8 +9,8 @@ import { PushNotificationsService } from '../services/push.notifications.service import { EnergyAbiService } from 'src/modules/energy/services/energy.abi.service'; import { ContextGetterService } from 'src/services/context/context.getter.service'; import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; -import { pushNotificationsConfig } from 'src/config'; - +import { pushNotificationsConfig, scAddress } from 'src/config'; +import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/services/week-timekeeping.abi.service'; @Injectable() export class PushNotificationsEnergyCron { constructor( @@ -18,20 +18,23 @@ export class PushNotificationsEnergyCron { private readonly contextGetter: ContextGetterService, private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, + private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, ) {} @Cron(CronExpression.EVERY_HOUR) @Lock({ name: 'feesCollectorRewardsCron', verbose: true }) async feesCollectorRewardsCron() { const currentEpoch = await this.contextGetter.getCurrentEpoch(); - const feesCollector = pushNotificationsConfig.feesCollector; + const firstWeekStartEpoch = + await this.weekTimekeepingAbi.firstWeekStartEpoch( + String(scAddress.feesCollector), + ); - if ((currentEpoch - feesCollector.firstWeekStartEpoch) % 7 !== 0) { + if ((currentEpoch - firstWeekStartEpoch) % 7 !== 0) { return; } const isDevnet = process.env.NODE_ENV === 'devnet'; - console.log('isDevnet', isDevnet); if (isDevnet) { console.log('Sending notifications for devnet'); diff --git a/src/modules/push-notifications/push.notifications.module.ts b/src/modules/push-notifications/push.notifications.module.ts index 7001a8786..83fb752fc 100644 --- a/src/modules/push-notifications/push.notifications.module.ts +++ b/src/modules/push-notifications/push.notifications.module.ts @@ -9,6 +9,7 @@ import { ContextModule } from '../../services/context/context.module'; import { CacheModule } from '../../services/caching/cache.module'; import { EnergyModule } from '../energy/energy.module'; import { PushNotificationsEnergyCron } from './crons/push.notifications.energy'; +import { WeekTimekeepingModule } from 'src/submodules/week-timekeeping/week-timekeeping.module'; @Module({ imports: [ CommonAppModule, @@ -18,6 +19,7 @@ import { PushNotificationsEnergyCron } from './crons/push.notifications.energy'; ContextModule, CacheModule, EnergyModule, + WeekTimekeepingModule, ], providers: [ PushNotificationsService, From 49f1ca48b7ebb53d3a6e3605e688187a8a8023a4 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Thu, 17 Apr 2025 13:47:50 +0300 Subject: [PATCH 09/22] MEX-748 Fix Cron interval for feesCollector Rewards Notifications --- .../push-notifications/crons/push.notifications.energy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 90c8555fd..234c839ac 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -21,7 +21,7 @@ export class PushNotificationsEnergyCron { private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, ) {} - @Cron(CronExpression.EVERY_HOUR) + @Cron(CronExpression.EVERY_DAY_AT_NOON) @Lock({ name: 'feesCollectorRewardsCron', verbose: true }) async feesCollectorRewardsCron() { const currentEpoch = await this.contextGetter.getCurrentEpoch(); From ff38557431b67d538c57366f7f74e9218791e75b Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Wed, 30 Apr 2025 10:17:52 +0300 Subject: [PATCH 10/22] MEX-748 Added Logs to PushNotification Service --- .../crons/push.notifications.energy.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 234c839ac..eb2aecfb6 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -2,7 +2,7 @@ import { AccountType, NotificationType, } from '../models/push.notifications.types'; -import { Injectable } from '@nestjs/common'; +import { Injectable, Inject } from '@nestjs/common'; import { Lock } from '@multiversx/sdk-nestjs-common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PushNotificationsService } from '../services/push.notifications.service'; @@ -11,6 +11,9 @@ import { ContextGetterService } from 'src/services/context/context.getter.servic import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; import { pushNotificationsConfig, scAddress } from 'src/config'; import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/services/week-timekeeping.abi.service'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { Logger } from 'winston'; + @Injectable() export class PushNotificationsEnergyCron { constructor( @@ -19,6 +22,7 @@ export class PushNotificationsEnergyCron { private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, + @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} @Cron(CronExpression.EVERY_DAY_AT_NOON) @@ -37,7 +41,7 @@ export class PushNotificationsEnergyCron { const isDevnet = process.env.NODE_ENV === 'devnet'; if (isDevnet) { - console.log('Sending notifications for devnet'); + this.logger.log('Sending notifications for devnet', 'PushNotificationsEnergyCron'); const addresses = await this.energyAbiService.getUsersWithEnergy(); await this.pushNotificationsService.sendNotificationsInBatches( @@ -47,6 +51,7 @@ export class PushNotificationsEnergyCron { ], NotificationType.FEES_COLLECTOR_REWARDS, ); + this.logger.log(`Sent ${addresses.length} notifications for devnet`, 'PushNotificationsEnergyCron'); return; } @@ -65,6 +70,7 @@ export class PushNotificationsEnergyCron { ], NotificationType.FEES_COLLECTOR_REWARDS, ); + this.logger.log(`Sent ${addresses.length} notifications for mainnet`, 'PushNotificationsEnergyCron'); }, ); } @@ -87,6 +93,7 @@ export class PushNotificationsEnergyCron { pushNotificationsConfig[NotificationType.NEGATIVE_ENERGY], NotificationType.NEGATIVE_ENERGY, ); + this.logger.log(`Sent ${addresses.length} negative energy notifications`, 'PushNotificationsEnergyCron'); }, 0, ); From 7084897df22af4d072ce6ab7df5e4aaa457d2004 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Wed, 30 Apr 2025 10:39:17 +0300 Subject: [PATCH 11/22] MEX-748 Added try catch block for the sendPushNotifications --- .../services/push.notifications.service.ts | 59 +++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts index 5bb8fb004..6a059f259 100644 --- a/src/modules/push-notifications/services/push.notifications.service.ts +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -7,9 +7,12 @@ import { Injectable } from '@nestjs/common'; import { pushNotificationsConfig } from 'src/config'; import { PushNotificationsSetterService } from './push.notifications.setter.service'; import { XPortalApiService } from 'src/services/multiversx-communication/mx.xportal.api.service'; +import { Logger } from '@nestjs/common'; @Injectable() export class PushNotificationsService { + private readonly logger = new Logger(PushNotificationsService.name); + constructor( private readonly xPortalApiService: XPortalApiService, private readonly notificationsSetter: PushNotificationsSetterService, @@ -25,37 +28,45 @@ export class PushNotificationsService { const failed: string[] = []; const successful: string[] = []; - for (let i = 0; i < addresses.length; i += batchSize) { - const batch = addresses.slice(i, i + batchSize); - try { - const success = - await this.xPortalApiService.sendPushNotifications({ - addresses: batch, - chainId, - title: notificationParams.title, - body: notificationParams.body, - route: notificationParams.route, - iconUrl: notificationParams.iconUrl, - }); + try { + for (let i = 0; i < addresses.length; i += batchSize) { + const batch = addresses.slice(i, i + batchSize); + try { + const success = + await this.xPortalApiService.sendPushNotifications({ + addresses: batch, + chainId, + title: notificationParams.title, + body: notificationParams.body, + route: notificationParams.route, + iconUrl: notificationParams.iconUrl, + }); - if (success) { - successful.push(...batch); - } else { + if (success) { + successful.push(...batch); + } else { + failed.push(...batch); + } + } catch (error) { failed.push(...batch); } - } catch (error) { - failed.push(...batch); } - } - if (failed.length > 0) { - await this.notificationsSetter.addFailedNotifications( - failed, - notificationKey, + if (failed.length > 0) { + await this.notificationsSetter.addFailedNotifications( + failed, + notificationKey, + ); + } + + return { successful, failed }; + } catch (error) { + this.logger.error( + `Error sending notifications: ${error.message}`, + 'PushNotificationsService', ); + throw error; } - - return { successful, failed }; } async retryFailedNotifications( From c534c4470c364ec93e0a30993c7a6a530bf3feeb Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Wed, 30 Apr 2025 18:23:20 +0300 Subject: [PATCH 12/22] MEX-748 Implement RedLock for Push Notifications as a decorator --- .env.example | 7 ++ src/config/default.json | 3 +- src/helpers/api.config.service.ts | 23 ++++++ .../crons/push.notifications.energy.ts | 27 ++++--- .../decorators/lock.retry.decorator.ts | 43 ++++++++++++ .../push.notifications.module.ts | 3 + .../utils/lock.retry.utils.ts | 70 +++++++++++++++++++ src/utils/dynamic.module.utils.ts | 34 ++++++++- 8 files changed, 200 insertions(+), 10 deletions(-) create mode 100644 src/modules/push-notifications/decorators/lock.retry.decorator.ts create mode 100644 src/modules/push-notifications/utils/lock.retry.utils.ts diff --git a/.env.example b/.env.example index 13ca92707..d1422e715 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,13 @@ REDIS_PASSWORD="" REDIS_PORT=6379 REDIS_DB=0 +#Redis common +REDIS_COMMON_URL="localhost" +REDIS_COMMON_PREFIX="development_env" +REDIS_COMMON_PASSWORD="" +REDIS_COMMON_PORT=6384 +REDIS_COMMON_DB=0 + #MongoDB MONGODB_URL="mongodb://localhost:27017" MONGODB_DATABASE="development" diff --git a/src/config/default.json b/src/config/default.json index b478d53ea..5ecdb372e 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -655,7 +655,8 @@ "VOLUME_WEIGHT": 0.25, "TRADES_COUNT_WEIGHT": 0.25 }, - "AWS_QUERY_CACHE_WARMER_DELAY": 50 + "AWS_QUERY_CACHE_WARMER_DELAY": 50, + "TIMESCALEDB_INSERT_CHUNK_SIZE": 30 }, "dataApi": { "tableName": "XEXCHANGE_ANALYTICS" diff --git a/src/helpers/api.config.service.ts b/src/helpers/api.config.service.ts index 80d800013..bbcd2b3f9 100644 --- a/src/helpers/api.config.service.ts +++ b/src/helpers/api.config.service.ts @@ -124,6 +124,29 @@ export class ApiConfigService { return password !== '' ? password : undefined; } + getCommonRedisUrl(): string { + const redisUrl = this.configService.get('REDIS_COMMON_URL'); + if (!redisUrl) { + throw new Error('No common redis url present'); + } + return redisUrl; + } + + getCommonRedisPort(): number { + const redisPort = this.configService.get('REDIS_COMMON_PORT'); + if (!redisPort) { + throw new Error('No common redis port present'); + } + return redisPort; + } + + getCommonRedisPassword(): string | undefined { + const password = this.configService.get( + 'REDIS_COMMON_PASSWORD', + ); + return password !== '' ? password : undefined; + } + getApiUrl(): string { const apiUrl = this.configService.get('MX_API_URL'); if (!apiUrl) { diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index eb2aecfb6..0da6d40e9 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -3,7 +3,6 @@ import { NotificationType, } from '../models/push.notifications.types'; import { Injectable, Inject } from '@nestjs/common'; -import { Lock } from '@multiversx/sdk-nestjs-common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PushNotificationsService } from '../services/push.notifications.service'; import { EnergyAbiService } from 'src/modules/energy/services/energy.abi.service'; @@ -13,6 +12,8 @@ import { pushNotificationsConfig, scAddress } from 'src/config'; import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/services/week-timekeeping.abi.service'; import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; import { Logger } from 'winston'; +import { RedlockService } from '@multiversx/sdk-nestjs-cache'; +import { LockAndRetry } from '../decorators/lock.retry.decorator'; @Injectable() export class PushNotificationsEnergyCron { @@ -22,11 +23,15 @@ export class PushNotificationsEnergyCron { private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, + private readonly redLockService: RedlockService, @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} @Cron(CronExpression.EVERY_DAY_AT_NOON) - @Lock({ name: 'feesCollectorRewardsCron', verbose: true }) + @LockAndRetry({ + lockKey: 'pushNotifications', + lockName: 'feesCollector', + }) async feesCollectorRewardsCron() { const currentEpoch = await this.contextGetter.getCurrentEpoch(); const firstWeekStartEpoch = @@ -41,7 +46,6 @@ export class PushNotificationsEnergyCron { const isDevnet = process.env.NODE_ENV === 'devnet'; if (isDevnet) { - this.logger.log('Sending notifications for devnet', 'PushNotificationsEnergyCron'); const addresses = await this.energyAbiService.getUsersWithEnergy(); await this.pushNotificationsService.sendNotificationsInBatches( @@ -51,7 +55,6 @@ export class PushNotificationsEnergyCron { ], NotificationType.FEES_COLLECTOR_REWARDS, ); - this.logger.log(`Sent ${addresses.length} notifications for devnet`, 'PushNotificationsEnergyCron'); return; } @@ -70,13 +73,15 @@ export class PushNotificationsEnergyCron { ], NotificationType.FEES_COLLECTOR_REWARDS, ); - this.logger.log(`Sent ${addresses.length} notifications for mainnet`, 'PushNotificationsEnergyCron'); }, ); } @Cron('0 12 */2 * *') // Every 2 days at noon - @Lock({ name: 'negativeEnergyNotificationsCron', verbose: true }) + @LockAndRetry({ + lockKey: 'pushNotifications', + lockName: 'negativeEnergy', + }) async negativeEnergyNotificationsCron() { const currentEpoch = await this.contextGetter.getCurrentEpoch(); @@ -93,14 +98,20 @@ export class PushNotificationsEnergyCron { pushNotificationsConfig[NotificationType.NEGATIVE_ENERGY], NotificationType.NEGATIVE_ENERGY, ); - this.logger.log(`Sent ${addresses.length} negative energy notifications`, 'PushNotificationsEnergyCron'); + this.logger.log( + `Sent ${addresses.length} negative energy notifications`, + 'PushNotificationsEnergyCron', + ); }, 0, ); } @Cron(CronExpression.EVERY_10_MINUTES) - @Lock({ name: 'retryFailedNotificationsCron', verbose: true }) + @LockAndRetry({ + lockKey: 'pushNotifications', + lockName: 'retryFailed', + }) async retryFailedNotificationsCron() { const notificationTypes = Object.values(NotificationType); diff --git a/src/modules/push-notifications/decorators/lock.retry.decorator.ts b/src/modules/push-notifications/decorators/lock.retry.decorator.ts new file mode 100644 index 000000000..bb64be9c3 --- /dev/null +++ b/src/modules/push-notifications/decorators/lock.retry.decorator.ts @@ -0,0 +1,43 @@ +import { RedlockService } from '@multiversx/sdk-nestjs-cache'; +import { Logger } from 'winston'; +import { withLockAndRetry } from '../utils/lock.retry.utils'; + +export function LockAndRetry(options: { + lockKey: string; + lockName: string; + keyExpiration?: number; + maxLockRetries?: number; + lockRetryInterval?: number; + maxOperationRetries?: number; + operationRetryInterval?: number; +}) { + return function ( + target: any, + propertyKey: string, + descriptor: PropertyDescriptor, + ) { + const originalMethod = descriptor.value; + + descriptor.value = async function (...args: any[]) { + const redLockService = this.redLockService as RedlockService; + const logger = this.logger as Logger; + + if (!redLockService || !logger) { + throw new Error( + 'Class must have redLockService and logger properties', + ); + } + + await withLockAndRetry( + redLockService, + logger, + options, + async () => { + await originalMethod.apply(this, args); + }, + ); + }; + + return descriptor; + }; +} diff --git a/src/modules/push-notifications/push.notifications.module.ts b/src/modules/push-notifications/push.notifications.module.ts index 83fb752fc..c3be4e386 100644 --- a/src/modules/push-notifications/push.notifications.module.ts +++ b/src/modules/push-notifications/push.notifications.module.ts @@ -10,6 +10,8 @@ import { CacheModule } from '../../services/caching/cache.module'; import { EnergyModule } from '../energy/energy.module'; import { PushNotificationsEnergyCron } from './crons/push.notifications.energy'; import { WeekTimekeepingModule } from 'src/submodules/week-timekeeping/week-timekeeping.module'; +import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; + @Module({ imports: [ CommonAppModule, @@ -20,6 +22,7 @@ import { WeekTimekeepingModule } from 'src/submodules/week-timekeeping/week-time CacheModule, EnergyModule, WeekTimekeepingModule, + DynamicModuleUtils.getRedlockModule(), ], providers: [ PushNotificationsService, diff --git a/src/modules/push-notifications/utils/lock.retry.utils.ts b/src/modules/push-notifications/utils/lock.retry.utils.ts new file mode 100644 index 000000000..3bf53f4b3 --- /dev/null +++ b/src/modules/push-notifications/utils/lock.retry.utils.ts @@ -0,0 +1,70 @@ +import { RedlockService } from '@multiversx/sdk-nestjs-cache'; +import { Logger } from 'winston'; + +interface LockRetryOptions { + lockKey: string; + lockName: string; + keyExpiration?: number; + maxLockRetries?: number; + lockRetryInterval?: number; + maxOperationRetries?: number; + operationRetryInterval?: number; +} + +export async function withLockAndRetry( + redLockService: RedlockService, + logger: Logger, + options: LockRetryOptions, + operation: () => Promise, +): Promise { + const { + lockKey, + lockName, + keyExpiration = 60 * 60 * 1000, // Default 1 hour + maxLockRetries = 5, + lockRetryInterval = 5000, + maxOperationRetries = 3, + operationRetryInterval = 5000, + } = options; + + const lock = await redLockService.lock(lockKey, lockName, { + keyExpiration, + maxRetries: maxLockRetries, + retryInterval: lockRetryInterval, + }); + + if (!lock) { + logger.warn(`Failed to acquire lock for ${lockName}`, 'LockRetryUtils'); + return; + } + + let retryCount = 0; + let success = false; + + while (!success && retryCount < maxOperationRetries) { + try { + await operation(); + success = true; + } catch (error) { + retryCount++; + logger.error( + `Operation ${lockName} attempt ${retryCount} failed: ${error.message}`, + 'LockRetryUtils', + ); + if (retryCount < maxOperationRetries) { + logger.info( + `Retrying operation ${lockName} (${retryCount}/${maxOperationRetries})`, + 'LockRetryUtils', + ); + await new Promise((resolve) => setTimeout(resolve, operationRetryInterval)); + } + } + } + + if (!success) { + logger.error( + `All retry attempts failed for operation ${lockName}`, + 'LockRetryUtils', + ); + } +} \ No newline at end of file diff --git a/src/utils/dynamic.module.utils.ts b/src/utils/dynamic.module.utils.ts index 99e540fab..b8afefb91 100644 --- a/src/utils/dynamic.module.utils.ts +++ b/src/utils/dynamic.module.utils.ts @@ -1,4 +1,9 @@ -import { RedisCacheModuleOptions } from '@multiversx/sdk-nestjs-cache'; +import { + RedisCacheModule, + RedisCacheModuleOptions, + RedlockConnectionOptions, + RedlockModule, +} from '@multiversx/sdk-nestjs-cache'; import { ElasticModule, ElasticModuleOptions, @@ -25,6 +30,33 @@ export class DynamicModuleUtils { }); } + static getCommonRedisModule(): DynamicModule { + return RedisCacheModule.forRootAsync({ + imports: [CommonAppModule], + useFactory: (configService: ApiConfigService) => + new RedisCacheModuleOptions({ + host: configService.getCommonRedisUrl(), + port: configService.getCommonRedisPort(), + password: configService.getCommonRedisPassword(), + }), + inject: [ApiConfigService], + }); + } + + static getRedlockModule(): DynamicModule { + return RedlockModule.forRootAsync({ + imports: [CommonAppModule], + useFactory: (configService: ApiConfigService) => [ + new RedlockConnectionOptions({ + host: configService.getCommonRedisUrl(), + port: configService.getCommonRedisPort(), + password: configService.getCommonRedisPassword(), + }), + ], + inject: [ApiConfigService], + }); + } + static getElasticModule(): DynamicModule { return ElasticModule.forRootAsync({ imports: [CommonAppModule], From 3f19dad2031248cf719734fc90f25e59d05a3d18 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 09:37:50 +0300 Subject: [PATCH 13/22] MEX-748 Added Logs back --- .../crons/push.notifications.energy.ts | 72 +++++++++++++------ 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 0da6d40e9..7077eee0e 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -43,18 +43,25 @@ export class PushNotificationsEnergyCron { return; } + let successfulNotifications = 0; + let failedNotifications = 0; + const isDevnet = process.env.NODE_ENV === 'devnet'; if (isDevnet) { const addresses = await this.energyAbiService.getUsersWithEnergy(); - await this.pushNotificationsService.sendNotificationsInBatches( - addresses, - pushNotificationsConfig[ - NotificationType.FEES_COLLECTOR_REWARDS - ], - NotificationType.FEES_COLLECTOR_REWARDS, - ); + const result = + await this.pushNotificationsService.sendNotificationsInBatches( + addresses, + pushNotificationsConfig[ + NotificationType.FEES_COLLECTOR_REWARDS + ], + NotificationType.FEES_COLLECTOR_REWARDS, + ); + + successfulNotifications += result.successful.length; + failedNotifications += result.failed.length; return; } @@ -66,15 +73,24 @@ export class PushNotificationsEnergyCron { (item: AccountType) => item.address, ); - await this.pushNotificationsService.sendNotificationsInBatches( - addresses, - pushNotificationsConfig[ - NotificationType.FEES_COLLECTOR_REWARDS - ], - NotificationType.FEES_COLLECTOR_REWARDS, - ); + const result = + await this.pushNotificationsService.sendNotificationsInBatches( + addresses, + pushNotificationsConfig[ + NotificationType.FEES_COLLECTOR_REWARDS + ], + NotificationType.FEES_COLLECTOR_REWARDS, + ); + + successfulNotifications += result.successful.length; + failedNotifications += result.failed.length; }, ); + + this.logger.log( + `Fees collector rewards cron completed. Successful: ${successfulNotifications}, Failed: ${failedNotifications}`, + 'PushNotificationsEnergyCron', + ); } @Cron('0 12 */2 * *') // Every 2 days at noon @@ -85,6 +101,9 @@ export class PushNotificationsEnergyCron { async negativeEnergyNotificationsCron() { const currentEpoch = await this.contextGetter.getCurrentEpoch(); + let successfulNotifications = 0; + let failedNotifications = 0; + await this.accountsEnergyElasticService.getAccountsByEnergyAmount( currentEpoch - 1, 'lt', @@ -93,18 +112,25 @@ export class PushNotificationsEnergyCron { (item: AccountType) => item.address, ); - await this.pushNotificationsService.sendNotificationsInBatches( - addresses, - pushNotificationsConfig[NotificationType.NEGATIVE_ENERGY], - NotificationType.NEGATIVE_ENERGY, - ); - this.logger.log( - `Sent ${addresses.length} negative energy notifications`, - 'PushNotificationsEnergyCron', - ); + const result = + await this.pushNotificationsService.sendNotificationsInBatches( + addresses, + pushNotificationsConfig[ + NotificationType.NEGATIVE_ENERGY + ], + NotificationType.NEGATIVE_ENERGY, + ); + + successfulNotifications += result.successful.length; + failedNotifications += result.failed.length; }, 0, ); + + this.logger.log( + `Negative energy notifications cron completed. Successful: ${successfulNotifications}, Failed: ${failedNotifications}`, + 'PushNotificationsEnergyCron', + ); } @Cron(CronExpression.EVERY_10_MINUTES) From 5f3c8f121fad785080e66554b10586e7d5067f70 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 09:46:57 +0300 Subject: [PATCH 14/22] MEX-748 Added error logging on sendPushNotifications --- .../multiversx-communication/mx.xportal.api.service.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/services/multiversx-communication/mx.xportal.api.service.ts b/src/services/multiversx-communication/mx.xportal.api.service.ts index 92d55655d..eb6f3c2d2 100644 --- a/src/services/multiversx-communication/mx.xportal.api.service.ts +++ b/src/services/multiversx-communication/mx.xportal.api.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { ApiConfigService } from '../../helpers/api.config.service'; import { ApiService } from '@multiversx/sdk-nestjs-http'; @@ -13,6 +13,8 @@ type NotificationPayload = { @Injectable() export class XPortalApiService { + private readonly logger = new Logger(XPortalApiService.name); + constructor( private readonly apiConfigService: ApiConfigService, private readonly apiService: ApiService, @@ -38,6 +40,10 @@ export class XPortalApiService { return response.status === 201; } catch (error) { + this.logger.error( + `Error sending push notification: ${error.message}`, + 'XPortalApiService', + ); return false; } } From 6ea6bdd4984048f335037b90748b542977a65da6 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 09:47:46 +0300 Subject: [PATCH 15/22] MEX-748 Moved withLockAndRetry to generic utils --- .../push-notifications/decorators/lock.retry.decorator.ts | 2 +- src/{modules/push-notifications => }/utils/lock.retry.utils.ts | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/{modules/push-notifications => }/utils/lock.retry.utils.ts (100%) diff --git a/src/modules/push-notifications/decorators/lock.retry.decorator.ts b/src/modules/push-notifications/decorators/lock.retry.decorator.ts index bb64be9c3..4ac944b74 100644 --- a/src/modules/push-notifications/decorators/lock.retry.decorator.ts +++ b/src/modules/push-notifications/decorators/lock.retry.decorator.ts @@ -1,6 +1,6 @@ import { RedlockService } from '@multiversx/sdk-nestjs-cache'; +import { withLockAndRetry } from 'src/utils/lock.retry.utils'; import { Logger } from 'winston'; -import { withLockAndRetry } from '../utils/lock.retry.utils'; export function LockAndRetry(options: { lockKey: string; diff --git a/src/modules/push-notifications/utils/lock.retry.utils.ts b/src/utils/lock.retry.utils.ts similarity index 100% rename from src/modules/push-notifications/utils/lock.retry.utils.ts rename to src/utils/lock.retry.utils.ts From 171b388d5d57f32af875a4b9e108a2b00eade0ef Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 10:14:50 +0300 Subject: [PATCH 16/22] MEX-748: Removed redundant try/catch blocks --- .../services/push.notifications.service.ts | 55 +++++++------------ 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts index 6a059f259..3060d53c5 100644 --- a/src/modules/push-notifications/services/push.notifications.service.ts +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -28,45 +28,32 @@ export class PushNotificationsService { const failed: string[] = []; const successful: string[] = []; - try { - for (let i = 0; i < addresses.length; i += batchSize) { - const batch = addresses.slice(i, i + batchSize); - try { - const success = - await this.xPortalApiService.sendPushNotifications({ - addresses: batch, - chainId, - title: notificationParams.title, - body: notificationParams.body, - route: notificationParams.route, - iconUrl: notificationParams.iconUrl, - }); + for (let i = 0; i < addresses.length; i += batchSize) { + const batch = addresses.slice(i, i + batchSize); + const success = await this.xPortalApiService.sendPushNotifications({ + addresses: batch, + chainId, + title: notificationParams.title, + body: notificationParams.body, + route: notificationParams.route, + iconUrl: notificationParams.iconUrl, + }); - if (success) { - successful.push(...batch); - } else { - failed.push(...batch); - } - } catch (error) { - failed.push(...batch); - } - } - - if (failed.length > 0) { - await this.notificationsSetter.addFailedNotifications( - failed, - notificationKey, - ); + if (success) { + successful.push(...batch); + } else { + failed.push(...batch); } + } - return { successful, failed }; - } catch (error) { - this.logger.error( - `Error sending notifications: ${error.message}`, - 'PushNotificationsService', + if (failed.length > 0) { + await this.notificationsSetter.addFailedNotifications( + failed, + notificationKey, ); - throw error; } + + return { successful, failed }; } async retryFailedNotifications( From 47aa1cae0f4bd1755abacbd10acf48b420cfdac2 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 10:16:42 +0300 Subject: [PATCH 17/22] MEX-748 Moved LockAndRetry decorator --- .../decorators/lock.retry.decorator.ts | 0 .../push-notifications/crons/push.notifications.energy.ts | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename src/{modules/push-notifications => helpers}/decorators/lock.retry.decorator.ts (100%) diff --git a/src/modules/push-notifications/decorators/lock.retry.decorator.ts b/src/helpers/decorators/lock.retry.decorator.ts similarity index 100% rename from src/modules/push-notifications/decorators/lock.retry.decorator.ts rename to src/helpers/decorators/lock.retry.decorator.ts diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 7077eee0e..c2ef72bed 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -13,7 +13,7 @@ import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/servi import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; import { Logger } from 'winston'; import { RedlockService } from '@multiversx/sdk-nestjs-cache'; -import { LockAndRetry } from '../decorators/lock.retry.decorator'; +import { LockAndRetry } from 'src/helpers/decorators/lock.retry.decorator'; @Injectable() export class PushNotificationsEnergyCron { From 934afbc803e47cbb5f4d90c0b9ac9edd184c3b1f Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 10:30:01 +0300 Subject: [PATCH 18/22] MEX-748 Other small fixes --- src/config/default.json | 3 +-- .../push-notifications/crons/push.notifications.energy.ts | 2 -- .../services/push.notifications.service.ts | 4 ---- src/utils/lock.retry.utils.ts | 7 +++++-- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/config/default.json b/src/config/default.json index 5ecdb372e..b478d53ea 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -655,8 +655,7 @@ "VOLUME_WEIGHT": 0.25, "TRADES_COUNT_WEIGHT": 0.25 }, - "AWS_QUERY_CACHE_WARMER_DELAY": 50, - "TIMESCALEDB_INSERT_CHUNK_SIZE": 30 + "AWS_QUERY_CACHE_WARMER_DELAY": 50 }, "dataApi": { "tableName": "XEXCHANGE_ANALYTICS" diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index c2ef72bed..af7890efd 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -12,7 +12,6 @@ import { pushNotificationsConfig, scAddress } from 'src/config'; import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/services/week-timekeeping.abi.service'; import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; import { Logger } from 'winston'; -import { RedlockService } from '@multiversx/sdk-nestjs-cache'; import { LockAndRetry } from 'src/helpers/decorators/lock.retry.decorator'; @Injectable() @@ -23,7 +22,6 @@ export class PushNotificationsEnergyCron { private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, - private readonly redLockService: RedlockService, @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts index 3060d53c5..2fcd01261 100644 --- a/src/modules/push-notifications/services/push.notifications.service.ts +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -7,12 +7,8 @@ import { Injectable } from '@nestjs/common'; import { pushNotificationsConfig } from 'src/config'; import { PushNotificationsSetterService } from './push.notifications.setter.service'; import { XPortalApiService } from 'src/services/multiversx-communication/mx.xportal.api.service'; -import { Logger } from '@nestjs/common'; - @Injectable() export class PushNotificationsService { - private readonly logger = new Logger(PushNotificationsService.name); - constructor( private readonly xPortalApiService: XPortalApiService, private readonly notificationsSetter: PushNotificationsSetterService, diff --git a/src/utils/lock.retry.utils.ts b/src/utils/lock.retry.utils.ts index 3bf53f4b3..e1889fd34 100644 --- a/src/utils/lock.retry.utils.ts +++ b/src/utils/lock.retry.utils.ts @@ -1,4 +1,5 @@ import { RedlockService } from '@multiversx/sdk-nestjs-cache'; +import { delay } from 'src/helpers/helpers'; import { Logger } from 'winston'; interface LockRetryOptions { @@ -56,15 +57,17 @@ export async function withLockAndRetry( `Retrying operation ${lockName} (${retryCount}/${maxOperationRetries})`, 'LockRetryUtils', ); - await new Promise((resolve) => setTimeout(resolve, operationRetryInterval)); + await delay(operationRetryInterval); } } } + await redLockService.release(lockKey, lockName); + if (!success) { logger.error( `All retry attempts failed for operation ${lockName}`, 'LockRetryUtils', ); } -} \ No newline at end of file +} From 945d96023c859f39e4d7670b5edb8cdd9455e2e5 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 14:30:58 +0300 Subject: [PATCH 19/22] MEX-748 Place failedNotifications on the common redis instance --- .../crons/push.notifications.energy.ts | 2 ++ .../push.notifications.module.ts | 1 + .../services/push.notifications.setter.service.ts | 14 ++++++++------ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index af7890efd..88e51b652 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -13,6 +13,7 @@ import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/servi import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; import { Logger } from 'winston'; import { LockAndRetry } from 'src/helpers/decorators/lock.retry.decorator'; +import { RedlockService } from '@multiversx/sdk-nestjs-cache'; @Injectable() export class PushNotificationsEnergyCron { @@ -22,6 +23,7 @@ export class PushNotificationsEnergyCron { private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, + private readonly redLockService: RedlockService, @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} diff --git a/src/modules/push-notifications/push.notifications.module.ts b/src/modules/push-notifications/push.notifications.module.ts index c3be4e386..bf1ef0551 100644 --- a/src/modules/push-notifications/push.notifications.module.ts +++ b/src/modules/push-notifications/push.notifications.module.ts @@ -23,6 +23,7 @@ import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; EnergyModule, WeekTimekeepingModule, DynamicModuleUtils.getRedlockModule(), + DynamicModuleUtils.getCommonRedisModule(), ], providers: [ PushNotificationsService, diff --git a/src/modules/push-notifications/services/push.notifications.setter.service.ts b/src/modules/push-notifications/services/push.notifications.setter.service.ts index f8e32abd1..0f4fe109a 100644 --- a/src/modules/push-notifications/services/push.notifications.setter.service.ts +++ b/src/modules/push-notifications/services/push.notifications.setter.service.ts @@ -1,12 +1,14 @@ import { Injectable } from '@nestjs/common'; -import { CacheService } from 'src/services/caching/cache.service'; +import { RedisCacheService } from '@multiversx/sdk-nestjs-cache'; import { Constants } from '@multiversx/sdk-nestjs-common'; @Injectable() export class PushNotificationsSetterService { private readonly failedNotificationsPrefix = 'pushNotificationsFailed'; - constructor(private readonly cacheService: CacheService) {} + constructor( + private readonly redisCacheService: RedisCacheService, + ) {} async addFailedNotifications( addresses: string[], @@ -16,13 +18,13 @@ export class PushNotificationsSetterService { if (!addresses || addresses.length === 0) return; const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; - await this.cacheService.addToSet(redisKey, addresses); - await this.cacheService.setTtlRemote(redisKey, ttl); + await this.redisCacheService.sadd(redisKey, ...addresses); + await this.redisCacheService.expire(redisKey, ttl); } async getFailedNotifications(notificationKey: string): Promise { const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; - return await this.cacheService.getSetMembers(redisKey); + return await this.redisCacheService.smembers(redisKey); } async removeFailedNotifications( @@ -32,6 +34,6 @@ export class PushNotificationsSetterService { if (!addresses || addresses.length === 0) return; const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; - await this.cacheService.removeFromSet(redisKey, addresses); + await this.redisCacheService.srem(redisKey, ...addresses); } } From dc8e57bad7d292eea4ae36fa13b15fc7d1b3a8e4 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Tue, 6 May 2025 16:02:40 +0300 Subject: [PATCH 20/22] MEX-748 Fix srem issue --- .../services/push.notifications.setter.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/push-notifications/services/push.notifications.setter.service.ts b/src/modules/push-notifications/services/push.notifications.setter.service.ts index 0f4fe109a..f3c61f586 100644 --- a/src/modules/push-notifications/services/push.notifications.setter.service.ts +++ b/src/modules/push-notifications/services/push.notifications.setter.service.ts @@ -34,6 +34,6 @@ export class PushNotificationsSetterService { if (!addresses || addresses.length === 0) return; const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; - await this.redisCacheService.srem(redisKey, ...addresses); + await this.redisCacheService['redis'].srem(redisKey, ...addresses); } } From 9a2131e6c158d8e4356834c45482e3d94ccdb57f Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Wed, 14 May 2025 23:52:57 +0300 Subject: [PATCH 21/22] MEX-748 Integrate devnet energy snapshot index --- .../crons/push.notifications.energy.ts | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index 88e51b652..d41c117ca 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -5,7 +5,6 @@ import { import { Injectable, Inject } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; import { PushNotificationsService } from '../services/push.notifications.service'; -import { EnergyAbiService } from 'src/modules/energy/services/energy.abi.service'; import { ContextGetterService } from 'src/services/context/context.getter.service'; import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; import { pushNotificationsConfig, scAddress } from 'src/config'; @@ -18,7 +17,6 @@ import { RedlockService } from '@multiversx/sdk-nestjs-cache'; @Injectable() export class PushNotificationsEnergyCron { constructor( - private readonly energyAbiService: EnergyAbiService, private readonly contextGetter: ContextGetterService, private readonly pushNotificationsService: PushNotificationsService, private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, @@ -46,25 +44,6 @@ export class PushNotificationsEnergyCron { let successfulNotifications = 0; let failedNotifications = 0; - const isDevnet = process.env.NODE_ENV === 'devnet'; - - if (isDevnet) { - const addresses = await this.energyAbiService.getUsersWithEnergy(); - - const result = - await this.pushNotificationsService.sendNotificationsInBatches( - addresses, - pushNotificationsConfig[ - NotificationType.FEES_COLLECTOR_REWARDS - ], - NotificationType.FEES_COLLECTOR_REWARDS, - ); - - successfulNotifications += result.successful.length; - failedNotifications += result.failed.length; - return; - } - await this.accountsEnergyElasticService.getAccountsByEnergyAmount( currentEpoch - 1, 'gt', From afcd632a41475c4b0b11d681f3343d3526966c70 Mon Sep 17 00:00:00 2001 From: EmanuelMiron Date: Thu, 15 May 2025 00:06:13 +0300 Subject: [PATCH 22/22] MEX-748 Dynamic Cron based on environment --- .../push-notifications/crons/push.notifications.energy.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts index d41c117ca..794bca7bc 100644 --- a/src/modules/push-notifications/crons/push.notifications.energy.ts +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -25,7 +25,11 @@ export class PushNotificationsEnergyCron { @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, ) {} - @Cron(CronExpression.EVERY_DAY_AT_NOON) + @Cron( + process.env.NODE_ENV === 'mainnet' + ? CronExpression.EVERY_DAY_AT_NOON + : CronExpression.EVERY_4_HOURS, + ) @LockAndRetry({ lockKey: 'pushNotifications', lockName: 'feesCollector',