diff --git a/.env.example b/.env.example index c8e4635a8..8674ee2b5 100644 --- a/.env.example +++ b/.env.example @@ -55,6 +55,7 @@ ENABLE_CACHE_WARMER=true ENABLE_EVENTS_NOTIFIER=false ENABLE_TRACER=false ENABLE_COMPLEXITY=true +ENABLE_PUSH_NOTIFICATIONS=true #Log filename to use for file logging. Optional LOG_FILE= @@ -83,3 +84,7 @@ OPEN_EXCHANGE_RATES_URL=https://openexchangerates.org/api # AWS Timestream AWS_TIMESTREAM_READ=false AWS_TIMESTREAM_WRITE=false + +# Notification Service Configuration +PUSH_NOTIFICATIONS_API_URL=https://devnet-tools.xportal.com +PUSH_NOTIFICATIONS_API_KEY= diff --git a/src/config/default.json b/src/config/default.json index 99c1923c0..ab22a00a4 100644 --- a/src/config/default.json +++ b/src/config/default.json @@ -701,5 +701,23 @@ "nested": 1.5, "exponentialBase": 10 } + }, + "pushNotifications": { + "options": { + "batchSize": 100, + "chainId": 508 + }, + "feesCollectorRewards": { + "title": "xExchange: Energy rewards", + "body": "You can now claim your fees rewards", + "route": "/portfolio", + "iconUrl": "https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp" + }, + "negativeEnergy": { + "title": "xExchange: Update energy", + "body": "You have negative energy", + "route": "/energy", + "iconUrl": "https://xexchange.com/assets/imgs/mx-logos/xexchange-logo@2x.webp" + } } } diff --git a/src/config/index.ts b/src/config/index.ts index b34cadc95..d032baed6 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -38,3 +38,5 @@ export const dataApiConfig = config.get('dataApi'); export const leaguesConfig = config.get('leagues'); export const complexityConfig = config.get('complexity'); + +export const pushNotificationsConfig = config.get('pushNotifications'); diff --git a/src/helpers/api.config.service.ts b/src/helpers/api.config.service.ts index 95aff1e66..378bab848 100644 --- a/src/helpers/api.config.service.ts +++ b/src/helpers/api.config.service.ts @@ -235,6 +235,22 @@ export class ApiConfigService { return mongoDBPassword; } + getNotificationsApiUrl(): string { + const apiUrl = this.configService.get('PUSH_NOTIFICATIONS_API_URL'); + if (!apiUrl) { + throw new Error('No push notifications API url present'); + } + return apiUrl; + } + + getNotificationsApiKey(): string { + const apiKey = this.configService.get('PUSH_NOTIFICATIONS_API_KEY'); + if (!apiKey) { + throw new Error('No push notifications API key present'); + } + return apiKey; + } + getJwtSecret(): string { const secret = this.configService.get('JWT_SECRET'); if (!secret) { @@ -391,4 +407,14 @@ export class ApiConfigService { getRateLimiterSecret(): string | undefined { return this.configService.get('RATE_LIMITER_SECRET'); } + + isNotificationsModuleActive(): boolean { + const notificationsModuleActive = this.configService.get( + 'ENABLE_PUSH_NOTIFICATIONS', + ); + if (!notificationsModuleActive) { + return false; + } + return notificationsModuleActive === 'true'; + } } diff --git a/src/main.ts b/src/main.ts index c21d322c9..174e31798 100644 --- a/src/main.ts +++ b/src/main.ts @@ -13,6 +13,7 @@ import cookieParser from 'cookie-parser'; import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston'; import { LoggerService } from '@nestjs/common'; import { NestExpressApplication } from '@nestjs/platform-express'; +import { PushNotificationsModule } from './modules/push-notifications/push.notifications.module'; async function bootstrap() { BigNumber.config({ EXPONENTIAL_AT: [-30, 30] }); @@ -90,5 +91,10 @@ async function bootstrap() { await rabbitMqService.getFilterAddresses(); await eventsNotifierApp.listen(5673, '0.0.0.0'); } + + if (apiConfigService.isNotificationsModuleActive()) { + const pushNotificationsApp = await NestFactory.create(PushNotificationsModule); + await pushNotificationsApp.listen(5674, '0.0.0.0'); + } } bootstrap(); diff --git a/src/modules/energy/services/energy.abi.service.ts b/src/modules/energy/services/energy.abi.service.ts index 0e97712a8..d55aa8ea4 100644 --- a/src/modules/energy/services/energy.abi.service.ts +++ b/src/modules/energy/services/energy.abi.service.ts @@ -12,13 +12,13 @@ import { MXProxyService } from 'src/services/multiversx-communication/mx.proxy.s import { GenericAbiService } from 'src/services/generics/generic.abi.service'; import { LockOption } from '../models/simple.lock.energy.model'; import { GetOrSetCache } from 'src/helpers/decorators/caching.decorator'; -import { Constants } from '@multiversx/sdk-nestjs-common'; +import { AddressUtils, BinaryUtils } from '@multiversx/sdk-nestjs-common'; import { CacheTtlInfo } from 'src/services/caching/cache.ttl.info'; import { MXApiService } from 'src/services/multiversx-communication/mx.api.service'; import { scAddress } from 'src/config'; import { IEnergyAbiService } from './interfaces'; import { ErrorLoggerAsync } from '@multiversx/sdk-nestjs-common'; - +import { MXGatewayService } from 'src/services/multiversx-communication/mx.gateway.service'; @Injectable() export class EnergyAbiService extends GenericAbiService @@ -27,6 +27,7 @@ export class EnergyAbiService constructor( protected readonly mxProxy: MXProxyService, private readonly mxAPI: MXApiService, + private readonly mxGateway: MXGatewayService, ) { super(mxProxy); } @@ -216,4 +217,27 @@ export class EnergyAbiService return response.firstValue.valueOf(); } + + @ErrorLoggerAsync() + async getUsersWithEnergy(): Promise { + const contractAddress = scAddress.simpleLockEnergy; + + const contractKeysRaw = await this.mxGateway.getSCStorageKeys( + contractAddress, + [], + ); + + const contractPairs = Object.entries(contractKeysRaw); + + const userEnergyKey = BinaryUtils.stringToHex('userEnergy'); + const userEnergyKeys = contractPairs + .filter(([key, _]) => key.startsWith(userEnergyKey)) + .map(([key, _]) => key.replace(userEnergyKey, '')); + + const userEnergyAddresses = userEnergyKeys.map((key) => + AddressUtils.bech32Encode(key), + ); + + return userEnergyAddresses; + } } diff --git a/src/modules/push-notifications/crons/push.notifications.energy.ts b/src/modules/push-notifications/crons/push.notifications.energy.ts new file mode 100644 index 000000000..794bca7bc --- /dev/null +++ b/src/modules/push-notifications/crons/push.notifications.energy.ts @@ -0,0 +1,133 @@ +import { + AccountType, + NotificationType, +} from '../models/push.notifications.types'; +import { Injectable, Inject } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { PushNotificationsService } from '../services/push.notifications.service'; +import { ContextGetterService } from 'src/services/context/context.getter.service'; +import { ElasticAccountsEnergyService } from 'src/services/elastic-search/services/es.accounts.energy.service'; +import { pushNotificationsConfig, scAddress } from 'src/config'; +import { WeekTimekeepingAbiService } from 'src/submodules/week-timekeeping/services/week-timekeeping.abi.service'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { Logger } from 'winston'; +import { LockAndRetry } from 'src/helpers/decorators/lock.retry.decorator'; +import { RedlockService } from '@multiversx/sdk-nestjs-cache'; + +@Injectable() +export class PushNotificationsEnergyCron { + constructor( + private readonly contextGetter: ContextGetterService, + private readonly pushNotificationsService: PushNotificationsService, + private readonly accountsEnergyElasticService: ElasticAccountsEnergyService, + private readonly weekTimekeepingAbi: WeekTimekeepingAbiService, + private readonly redLockService: RedlockService, + @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, + ) {} + + @Cron( + process.env.NODE_ENV === 'mainnet' + ? CronExpression.EVERY_DAY_AT_NOON + : CronExpression.EVERY_4_HOURS, + ) + @LockAndRetry({ + lockKey: 'pushNotifications', + lockName: 'feesCollector', + }) + async feesCollectorRewardsCron() { + const currentEpoch = await this.contextGetter.getCurrentEpoch(); + const firstWeekStartEpoch = + await this.weekTimekeepingAbi.firstWeekStartEpoch( + String(scAddress.feesCollector), + ); + + if ((currentEpoch - firstWeekStartEpoch) % 7 !== 0) { + return; + } + + let successfulNotifications = 0; + let failedNotifications = 0; + + await this.accountsEnergyElasticService.getAccountsByEnergyAmount( + currentEpoch - 1, + 'gt', + async (items: AccountType[]) => { + const addresses = items.map( + (item: AccountType) => item.address, + ); + + const result = + await this.pushNotificationsService.sendNotificationsInBatches( + addresses, + pushNotificationsConfig[ + NotificationType.FEES_COLLECTOR_REWARDS + ], + NotificationType.FEES_COLLECTOR_REWARDS, + ); + + successfulNotifications += result.successful.length; + failedNotifications += result.failed.length; + }, + ); + + this.logger.log( + `Fees collector rewards cron completed. Successful: ${successfulNotifications}, Failed: ${failedNotifications}`, + 'PushNotificationsEnergyCron', + ); + } + + @Cron('0 12 */2 * *') // Every 2 days at noon + @LockAndRetry({ + lockKey: 'pushNotifications', + lockName: 'negativeEnergy', + }) + async negativeEnergyNotificationsCron() { + const currentEpoch = await this.contextGetter.getCurrentEpoch(); + + let successfulNotifications = 0; + let failedNotifications = 0; + + await this.accountsEnergyElasticService.getAccountsByEnergyAmount( + currentEpoch - 1, + 'lt', + async (items: AccountType[]) => { + const addresses = items.map( + (item: AccountType) => item.address, + ); + + const result = + await this.pushNotificationsService.sendNotificationsInBatches( + addresses, + pushNotificationsConfig[ + NotificationType.NEGATIVE_ENERGY + ], + NotificationType.NEGATIVE_ENERGY, + ); + + successfulNotifications += result.successful.length; + failedNotifications += result.failed.length; + }, + 0, + ); + + this.logger.log( + `Negative energy notifications cron completed. Successful: ${successfulNotifications}, Failed: ${failedNotifications}`, + 'PushNotificationsEnergyCron', + ); + } + + @Cron(CronExpression.EVERY_10_MINUTES) + @LockAndRetry({ + lockKey: 'pushNotifications', + lockName: 'retryFailed', + }) + async retryFailedNotificationsCron() { + const notificationTypes = Object.values(NotificationType); + + for (const notificationType of notificationTypes) { + await this.pushNotificationsService.retryFailedNotifications( + notificationType, + ); + } + } +} diff --git a/src/modules/push-notifications/models/push.notifications.types.ts b/src/modules/push-notifications/models/push.notifications.types.ts new file mode 100644 index 000000000..d92c22b6c --- /dev/null +++ b/src/modules/push-notifications/models/push.notifications.types.ts @@ -0,0 +1,38 @@ +export interface EnergyDetailsType { + lastUpdateEpoch: number; + amount: string; + totalLockedTokens: string; +} + +export interface AccountType { + address: string; + nonce: number; + balance: string; + balanceNum: number; + totalBalanceWithStake: string; + totalBalanceWithStakeNum: number; + timestamp: number; + shardID: number; + totalStake: string; + energy: string; + energyNum: number; + energyDetails: EnergyDetailsType; + totalUnDelegate: string; +} + +export interface NotificationResult { + successful: string[]; + failed: string[]; +} + +export interface NotificationConfig { + title: string; + body: string; + route: string; + iconUrl: string; +} + +export enum NotificationType { + FEES_COLLECTOR_REWARDS = 'feesCollectorRewards', + NEGATIVE_ENERGY = 'negativeEnergy', +} diff --git a/src/modules/push-notifications/push.notifications.module.ts b/src/modules/push-notifications/push.notifications.module.ts new file mode 100644 index 000000000..bf1ef0551 --- /dev/null +++ b/src/modules/push-notifications/push.notifications.module.ts @@ -0,0 +1,35 @@ +import { Module } from '@nestjs/common'; +import { ScheduleModule } from '@nestjs/schedule'; +import { PushNotificationsService } from './services/push.notifications.service'; +import { PushNotificationsSetterService } from './services/push.notifications.setter.service'; +import { CommonAppModule } from '../../common.app.module'; +import { MXCommunicationModule } from '../../services/multiversx-communication/mx.communication.module'; +import { ElasticSearchModule } from '../../services/elastic-search/elastic.search.module'; +import { ContextModule } from '../../services/context/context.module'; +import { CacheModule } from '../../services/caching/cache.module'; +import { EnergyModule } from '../energy/energy.module'; +import { PushNotificationsEnergyCron } from './crons/push.notifications.energy'; +import { WeekTimekeepingModule } from 'src/submodules/week-timekeeping/week-timekeeping.module'; +import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; + +@Module({ + imports: [ + CommonAppModule, + ScheduleModule.forRoot(), + MXCommunicationModule, + ElasticSearchModule, + ContextModule, + CacheModule, + EnergyModule, + WeekTimekeepingModule, + DynamicModuleUtils.getRedlockModule(), + DynamicModuleUtils.getCommonRedisModule(), + ], + providers: [ + PushNotificationsService, + PushNotificationsSetterService, + PushNotificationsEnergyCron, + ], + exports: [], +}) +export class PushNotificationsModule {} diff --git a/src/modules/push-notifications/services/push.notifications.service.ts b/src/modules/push-notifications/services/push.notifications.service.ts new file mode 100644 index 000000000..2fcd01261 --- /dev/null +++ b/src/modules/push-notifications/services/push.notifications.service.ts @@ -0,0 +1,80 @@ +import { + NotificationConfig, + NotificationResult, + NotificationType, +} from '../models/push.notifications.types'; +import { Injectable } from '@nestjs/common'; +import { pushNotificationsConfig } from 'src/config'; +import { PushNotificationsSetterService } from './push.notifications.setter.service'; +import { XPortalApiService } from 'src/services/multiversx-communication/mx.xportal.api.service'; +@Injectable() +export class PushNotificationsService { + constructor( + private readonly xPortalApiService: XPortalApiService, + private readonly notificationsSetter: PushNotificationsSetterService, + ) {} + + async sendNotificationsInBatches( + addresses: string[], + notificationParams: NotificationConfig, + notificationKey: string, + ): Promise { + const batchSize = pushNotificationsConfig.options.batchSize; + const chainId = pushNotificationsConfig.options.chainId; + const failed: string[] = []; + const successful: string[] = []; + + for (let i = 0; i < addresses.length; i += batchSize) { + const batch = addresses.slice(i, i + batchSize); + const success = await this.xPortalApiService.sendPushNotifications({ + addresses: batch, + chainId, + title: notificationParams.title, + body: notificationParams.body, + route: notificationParams.route, + iconUrl: notificationParams.iconUrl, + }); + + if (success) { + successful.push(...batch); + } else { + failed.push(...batch); + } + } + + if (failed.length > 0) { + await this.notificationsSetter.addFailedNotifications( + failed, + notificationKey, + ); + } + + return { successful, failed }; + } + + async retryFailedNotifications( + notificationType: NotificationType, + ): Promise { + const failedAddresses = + await this.notificationsSetter.getFailedNotifications( + notificationType, + ); + + if (!failedAddresses || failedAddresses.length === 0) { + return; + } + + const { successful } = await this.sendNotificationsInBatches( + failedAddresses, + pushNotificationsConfig[notificationType], + notificationType, + ); + + if (successful.length > 0) { + await this.notificationsSetter.removeFailedNotifications( + successful, + notificationType, + ); + } + } +} diff --git a/src/modules/push-notifications/services/push.notifications.setter.service.ts b/src/modules/push-notifications/services/push.notifications.setter.service.ts new file mode 100644 index 000000000..f3c61f586 --- /dev/null +++ b/src/modules/push-notifications/services/push.notifications.setter.service.ts @@ -0,0 +1,39 @@ +import { Injectable } from '@nestjs/common'; +import { RedisCacheService } from '@multiversx/sdk-nestjs-cache'; +import { Constants } from '@multiversx/sdk-nestjs-common'; + +@Injectable() +export class PushNotificationsSetterService { + private readonly failedNotificationsPrefix = 'pushNotificationsFailed'; + + constructor( + private readonly redisCacheService: RedisCacheService, + ) {} + + async addFailedNotifications( + addresses: string[], + notificationKey: string, + ttl: number = Constants.oneWeek(), + ): Promise { + if (!addresses || addresses.length === 0) return; + const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; + + await this.redisCacheService.sadd(redisKey, ...addresses); + await this.redisCacheService.expire(redisKey, ttl); + } + + async getFailedNotifications(notificationKey: string): Promise { + const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; + return await this.redisCacheService.smembers(redisKey); + } + + async removeFailedNotifications( + addresses: string[], + notificationKey: string, + ): Promise { + if (!addresses || addresses.length === 0) return; + + const redisKey = `${this.failedNotificationsPrefix}.${notificationKey}`; + await this.redisCacheService['redis'].srem(redisKey, ...addresses); + } +} diff --git a/src/services/caching/cache.service.ts b/src/services/caching/cache.service.ts index 703e2289c..c028bda75 100644 --- a/src/services/caching/cache.service.ts +++ b/src/services/caching/cache.service.ts @@ -102,6 +102,20 @@ export class CacheService { return this.redisCacheService.keys(key); } + addToSet(key: string, members: string[]): Promise { + if (!members.length) return; + this.redisCacheService.sadd(key, ...members); + } + + removeFromSet(key: string, members: string[]): Promise { + if (!members.length) return; + return this.redisCacheService['redis'].srem(key, ...members); + } + + getSetMembers(key: string): Promise { + return this.redisCacheService.smembers(key); + } + getOrSetRemote( key: string, createValueFunc: () => Promise, diff --git a/src/services/elastic-search/elastic.search.module.ts b/src/services/elastic-search/elastic.search.module.ts index 887b602d7..64dbc982f 100644 --- a/src/services/elastic-search/elastic.search.module.ts +++ b/src/services/elastic-search/elastic.search.module.ts @@ -3,7 +3,7 @@ import { ESTransactionsService } from './services/es.transactions.service'; import { CommonAppModule } from 'src/common.app.module'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; import { ElasticSearchEventsService } from './services/es.events.service'; -import { ESOperationsService } from './services/es.operations.service'; +import { ElasticAccountsEnergyService } from './services/es.accounts.energy.service'; @Module({ imports: [ @@ -14,12 +14,12 @@ import { ESOperationsService } from './services/es.operations.service'; providers: [ ESTransactionsService, ElasticSearchEventsService, - ESOperationsService, + ElasticAccountsEnergyService, ], exports: [ ESTransactionsService, ElasticSearchEventsService, - ESOperationsService, + ElasticAccountsEnergyService, ], }) export class ElasticSearchModule {} diff --git a/src/services/elastic-search/services/es.accounts.energy.service.ts b/src/services/elastic-search/services/es.accounts.energy.service.ts new file mode 100644 index 000000000..8078c1012 --- /dev/null +++ b/src/services/elastic-search/services/es.accounts.energy.service.ts @@ -0,0 +1,34 @@ +import { QueryType, ElasticService } from '@multiversx/sdk-nestjs-elastic'; + +import { ElasticQuery } from '@multiversx/sdk-nestjs-elastic'; +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class ElasticAccountsEnergyService { + constructor(private readonly elasticService: ElasticService) {} + + async getAccountsByEnergyAmount( + epoch: number, + operator: 'gt' | 'lt' | 'gte' | 'lte' = 'gt', + action: (items: any[]) => Promise, + amount = 0, + ): Promise { + const query = ElasticQuery.create() + .withPagination({ from: 0, size: 10000 }) + .withMustExistCondition('energyDetails'); + + query.condition.must = [ + QueryType.Range('energyDetails.amount', { + key: operator, + value: amount, + }), + ]; + + await this.elasticService.getScrollableList( + `accounts-000001_${epoch}`, + 'address', + query, + action, + ); + } +} diff --git a/src/services/multiversx-communication/mx.communication.module.ts b/src/services/multiversx-communication/mx.communication.module.ts index bc2f34fa6..421381028 100644 --- a/src/services/multiversx-communication/mx.communication.module.ts +++ b/src/services/multiversx-communication/mx.communication.module.ts @@ -4,15 +4,20 @@ import { MXApiService } from './mx.api.service'; import { MXDataApiService } from './mx.data.api.service'; import { MXGatewayService } from './mx.gateway.service'; import { MXProxyService } from './mx.proxy.service'; +import { XPortalApiService } from './mx.xportal.api.service'; import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; @Module({ - imports: [DynamicModuleUtils.getCacheModule()], + imports: [ + DynamicModuleUtils.getCacheModule(), + DynamicModuleUtils.getApiModule(), + ], providers: [ MXProxyService, MXApiService, MXGatewayService, MXDataApiService, + XPortalApiService, ApiConfigService, ], exports: [ @@ -20,6 +25,7 @@ import { DynamicModuleUtils } from 'src/utils/dynamic.module.utils'; MXApiService, MXGatewayService, MXDataApiService, + XPortalApiService, ApiConfigService, ], }) diff --git a/src/services/multiversx-communication/mx.xportal.api.service.ts b/src/services/multiversx-communication/mx.xportal.api.service.ts new file mode 100644 index 000000000..eb6f3c2d2 --- /dev/null +++ b/src/services/multiversx-communication/mx.xportal.api.service.ts @@ -0,0 +1,50 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ApiConfigService } from '../../helpers/api.config.service'; +import { ApiService } from '@multiversx/sdk-nestjs-http'; + +type NotificationPayload = { + addresses: string[]; + chainId: number; + title: string; + body: string; + route?: string; + iconUrl?: string; +}; + +@Injectable() +export class XPortalApiService { + private readonly logger = new Logger(XPortalApiService.name); + + constructor( + private readonly apiConfigService: ApiConfigService, + private readonly apiService: ApiService, + ) {} + + async sendPushNotifications( + notificationPayload: NotificationPayload, + ): Promise { + const baseUrl = this.apiConfigService.getNotificationsApiUrl(); + const apiKey = this.apiConfigService.getNotificationsApiKey(); + const url = `${baseUrl}/notifications-api/api/v1/dapps/push-notifications/send`; + + try { + const response = await this.apiService.post( + url, + notificationPayload, + { + headers: { + 'x-notifications-api-key': apiKey, + }, + }, + ); + + return response.status === 201; + } catch (error) { + this.logger.error( + `Error sending push notification: ${error.message}`, + 'XPortalApiService', + ); + return false; + } + } +}